import asyncio import logging import platform from typing import Optional import api import grpc import systemd import storage from google.protobuf.empty_pb2 import Empty class AgentApi(api.AgentServicer): def __init__(self) -> None: super().__init__() self._systemd_monitor: Optional[systemd.Monitor] = None self._storage_monitor: Optional[storage.Monitor] = None async def Configure(self, request: api.AgentConfiguration, context: grpc.aio.ServicerContext) -> api.AgentConfigurationResponse: if self._systemd_monitor: del self._systemd_monitor self._systemd_monitor = None if self._storage_monitor: del self._storage_monitor self._storage_monitor = None try: if len(request.services_to_monitor) > 0: self._systemd_monitor = systemd.Monitor(request.services_to_monitor) await self._systemd_monitor.initialize() if len(request.storage_to_monitor) > 0: self._storage_monitor = storage.Monitor(request.storage_to_monitor) await self._storage_monitor.initialize() except Exception as e: context.set_code(grpc.StatusCode.INTERNAL) context.set_details(f"{e}") raise RuntimeWarning("Error encountered while servicing API.") return api.AgentConfigurationResponse(hostname=platform.node()) async def Poll(self, request: Empty, context: grpc.aio.ServicerContext) -> api.MonitoringStats: try: if self._systemd_monitor: services = await self._systemd_monitor.poll() else: services = None if self._storage_monitor: storage = await self._storage_monitor.poll() else: storage = None return api.MonitoringStats(services=services, storage=storage) except Exception as e: context.set_code(grpc.StatusCode.INTERNAL) context.set_details(f"{e}") raise RuntimeWarning("Error encountered while servicing API.")