Source code for data_engine.services.ledger

"""Runtime control-ledger services."""

from __future__ import annotations

from pathlib import Path
from typing import Callable

from data_engine.platform.paths import stable_absolute_path
from data_engine.platform.workspace_policy import RuntimeLayoutPolicy
from data_engine.runtime.runtime_db import RuntimeControlLedger
from data_engine.services.runtime_ports import RuntimeControlStore


[docs] class RuntimeControlLedgerService: """Own workspace-local runtime control-ledger access and client-session bookkeeping.""" def __init__( self, open_ledger_func: Callable[[Path], RuntimeControlStore] | None = None, *, runtime_layout_policy: RuntimeLayoutPolicy | None = None, ) -> None: self.runtime_layout_policy = runtime_layout_policy or RuntimeLayoutPolicy() self._open_ledger_func = open_ledger_func or self._open_default_ledger def _open_default_ledger(self, workspace_root: Path) -> RuntimeControlStore: paths = self.runtime_layout_policy.resolve_paths(workspace_root=workspace_root) return RuntimeControlLedger(paths.runtime_control_db_path)
[docs] def open_for_workspace(self, workspace_root: Path) -> RuntimeControlStore: """Open the configured runtime control ledger for one workspace root.""" return self._open_ledger_func(stable_absolute_path(workspace_root))
[docs] def close(self, ledger: RuntimeControlStore) -> None: """Close one runtime control-ledger connection.""" ledger.close()
[docs] def register_client_session( self, ledger: RuntimeControlStore, *, client_id: str, workspace_id: str, client_kind: str, pid: int, ) -> None: """Register or refresh one active local client session.""" ledger.client_sessions.upsert( client_id=client_id, workspace_id=workspace_id, client_kind=client_kind, pid=pid, )
[docs] def remove_client_session(self, ledger: RuntimeControlStore, client_id: str) -> None: """Remove one active local client session row.""" ledger.client_sessions.remove(client_id)
[docs] def purge_process_client_sessions( self, ledger: RuntimeControlStore, *, workspace_id: str, client_kind: str, pid: int, ) -> None: """Remove all client sessions for one workspace/client-kind/process tuple.""" ledger.client_sessions.remove_for_process( workspace_id=workspace_id, client_kind=client_kind, pid=pid, )
[docs] def count_live_client_sessions( self, ledger: RuntimeControlStore, workspace_id: str, *, exclude_client_id: str | None = None, ) -> int: """Return the number of currently live client sessions for one workspace.""" if exclude_client_id is None: return ledger.client_sessions.count_live(workspace_id) return ledger.client_sessions.count_live(workspace_id, exclude_client_id=exclude_client_id)
LedgerService = RuntimeControlLedgerService __all__ = ["LedgerService", "RuntimeControlLedgerService"]