# Source code for data_engine.services.runtime_execution

"""Runtime execution services for flow runs and grouped engine runs."""

from __future__ import annotations

from threading import Event
from typing import TYPE_CHECKING, Callable

from data_engine.core.primitives import WatchSpec
from data_engine.hosts.scheduler import SchedulerHost
from data_engine.runtime.execution import FlowRuntime, GroupedFlowRuntime
from data_engine.runtime.engine import RuntimeEngine
from data_engine.runtime.stop import RuntimeStopController
from data_engine.services.runtime_ports import RuntimeCacheStore

if TYPE_CHECKING:
    from data_engine.core.flow import Flow as CoreFlow


class RuntimeExecutionService:
    """Own executable runtime construction for manual and grouped runs.

    Every public ``run_*`` entry point builds a fresh :class:`RuntimeEngine`
    via :meth:`_engine`, wired with the runtime types injected at construction
    time, then delegates to the matching engine method.
    """

    def __init__(
        self,
        *,
        flow_runtime_type: type[FlowRuntime] = FlowRuntime,
        grouped_runtime_type: type[GroupedFlowRuntime] = GroupedFlowRuntime,
        runtime_engine_type: type[RuntimeEngine] = RuntimeEngine,
        scheduler_host_factory: Callable[..., SchedulerHost] = SchedulerHost,
        run_stop_controller: RuntimeStopController | None = None,
    ) -> None:
        # Injected types/factories allow tests to substitute lightweight
        # doubles without patching the module.
        self._flow_runtime_type = flow_runtime_type
        self._grouped_runtime_type = grouped_runtime_type
        self._runtime_engine_type = runtime_engine_type
        self._scheduler_host_factory = scheduler_host_factory
        # A single stop controller is shared by every engine this service
        # builds; a fresh one is created when the caller supplies none.
        self._run_stop_controller = (
            RuntimeStopController() if not run_stop_controller else run_stop_controller
        )

    def _engine(
        self,
        *,
        runtime_ledger: RuntimeCacheStore | None = None,
        runtime_stop_event: Event | None = None,
        flow_stop_event: Event | None = None,
    ) -> RuntimeEngine:
        """Build a new engine carrying this service's runtime configuration."""
        engine_kwargs = dict(
            runtime_ledger=runtime_ledger,
            runtime_stop_event=runtime_stop_event,
            flow_stop_event=flow_stop_event,
            flow_runtime_type=self._flow_runtime_type,
            grouped_runtime_type=self._grouped_runtime_type,
            run_stop_controller=self._run_stop_controller,
        )
        return self._runtime_engine_type(**engine_kwargs)
