2024-05-01 13:25:07 +03:00

87 lines
3.3 KiB
Python

from typing import Any, Optional, Iterable
from google.protobuf.timestamp_pb2 import Timestamp
from dbus_next.aio import MessageBus
from dbus_next.constants import BusType
import api
class Monitor:
def __init__(self, unit_names: Iterable[str]):
self._monitored_units: dict[str, Optional[str]] = {}
for name in unit_names:
self._monitored_units[name] = None
async def initialize(self) -> None:
self._dbus = await self._try_construct_bus()
self._manager = await self._try_construct_systemd_manager()
await self._validate_existing_units()
async def poll(self) -> list[api.MonitoredService]:
response_list = []
for name, path in self._monitored_units.items():
unit = await self._get_unit(path)
monitored_service = await self._unit_into_monitored_service(name, unit)
response_list.append(monitored_service)
return response_list
async def _try_construct_bus(self) -> MessageBus:
return await MessageBus(bus_type=BusType.SYSTEM).connect()
async def _try_construct_systemd_manager(self) -> "ProxyInterface":
introspect = await self._dbus.introspect("org.freedesktop.systemd1", "/org/freedesktop/systemd1")
obj = self._dbus.get_proxy_object("org.freedesktop.systemd1", "/org/freedesktop/systemd1", introspect)
manager = obj.get_interface("org.freedesktop.systemd1.Manager")
return manager
async def _validate_existing_units(self) -> None:
all_units = await self._manager.call_list_units()
looking_for = [key for key, _ in self._monitored_units.items()]
for unit in all_units:
(name, _, _, _, _, _, path, _, _, _) = unit
if name in looking_for:
looking_for.remove(name)
self._monitored_units[name] = path
if len(looking_for) > 0:
raise RuntimeError(f"{looking_for.count} units unaccounted for by systemd monitor.")
async def _get_unit(self, path: str) -> "ProxyInterface":
interface = await self._dbus.introspect("org.freedesktop.systemd1", path)
return self._dbus.get_proxy_object("org.freedesktop.systemd1", path, interface)
async def _unit_into_monitored_service(self, name: str, unit: "ProxyInterface") -> api.MonitoredService:
unit_object = unit.get_interface("org.freedesktop.systemd1.Unit")
service_object = unit.get_interface("org.freedesktop.systemd1.Service")
state: str = await unit_object.get_active_state()
result = await service_object.get_result()
result = True if result == "success" else False
started_usec: int = await service_object.get_exec_main_start_timestamp()
exited_usec: int = await service_object.get_exec_main_exit_timestamp()
if started_usec > 0:
started = Timestamp()
started.FromMicroseconds(started_usec)
else:
started = None
if exited_usec > 0:
exited = Timestamp()
exited.FromMicroseconds(exited_usec)
else:
exited = None
return api.MonitoredService(name=name,
state=state.upper(),
result_success=result,
main_started=started,
main_exited=exited)