Source code for data_engine.runtime.stop

"""Run-id-aware runtime stop control."""

from __future__ import annotations

from threading import Lock

from data_engine.core.model import FlowStoppedError


[docs] class RuntimeStopController: """Track stop requests for specific active runtime run ids.""" def __init__(self) -> None: self._lock = Lock() self._requested_run_ids: set[str] = set() self._active_run_ids: set[str] = set()
[docs] def request_stop(self, run_id: str) -> None: """Request that one active or future run id stop.""" normalized = str(run_id).strip() if not normalized: raise ValueError("run_id must be non-empty.") with self._lock: self._requested_run_ids.add(normalized)
[docs] def register_run(self, run_id: str) -> None: """Mark one run id as active.""" with self._lock: self._active_run_ids.add(run_id)
[docs] def unregister_run(self, run_id: str) -> None: """Clear active and requested state for one completed run id.""" with self._lock: self._active_run_ids.discard(run_id) self._requested_run_ids.discard(run_id)
[docs] def check_run(self, run_id: str | None) -> None: """Raise when stop has been requested for ``run_id``.""" if run_id is None: return with self._lock: stop_requested = run_id in self._requested_run_ids if stop_requested: raise FlowStoppedError(f"Run stop requested by operator: {run_id}.")
[docs] def active_run_ids(self) -> tuple[str, ...]: """Return active run ids in stable order.""" with self._lock: return tuple(sorted(self._active_run_ids))
__all__ = ["RuntimeStopController"]