[docs] def run_once( self, flow: "CoreFlow", *, runtime_ledger: RuntimeCacheStore | None = None, runtime_stop_event: Event | None = None, flow_stop_event: Event | None = None, ) -> object: """Run one flow as a one-shot execution.""" return self._engine( runtime_stop_event=runtime_stop_event, flow_stop_event=flow_stop_event, runtime_ledger=runtime_ledger, ).run_once(flow)
[docs] def run_source( self, flow: "CoreFlow", source_path: str, *, runtime_ledger: RuntimeCacheStore | None = None, runtime_stop_event: Event | None = None, flow_stop_event: Event | None = None, ) -> object: """Run one flow for a specific source path.""" return self._engine( runtime_stop_event=runtime_stop_event, flow_stop_event=flow_stop_event, runtime_ledger=runtime_ledger, ).run_source(flow, source_path)
[docs] def run_batch( self, flow: "CoreFlow", *, runtime_ledger: RuntimeCacheStore | None = None, runtime_stop_event: Event | None = None, flow_stop_event: Event | None = None, ) -> object: """Run one flow once in batch mode.""" return self._engine( runtime_stop_event=runtime_stop_event, flow_stop_event=flow_stop_event, runtime_ledger=runtime_ledger, ).run_batch(flow)
[docs] def preview( self, flow: "CoreFlow", *, use: str | None = None, runtime_ledger: RuntimeCacheStore | None = None, ) -> object: """Preview one flow through the one-shot runtime path.""" return self._engine( runtime_ledger=runtime_ledger, ).preview(flow, use=use)
[docs] def run_manual( self, flow: "CoreFlow", *, runtime_ledger: RuntimeCacheStore, runtime_stop_event: Event, flow_stop_event: Event | None = None, ) -> object: """Run one flow as a manual one-shot execution.""" return self.run_once( flow, runtime_ledger=runtime_ledger, runtime_stop_event=runtime_stop_event, flow_stop_event=flow_stop_event, )
[docs] def run_continuous( self, flow: "CoreFlow", *, runtime_ledger: RuntimeCacheStore | None = None, flow_stop_event: Event | None = None, ) -> object: """Run one flow continuously.""" return self._engine( flow_stop_event=flow_stop_event, runtime_ledger=runtime_ledger, ).run_continuous(flow)
[docs] def run_grouped( self, flows: tuple["CoreFlow", ...], *, runtime_ledger: RuntimeCacheStore, runtime_stop_event: Event, flow_stop_event: Event, ) -> object: """Run grouped automated flows continuously.""" return self._engine( runtime_stop_event=runtime_stop_event, flow_stop_event=flow_stop_event, runtime_ledger=runtime_ledger, ).run_grouped(flows, continuous=True)
    def run_automated(
        self,
        flows: tuple["CoreFlow", ...],
        *,
        runtime_ledger: RuntimeCacheStore | None = None,
        runtime_stop_event: Event,
        flow_stop_event: Event,
    ) -> object:
        """Run automated poll and schedule flows through separate host timing surfaces.

        Flows are partitioned by trigger mode: schedule-triggered flows are
        handed to a scheduler host whose jobs fire on their own timing, while
        the remaining (polling) flows run on a grouped continuous engine.
        Returns the grouped run's result, or ``[]`` when there are no polling
        flows (after ``runtime_stop_event`` is set).
        """
        polling_flows, scheduled_flows = self._split_automated_flows(flows)
        scheduler_host = None
        scheduler_jobs = ()
        scheduler_started = False
        if scheduled_flows:
            # NOTE: the scheduler's engine is built without runtime_stop_event;
            # scheduled jobs are wound down via scheduler_host.shutdown() below.
            scheduler_engine = self._engine(
                flow_stop_event=flow_stop_event,
                runtime_ledger=runtime_ledger,
            )
            scheduler_host = self._scheduler_host_factory(runtime_engine=scheduler_engine)
            scheduler_jobs = scheduler_host.rebuild_jobs(scheduled_flows)
        try:
            # Start the host only if rebuild_jobs produced at least one job.
            if scheduler_jobs and scheduler_host is not None:
                scheduler_host.start()
                scheduler_started = True
            if polling_flows:
                # Blocks until the grouped continuous run finishes or is stopped.
                return self._engine(
                    runtime_stop_event=runtime_stop_event,
                    flow_stop_event=flow_stop_event,
                    runtime_ledger=runtime_ledger,
                ).run_grouped(polling_flows, continuous=True)
            # No polling flows: park this thread until a stop is requested so
            # the scheduler host can keep firing in the background.
            runtime_stop_event.wait()
            return []
        finally:
            # Tear the host down even if the grouped run raised, but only if
            # it was actually started.
            if scheduler_started and scheduler_host is not None:
                scheduler_host.shutdown()
[docs] def run_grouped_continuous( self, flows: tuple["CoreFlow", ...], *, runtime_ledger: RuntimeCacheStore | None = None, runtime_stop_event: Event | None = None, flow_stop_event: Event | None = None, ) -> object: """Run grouped automated flows continuously with optional runtime controls.""" return self.run_automated( flows, runtime_stop_event=runtime_stop_event or Event(), flow_stop_event=flow_stop_event or Event(), runtime_ledger=runtime_ledger, )
[docs] def stop(self, run_id: str, *, flow_stop_event: Event | None = None) -> None: """Request that an active runtime stop a run by id.""" self._engine(flow_stop_event=flow_stop_event).stop(run_id)
def _split_automated_flows(self, flows: tuple["CoreFlow", ...]) -> tuple[tuple["CoreFlow", ...], tuple["CoreFlow", ...]]: polling_flows: list["CoreFlow"] = [] scheduled_flows: list["CoreFlow"] = [] for flow in flows: trigger = flow.trigger if isinstance(trigger, WatchSpec) and trigger.mode == "schedule": scheduled_flows.append(flow) else: polling_flows.append(flow) return tuple(polling_flows), tuple(scheduled_flows)
# Public API of this module: only the service class is exported.
__all__ = ["RuntimeExecutionService"]