# Source code for data_engine.services.runtime_binding

"""Workspace runtime binding services for operator surfaces."""

from __future__ import annotations

from dataclasses import dataclass
from datetime import UTC, datetime, timedelta
from os import getpid
from typing import TYPE_CHECKING

from data_engine.domain import FlowLogEntry, FlowRunState, StepOutputIndex
from data_engine.domain.catalog import FlowCatalogLike
from data_engine.domain.time import parse_utc_text
from data_engine.hosts.daemon.manager import WorkspaceDaemonManager
from data_engine.platform.workspace_models import WorkspacePaths
from data_engine.runtime.runtime_db import RuntimeCacheLedger
from data_engine.services.daemon_state import DaemonStateService
from data_engine.services.ledger import RuntimeControlLedgerService
from data_engine.services.logs import LogService
from data_engine.services.runtime_history import RuntimeHistoryService
from data_engine.services.runtime_ports import RuntimeCacheStore, RuntimeControlStore
from data_engine.views.logs import FlowLogStore

if TYPE_CHECKING:
    from data_engine.application.runtime import RuntimeApplication


class _NullRuntimeCacheLedger:
    """Stand-in runtime cache ledger used when no workspace is configured.

    Exposes ``runs``, ``step_outputs``, ``logs``, ``source_signatures`` and
    ``execution_state``, each bound to a no-op repository.
    """

    def __init__(self) -> None:
        """Attach one no-op repository to each ledger attribute."""
        # Read-side repositories: every query answers with an empty tuple.
        self.runs = _NullRuntimeRunRepository()
        self.step_outputs = _NullRuntimeStepOutputRepository()
        self.logs = _NullRuntimeLogRepository()
        self.source_signatures = _NullRuntimeSourceSignatureRepository()
        # Write-side repository: every write is discarded.
        self.execution_state = _NullRuntimeExecutionStateRepository()

    def close(self) -> None:
        """Do nothing; this ledger holds no resource to release."""
        return None


class _NullRuntimeRunRepository:
    """Run repository stand-in for an unconfigured workspace selection."""

    def list(self, *, flow_name: str | None = None) -> tuple[object, ...]:
        """Report an empty run listing regardless of the requested flow."""
        _ = flow_name  # filter is accepted but never consulted
        return tuple()


class _NullRuntimeLogRepository:
    """Log repository stand-in for an unconfigured workspace selection."""

    def list(
        self,
        *,
        flow_name: str | None = None,
        run_id: str | None = None,
        after_id: int | None = None,
    ) -> tuple[object, ...]:
        """Report an empty log listing regardless of the supplied filters."""
        _ = (flow_name, run_id, after_id)  # filters are accepted but never consulted
        return tuple()


class _NullRuntimeStepOutputRepository:
    """Step-output repository stand-in for an unconfigured workspace selection."""

    def list_for_run(self, run_id: str) -> tuple[object, ...]:
        """Report no step outputs for the given run."""
        _ = run_id  # accepted but never consulted
        return tuple()

    def list(self, *, flow_name: str | None = None, after_id: int | None = None) -> tuple[object, ...]:
        """Report no step outputs regardless of the supplied filters."""
        _ = (flow_name, after_id)  # accepted but never consulted
        return tuple()


class _NullRuntimeSourceSignatureRepository:
    """Source-signature repository stand-in for an unconfigured workspace selection."""

    def list_file_states(self, *, flow_name: str | None = None) -> tuple[object, ...]:
        """Report no file states regardless of the requested flow."""
        _ = flow_name  # filter is accepted but never consulted
        return tuple()


class _NullRuntimeExecutionStateRepository:
    """Execution-state writer stand-in for an unconfigured workspace selection."""

    def record_run_started(self, **kwargs: object) -> None:
        """Accept any keyword arguments and discard them."""
        _ = kwargs  # nothing is persisted
        return None


