2024-05-01 13:25:07 +03:00

67 lines
2.2 KiB
Python

import asyncio
import logging
import platform
from typing import Optional
import api
import grpc
import systemd
import storage
from google.protobuf.empty_pb2 import Empty
class AgentApi(api.AgentServicer):
    """gRPC servicer that configures and polls systemd and storage monitors.

    ``Configure`` installs the monitors and ``Poll`` queries them. A monitor
    attribute is ``None`` whenever that kind of monitoring is not configured.
    """

    def __init__(self) -> None:
        """Create a servicer with no monitors configured."""
        super().__init__()
        # Both monitors stay unset until Configure() installs them.
        self._systemd_monitor: Optional[systemd.Monitor] = None
        self._storage_monitor: Optional[storage.Monitor] = None

    async def Configure(self,
                        request: api.AgentConfiguration,
                        context: grpc.aio.ServicerContext) -> api.AgentConfigurationResponse:
        """Replace the active monitors with the ones described by ``request``.

        Any previously configured monitors are dropped first. A monitor is
        created only when the corresponding request field is non-empty, and
        the new monitors are installed only after every ``initialize()`` call
        has succeeded, so a failed configuration leaves no monitor active.

        Args:
            request: Configuration carrying ``services_to_monitor`` and
                ``storage_to_monitor``.
            context: RPC context; on failure its status code is set to
                ``INTERNAL`` and its details to the error text.

        Returns:
            A response carrying this machine's hostname (``platform.node()``).

        Raises:
            RuntimeWarning: If creating or initializing a monitor fails; the
                original exception is chained as the cause.
        """
        # Drop the previous monitors before building their replacements.
        self._systemd_monitor = None
        self._storage_monitor = None

        # Build into locals so that a failure part-way through never leaves
        # an uninitialized (or half-applied) configuration visible to Poll().
        new_systemd_monitor = None
        new_storage_monitor = None
        try:
            if request.services_to_monitor:
                new_systemd_monitor = systemd.Monitor(request.services_to_monitor)
                await new_systemd_monitor.initialize()
            if request.storage_to_monitor:
                new_storage_monitor = storage.Monitor(request.storage_to_monitor)
                await new_storage_monitor.initialize()
        except Exception as e:
            # NOTE(review): grpc.aio may replace these details with the text
            # of the raised exception; `await context.abort(...)` might be the
            # better fit -- confirm against the grpc version in use.
            context.set_code(grpc.StatusCode.INTERNAL)
            context.set_details(f"{e}")
            raise RuntimeWarning("Error encountered while servicing API.") from e

        # Everything initialized successfully: commit the new configuration.
        self._systemd_monitor = new_systemd_monitor
        self._storage_monitor = new_storage_monitor
        return api.AgentConfigurationResponse(hostname=platform.node())

    async def Poll(self,
                   request: Empty,
                   context: grpc.aio.ServicerContext) -> api.MonitoringStats:
        """Poll every configured monitor and return the combined statistics.

        Args:
            request: Unused empty message.
            context: RPC context; on failure its status code is set to
                ``INTERNAL`` and its details to the error text.

        Returns:
            Stats whose ``services`` / ``storage`` fields hold the respective
            monitor's ``poll()`` result, or ``None`` for a monitor that is not
            configured.

        Raises:
            RuntimeWarning: If polling or building the response fails; the
                original exception is chained as the cause.
        """
        try:
            service_stats = None
            if self._systemd_monitor is not None:
                service_stats = await self._systemd_monitor.poll()

            # Named `storage_stats` so the local does not shadow the imported
            # `storage` module.
            storage_stats = None
            if self._storage_monitor is not None:
                storage_stats = await self._storage_monitor.poll()

            return api.MonitoringStats(services=service_stats, storage=storage_stats)
        except Exception as e:
            context.set_code(grpc.StatusCode.INTERNAL)
            context.set_details(f"{e}")
            raise RuntimeWarning("Error encountered while servicing API.") from e