from typing import Any, Optional, Iterable from google.protobuf.timestamp_pb2 import Timestamp from dbus_next.aio import MessageBus from dbus_next.constants import BusType import api class Monitor: def __init__(self, unit_names: Iterable[str]): self._monitored_units: dict[str, Optional[str]] = {} for name in unit_names: self._monitored_units[name] = None async def initialize(self) -> None: self._dbus = await self._try_construct_bus() self._manager = await self._try_construct_systemd_manager() await self._validate_existing_units() async def poll(self) -> list[api.MonitoredService]: response_list = [] for name, path in self._monitored_units.items(): unit = await self._get_unit(path) monitored_service = await self._unit_into_monitored_service(name, unit) response_list.append(monitored_service) return response_list async def _try_construct_bus(self) -> MessageBus: return await MessageBus(bus_type=BusType.SYSTEM).connect() async def _try_construct_systemd_manager(self) -> "ProxyInterface": introspect = await self._dbus.introspect("org.freedesktop.systemd1", "/org/freedesktop/systemd1") obj = self._dbus.get_proxy_object("org.freedesktop.systemd1", "/org/freedesktop/systemd1", introspect) manager = obj.get_interface("org.freedesktop.systemd1.Manager") return manager async def _validate_existing_units(self) -> None: all_units = await self._manager.call_list_units() looking_for = [key for key, _ in self._monitored_units.items()] for unit in all_units: (name, _, _, _, _, _, path, _, _, _) = unit if name in looking_for: looking_for.remove(name) self._monitored_units[name] = path if len(looking_for) > 0: raise RuntimeError(f"{looking_for.count} units unaccounted for by systemd monitor.") async def _get_unit(self, path: str) -> "ProxyInterface": interface = await self._dbus.introspect("org.freedesktop.systemd1", path) return self._dbus.get_proxy_object("org.freedesktop.systemd1", path, interface) async def _unit_into_monitored_service(self, name: str, unit: "ProxyInterface") -> api.MonitoredService: unit_object = unit.get_interface("org.freedesktop.systemd1.Unit") service_object = unit.get_interface("org.freedesktop.systemd1.Service") state: str = await unit_object.get_active_state() result = await service_object.get_result() result = True if result == "success" else False started_usec: int = await service_object.get_exec_main_start_timestamp() exited_usec: int = await service_object.get_exec_main_exit_timestamp() if started_usec > 0: started = Timestamp() started.FromMicroseconds(started_usec) else: started = None if exited_usec > 0: exited = Timestamp() exited.FromMicroseconds(exited_usec) else: exited = None return api.MonitoredService(name=name, state=state.upper(), result_success=result, main_started=started, main_exited=exited)