class _NullClientSessionRepository:
    """Client-session repository stand-in for an unconfigured workspace selection."""

    def upsert(self, **kwargs: object) -> None:
        """Accept a session registration and discard it."""
        _ = kwargs
        return None

    def remove(self, client_id: str) -> None:
        """Accept a session removal request and discard it."""
        _ = client_id
        return None

    def remove_for_process(self, *, workspace_id: str, client_kind: str, pid: int) -> None:
        """Accept a per-process session removal request and discard it."""
        _ = (workspace_id, client_kind, pid)
        return None

    def count_live(self, workspace_id: str, *, exclude_client_id: str | None = None) -> int:
        """Report zero live sessions regardless of the arguments."""
        _ = (workspace_id, exclude_client_id)
        return 0


class _NullRuntimeControlLedger:
    """Stand-in runtime control ledger used when no workspace is configured.

    Exposes ``client_sessions`` bound to a no-op client-session repository.
    """

    def __init__(self) -> None:
        """Attach the no-op client-session repository."""
        self.client_sessions = _NullClientSessionRepository()

    def close(self) -> None:
        """Do nothing; this ledger holds no resource to release."""
        return None


@dataclass(frozen=True)
class WorkspaceRuntimeBinding:
    """Concrete runtime resources bound to one selected workspace.

    Instances are immutable (``frozen=True``); the fields are the paths of the
    selected workspace plus the cache ledger, control ledger, log store and
    daemon manager that belong to it.
    """

    workspace_paths: WorkspacePaths
    runtime_cache_ledger: RuntimeCacheStore
    runtime_control_ledger: RuntimeControlStore
    log_store: FlowLogStore
    daemon_manager: WorkspaceDaemonManager
[docs] class WorkspaceRuntimeBindingService: """Own concrete runtime binding lifecycle for GUI/TUI surfaces.""" def __init__( self, *, ledger_service: RuntimeControlLedgerService, log_service: LogService, daemon_state_service: DaemonStateService, runtime_history_service: RuntimeHistoryService, ) -> None: self.ledger_service = ledger_service self.log_service = log_service self.daemon_state_service = daemon_state_service self.runtime_history_service = runtime_history_service self._step_output_cache: dict[int, tuple[tuple[object, ...], int | None, StepOutputIndex]] = {}
[docs] def open_binding(self, workspace_paths: WorkspacePaths) -> WorkspaceRuntimeBinding: """Open one concrete runtime binding for a workspace selection.""" if workspace_paths.workspace_configured: runtime_cache_ledger = RuntimeCacheLedger(workspace_paths.runtime_cache_db_path) runtime_control_ledger = self.ledger_service.open_for_workspace(workspace_paths.workspace_root) else: runtime_cache_ledger = _NullRuntimeCacheLedger() runtime_control_ledger = _NullRuntimeControlLedger() return WorkspaceRuntimeBinding( workspace_paths=workspace_paths, runtime_cache_ledger=runtime_cache_ledger, runtime_control_ledger=runtime_control_ledger, log_store=self.log_service.create_store(runtime_cache_ledger), daemon_manager=self.daemon_state_service.create_manager(workspace_paths), )
[docs] def close_binding(self, binding: WorkspaceRuntimeBinding) -> None: """Close one concrete runtime binding.""" self._step_output_cache.pop(id(binding), None) binding.runtime_cache_ledger.close() self.ledger_service.close(binding.runtime_control_ledger)
[docs] def register_client_session( self, binding: WorkspaceRuntimeBinding, *, client_id: str, client_kind: str, pid: int | None = None, ) -> None: """Register or refresh one local client session for the binding workspace.""" self.ledger_service.register_client_session( binding.runtime_control_ledger, client_id=client_id, workspace_id=binding.workspace_paths.workspace_id, client_kind=client_kind, pid=getpid() if pid is None else pid, )
[docs] def remove_client_session(self, binding: WorkspaceRuntimeBinding, client_id: str) -> None: """Remove one active local client session row.""" self.ledger_service.remove_client_session(binding.runtime_control_ledger, client_id)
[docs] def purge_process_client_sessions( self, binding: WorkspaceRuntimeBinding, *, client_kind: str, pid: int | None = None, ) -> None: """Remove all client sessions for this workspace/client-kind/process tuple.""" self.ledger_service.purge_process_client_sessions( binding.runtime_control_ledger, workspace_id=binding.workspace_paths.workspace_id, client_kind=client_kind, pid=getpid() if pid is None else pid, )
[docs] def count_live_client_sessions( self, binding: WorkspaceRuntimeBinding, *, exclude_client_id: str | None = None, ) -> int: """Return the number of live local client sessions for the binding workspace.""" return self.ledger_service.count_live_client_sessions( binding.runtime_control_ledger, binding.workspace_paths.workspace_id, exclude_client_id=exclude_client_id, )
[docs] def sync_runtime_state( self, binding: WorkspaceRuntimeBinding, *, runtime_application: "RuntimeApplication", flow_cards, daemon_startup_in_progress: bool = False, ) -> object: """Return daemon/runtime sync state for one bound workspace.""" return runtime_application.sync_state( paths=binding.workspace_paths, daemon_manager=binding.daemon_manager, flow_cards=flow_cards, runtime_ledger=binding.runtime_cache_ledger, daemon_startup_in_progress=daemon_startup_in_progress, )
[docs] def reload_logs(self, binding: WorkspaceRuntimeBinding) -> None: """Reload the binding log store from its runtime cache store.""" self.log_service.reload(binding.log_store, binding.runtime_cache_ledger)
[docs] def invalidate_flow_history(self, binding: WorkspaceRuntimeBinding, *, flow_name: str) -> None: """Drop one flow's cached logs and derived step-output state after destructive resets.""" self.log_service.clear_flow(binding.log_store, flow_name) self._step_output_cache.pop(id(binding), None)
[docs] def rebuild_step_outputs( self, binding: WorkspaceRuntimeBinding, flow_cards: dict[str, FlowCatalogLike], ) -> StepOutputIndex: """Rebuild latest successful per-step output paths for visible flows.""" cache_key = id(binding) flow_signature = tuple( sorted((flow_name, tuple(card.operation_items)) for flow_name, card in flow_cards.items()) ) cached = self._step_output_cache.get(cache_key) if cached is None or cached[0] != flow_signature: refreshed = self.runtime_history_service.rebuild_step_outputs( binding.runtime_cache_ledger, flow_cards, ) last_step_run_id = refreshed.last_step_run_id index = refreshed.index else: last_seen_id = cached[1] current_index = cached[2] refreshed = self.runtime_history_service.refresh_step_outputs( binding.runtime_cache_ledger, flow_cards, current_index=current_index, last_seen_step_run_id=last_seen_id, ) last_step_run_id = refreshed.last_step_run_id index = refreshed.index self._step_output_cache[cache_key] = (flow_signature, last_step_run_id, index) return index
[docs] def error_text_for_entry( self, binding: WorkspaceRuntimeBinding, run_group: FlowRunState, entry: FlowLogEntry, ) -> tuple[str, str | None]: """Return one user-facing error title and persisted error text.""" return self.runtime_history_service.error_text_for_entry(binding.runtime_cache_ledger, run_group, entry)
[docs] def recent_run_count(self, binding: WorkspaceRuntimeBinding, *, days: int) -> int: """Return the number of persisted runs started in the recent window.""" cutoff = datetime.now(UTC) - timedelta(days=days) count = 0 try: runs = binding.runtime_cache_ledger.runs.list() except Exception: return 0 for run in runs: started_at = parse_utc_text(run.started_at_utc) if started_at is not None and started_at >= cutoff: count += 1 return count
# Names exported by ``from ... import *``; the null ledger/repository helpers stay private.
__all__ = ["WorkspaceRuntimeBinding", "WorkspaceRuntimeBindingService"]