# Module: data_engine.runtime.execution.single

"""Single-runtime orchestration for authored flows."""

from __future__ import annotations

from concurrent.futures import FIRST_COMPLETED, Future, ThreadPoolExecutor, as_completed, wait
from pathlib import Path
import threading
from dataclasses import dataclass
from typing import TYPE_CHECKING, Callable

from data_engine.core.model import FlowStoppedError, FlowValidationError
from data_engine.core.primitives import FlowContext, WatchSpec
from data_engine.runtime.result_cleanup import release_context_values
from data_engine.runtime.execution.continuous import ContinuousRuntimeLoop
from data_engine.runtime.execution.context import QueuedRunJob, RuntimeContextBuilder
from data_engine.runtime.execution.logging import RuntimeLogEmitter, acquire_queued_runtime_log_sink
from data_engine.runtime.execution.polling import RuntimePollingSupport
from data_engine.runtime.execution.runner import FlowRunExecutionPorts, FlowRunExecutor
from data_engine.runtime.file_watch import PollingWatcher
from data_engine.runtime.runtime_db import RuntimeCacheLedger
from data_engine.runtime.stop import RuntimeStopController
from data_engine.services.runtime_ports import RuntimeCacheStore
from data_engine.platform.instrumentation import append_timing_line

if TYPE_CHECKING:
    from data_engine.core.flow import Flow as CoreFlow


def _open_default_runtime_cache_ledger() -> RuntimeCacheStore:
    """Open the default runtime ledger used by authored flow execution.

    Execution writes go straight to the runtime cache ledger instead of the
    cached Runtime IO wrapper. The IO layer funnels every write through a
    single process-wide queue — fine for operator/UI reads, but it can starve
    unrelated flows when automated runs execute at the same time.
    """
    return RuntimeCacheLedger.open_default()


@dataclass(frozen=True)
class RuntimeCacheLedgerService:
    """Encapsulate how authored flow execution obtains its runtime ledger."""

    # Zero-argument factory producing a fresh runtime ledger on each call.
    open_runtime_cache_ledger_func: Callable[[], RuntimeCacheStore]

    def open_runtime_cache_ledger(self) -> RuntimeCacheStore:
        """Invoke the configured factory and return the resulting ledger."""
        factory = self.open_runtime_cache_ledger_func
        return factory()


def default_runtime_cache_ledger_service() -> RuntimeCacheLedgerService:
    """Return a ledger service wired to this module's default opener."""
    return RuntimeCacheLedgerService(
        open_runtime_cache_ledger_func=_open_default_runtime_cache_ledger,
    )


class FlowRuntime:
    """Sequential runtime that executes one or more configured flows."""

    def __init__(
        self,
        flows: tuple["CoreFlow", ...],
        *,
        continuous: bool,
        runtime_stop_event: threading.Event | None = None,
        flow_stop_event: threading.Event | None = None,
        status_callback: Callable[[str], None] | None = None,
        runtime_ledger: RuntimeCacheStore | None = None,
        runtime_ledger_service: RuntimeCacheLedgerService | None = None,
        runtime_ledger_factory: Callable[[], RuntimeCacheStore] | None = None,
        run_stop_controller: RuntimeStopController | None = None,
        workspace_id: str | None = None,
    ) -> None:
        # Flows are frozen into a tuple so later mutation of the caller's
        # sequence cannot change what this runtime executes.
        self.flows = tuple(flows)
        self.continuous = continuous
        # runtime_stop_event stops the whole runtime; flow_stop_event stops
        # the currently running flow (see check_run / _consume_completed_future).
        self.runtime_stop_event = runtime_stop_event
        self.flow_stop_event = flow_stop_event
        self.run_stop_controller = run_stop_controller or RuntimeStopController()
        self.status_callback = status_callback
        # Ledger resolution precedence: explicit factory > injected service >
        # module default service. An explicitly injected ledger wins over all.
        runtime_ledger_service = runtime_ledger_service or default_runtime_cache_ledger_service()
        self._runtime_ledger_factory = runtime_ledger_factory or runtime_ledger_service.open_runtime_cache_ledger
        # Only close the ledger on shutdown when this runtime opened it itself.
        self._owns_runtime_ledger = runtime_ledger is None
        self.runtime_ledger = runtime_ledger or self._runtime_ledger_factory()
        # Debug artifacts and the timing log live next to the ledger's db file,
        # when the ledger exposes a db_path at all (duck-typed via getattr).
        runtime_db_path = getattr(self.runtime_ledger, "db_path", None)
        debug_root = Path(runtime_db_path).expanduser().resolve().parent / "debug_artifacts" if runtime_db_path is not None else None
        self._timing_log_path = Path(runtime_db_path).expanduser().resolve().parent / "daemon_timing.log" if runtime_db_path is not None else None
        self.context_builder = RuntimeContextBuilder(debug_root=debug_root, workspace_id=workspace_id)
        # Log writes go through a shared queued sink; closed in
        # _close_runtime_resources to drain pending writes.
        self._queued_log_sink = acquire_queued_runtime_log_sink(self.runtime_ledger.logs)
        self.log_emitter = RuntimeLogEmitter(self._queued_log_sink, workspace_id=workspace_id)
        self.polling = RuntimePollingSupport(self.runtime_ledger.source_signatures)
        # The executor gets `self` as stop_controller: this class implements
        # register_run / unregister_run / check_run below.
        self.run_executor = FlowRunExecutor(
            FlowRunExecutionPorts(
                context_builder=self.context_builder,
                polling=self.polling,
                state_writer=self.runtime_ledger.execution_state,
                log_emitter=self.log_emitter,
                stop_controller=self,
                timing_log_path=self._timing_log_path,
                execution_mode="continuous" if continuous else "oneshot",
            )
        )
        self.continuous_loop = ContinuousRuntimeLoop(self)
[docs] def run(self) -> list[FlowContext]: try: self._validate() if not self.continuous or all(flow.mode == "manual" for flow in self.flows): return self._run_once_all() return self.continuous_loop.run() finally: self._close_runtime_resources()
[docs] def run_and_discard(self) -> None: """Run one-shot flows while releasing completed contexts immediately.""" try: self._validate() if self.continuous and not all(flow.mode == "manual" for flow in self.flows): raise FlowValidationError("run_and_discard() is only available for one-shot executions.") self._run_once_all(collect_results=False) finally: self._close_runtime_resources()
    def preview(self, *, use: str | None = None):
        """Run exactly one flow for notebook-style inspection and return one object.

        ``use`` selects which saved object from the completed context to
        return; ``None`` or ``"current"`` returns the context's current value.
        Raises FlowValidationError when the runtime holds more or fewer than
        one flow, no startup source can be determined, or ``use`` names an
        object that was never saved.
        """
        try:
            self._validate()
            if len(self.flows) != 1:
                raise FlowValidationError("preview() requires exactly one flow.")
            flow = self.flows[0]
            startup_sources = self.polling.startup_sources(flow)
            if not startup_sources:
                raise FlowValidationError("preview() could not determine a startup source.")
            # Preview only ever runs the first startup source.
            context = self.run_executor.preview_one(flow, startup_sources[0], use=use)
            if use is None or use == "current":
                return context.current
            if use not in context.objects:
                raise FlowValidationError(f"preview() could not find saved object {use!r}.")
            return context.objects[use]
        finally:
            self._close_runtime_resources()
[docs] def run_source(self, flow: "CoreFlow", source_path: str | Path) -> FlowContext: """Run one flow for a specific source path.""" try: self._validate() return self.run_executor.run_one(flow, Path(source_path)) finally: self._close_runtime_resources()
[docs] def run_batch(self, flow: "CoreFlow") -> FlowContext: """Run one flow once in batch mode using the configured source root.""" try: self._validate() return self.run_executor.run_one( flow, None, batch_signatures=self.polling.stale_batch_poll_signatures(flow), ) finally: self._close_runtime_resources()
    def _close_runtime_resources(self) -> None:
        """Drain queued log writes and close the runtime ledger when owned by this runtime."""
        # Close the log sink first so pending log writes flush before the
        # ledger connection goes away.
        self._queued_log_sink.close()
        # Ledger implementations may expose per-thread connection cleanup;
        # call it when present (duck-typed via getattr).
        close_current_thread_connection = getattr(self.runtime_ledger, "close_current_thread_connection", None)
        if callable(close_current_thread_connection):
            close_current_thread_connection()
        # Injected ledgers belong to the caller — never close those.
        if not self._owns_runtime_ledger:
            return
        self.runtime_ledger.close()

    def _validate(self) -> None:
        """Reject configurations with unnamed, duplicated, or step-less flows."""
        names = [flow.name for flow in self.flows]
        if any(name is None or not str(name).strip() for name in names):
            raise FlowValidationError("Flow names must be set before execution.")
        if len(set(names)) != len(names):
            raise FlowValidationError("Flow names must be unique within one runtime.")
        for flow in self.flows:
            if not flow.steps:
                raise FlowValidationError(f"Flow {flow.name!r} must define at least one step.")

    def _run_once_all(self, *, collect_results: bool = True) -> list[FlowContext]:
        """Run every flow once over its startup sources.

        Returns collected contexts when collect_results is True; otherwise
        results are released immediately and an empty list is returned.
        """
        results: list[FlowContext] = []
        for flow in self.flows:
            # Honor a runtime-wide stop between flows.
            if self.runtime_stop_event is not None and self.runtime_stop_event.is_set():
                break
            jobs: list[QueuedRunJob] = []
            for source_path in self.polling.startup_sources(flow):
                batch_signatures = ()
                trigger = flow.trigger
                # A directory-backed batch poll trigger with no concrete
                # source path runs against the stale batch signatures instead.
                if (
                    source_path is None
                    and isinstance(trigger, WatchSpec)
                    and trigger.mode == "poll"
                    and trigger.run_as == "batch"
                    and trigger.source is not None
                    and trigger.source.is_dir()
                ):
                    batch_signatures = self.polling.stale_batch_poll_signatures(flow)
                jobs.append(QueuedRunJob(flow=flow, source_path=source_path, batch_signatures=batch_signatures))
            batch_results = self._run_jobs(jobs, collect_results=collect_results)
            if collect_results:
                results.extend(batch_results)
        return results
[docs] def max_parallel_for_flow(self, flow: "CoreFlow") -> int: """Return the allowed per-flow source concurrency for one flow.""" trigger = flow.trigger if not isinstance(trigger, WatchSpec): return 1 if trigger.run_as != "individual": return 1 return max(int(trigger.max_parallel), 1)
    def _run_jobs(self, jobs: list[QueuedRunJob], *, collect_results: bool = True) -> list[FlowContext]:
        """Run a batch of jobs, sequentially or with bounded parallelism.

        All jobs in one batch belong to one flow, so the first job's flow
        decides the concurrency limit. With collect_results False, completed
        contexts are released immediately and [] is returned.
        """
        if not jobs:
            return []
        max_parallel = self.max_parallel_for_flow(jobs[0].flow)
        # Sequential path: one job at a time, stopping between jobs when the
        # runtime-wide stop event fires.
        if max_parallel <= 1 or len(jobs) <= 1:
            results: list[FlowContext] = []
            for job in jobs:
                if self.runtime_stop_event is not None and self.runtime_stop_event.is_set():
                    break
                result = self.run_executor.run_one(
                    job.flow,
                    job.source_path,
                    batch_signatures=job.batch_signatures,
                )
                if collect_results:
                    results.append(result)
                else:
                    self._release_completed_context(result)
            return results
        # Parallel discard path: keep at most max_parallel jobs in flight,
        # release each context as soon as its future completes.
        if not collect_results:
            with ThreadPoolExecutor(max_workers=min(max_parallel, len(jobs))) as executor:
                future_to_job: dict[Future[FlowContext], QueuedRunJob] = {}
                job_iter = iter(jobs)

                def _submit_next_job() -> bool:
                    # Submission stops once the stop event fires or jobs run out.
                    if self.runtime_stop_event is not None and self.runtime_stop_event.is_set():
                        return False
                    try:
                        job = next(job_iter)
                    except StopIteration:
                        return False
                    future = executor.submit(self._execute_job_in_thread, job)
                    future_to_job[future] = job
                    return True

                # Prime the pool up to the concurrency limit.
                for _ in range(min(max_parallel, len(jobs))):
                    if not _submit_next_job():
                        break
                try:
                    while future_to_job:
                        # Wait for any one future; as_completed yields done
                        # futures first, so next() returns without re-waiting.
                        future = next(as_completed(tuple(future_to_job)))
                        future_to_job.pop(future, None)
                        result = future.result()
                        self._release_completed_context(result)
                        _submit_next_job()
                except Exception:
                    # Best-effort cancel of not-yet-started jobs, then re-raise.
                    for future in future_to_job:
                        future.cancel()
                    raise
            return []
        # Parallel collect path: results are re-ordered by submission index so
        # callers see them in job order regardless of completion order.
        results_by_index: dict[int, FlowContext] = {}
        job_iter = iter(enumerate(jobs))
        with ThreadPoolExecutor(max_workers=min(max_parallel, len(jobs))) as executor:
            future_to_index: dict[Future[FlowContext], int] = {}

            def _submit_next_job() -> bool:
                if self.runtime_stop_event is not None and self.runtime_stop_event.is_set():
                    return False
                try:
                    index, job = next(job_iter)
                except StopIteration:
                    return False
                future = executor.submit(self._execute_job_in_thread, job)
                future_to_index[future] = index
                return True

            for _ in range(min(max_parallel, len(jobs))):
                if not _submit_next_job():
                    break
            try:
                while future_to_index:
                    future = next(as_completed(tuple(future_to_index)))
                    index = future_to_index.pop(future)
                    results_by_index[index] = future.result()
                    _submit_next_job()
            except Exception:
                for future in future_to_index:
                    future.cancel()
                raise
        # Indices are submitted and drained in order, so they are contiguous
        # from 0 even when a stop event cut submission short.
        return [results_by_index[index] for index in range(len(results_by_index))]
    def dispatch_queued_jobs(
        self,
        queue,
        queued_keys: set[tuple[str, str | None]],
        pending_futures: dict[Future[FlowContext], tuple[QueuedRunJob, int]],
        executor: ThreadPoolExecutor,
        *,
        results: list[FlowContext] | None,
    ) -> None:
        """Submit queued source jobs up to each flow's bounded concurrency and drain completions.

        ``queue`` is deque-like (popleft/append); jobs that would exceed their
        flow's limit are cycled back to the end of the queue for a later pass.
        """
        # Drain first so finished futures free concurrency slots before we
        # count in-flight jobs below.
        self._drain_completed_jobs(pending_futures, results=results)
        if self.runtime_stop_event is not None and self.runtime_stop_event.is_set():
            return
        if queue:
            # Take one snapshot of the length: re-queued jobs appended below
            # must not be re-examined in this same pass.
            queue_length = len(queue)
            for _ in range(queue_length):
                job = queue.popleft()
                key = self.polling.job_key(job.flow, job.source_path)
                flow_name = job.flow.name
                active_count = sum(1 for pending_job, _ in pending_futures.values() if pending_job.flow.name == flow_name)
                if active_count >= self.max_parallel_for_flow(job.flow):
                    # Flow is at its concurrency cap — defer this job.
                    queue.append(job)
                    continue
                queued_keys.discard(key)
                future = executor.submit(self._execute_job_in_thread, job)
                # The stored index approximates result ordering across
                # already-collected results plus in-flight futures.
                results_count = len(results) if results is not None else 0
                pending_futures[future] = (job, results_count + len(pending_futures))
        # Final drain catches futures that completed while we were submitting.
        self._drain_completed_jobs(pending_futures, results=results)
def _execute_job_in_thread(self, job: QueuedRunJob) -> FlowContext: """Execute one queued job and clean thread-local runtime resources after completion.""" try: return self.run_executor.run_one( job.flow, job.source_path, batch_signatures=job.batch_signatures, ) finally: close_current_thread_connection = getattr(self.runtime_ledger, "close_current_thread_connection", None) if callable(close_current_thread_connection): close_current_thread_connection()
[docs] def wait_for_dispatched_jobs( self, pending_futures: dict[Future[FlowContext], tuple[QueuedRunJob, int]], *, results: list[FlowContext] | None, ) -> None: """Wait for all pending queued jobs to complete.""" while pending_futures: done, _ = wait(tuple(pending_futures), return_when=FIRST_COMPLETED) for future in done: self._consume_completed_future(future, pending_futures, results=results)
    def _drain_completed_jobs(
        self,
        pending_futures: dict[Future[FlowContext], tuple[QueuedRunJob, int]],
        *,
        results: list[FlowContext] | None,
    ) -> None:
        """Consume every already-finished future without blocking on the rest."""
        # Snapshot first: _consume_completed_future mutates pending_futures.
        done = [future for future in pending_futures if future.done()]
        for future in done:
            self._consume_completed_future(future, pending_futures, results=results)

    def _consume_completed_future(
        self,
        future: Future[FlowContext],
        pending_futures: dict[Future[FlowContext], tuple[QueuedRunJob, int]],
        *,
        results: list[FlowContext] | None,
    ) -> None:
        """Collect or release one finished future's context, absorbing failures."""
        pending_futures.pop(future, None)
        try:
            result = future.result()
            if results is not None:
                results.append(result)
            else:
                self._release_completed_context(result)
        except FlowStoppedError:
            # An operator stop ended this run; reset the flow stop event so
            # subsequent runs are not immediately stopped too.
            if self.flow_stop_event is not None:
                self.flow_stop_event.clear()
        except Exception:
            # NOTE(review): run failures are swallowed here — presumably the
            # executor already logged them via the log emitter; confirm.
            return

    def _release_completed_context(self, context: FlowContext) -> None:
        """Drop bulky run-owned references once a completed context is no longer needed."""
        release_context_values(context)

    # The helpers below are thin delegation shims kept for internal callers;
    # the real implementations live on the executor and polling support.
    def _preview_one(self, flow: "CoreFlow", source_path: "Path | None", *, use: str | None) -> FlowContext:
        return self.run_executor.preview_one(flow, source_path, use=use)

    def _make_watcher(self, trigger: WatchSpec) -> PollingWatcher:
        return self.polling.make_watcher(trigger)

    def _startup_sources(self, flow: "CoreFlow", *, allow_missing: bool = False):
        return self.polling.startup_sources(flow, allow_missing=allow_missing)

    def _stale_poll_sources(self, flow: "CoreFlow"):
        return self.polling.stale_poll_sources(flow)

    def _stale_batch_poll_signatures(self, flow: "CoreFlow"):
        return self.polling.stale_batch_poll_signatures(flow)

    def _is_poll_source_stale(self, flow: "CoreFlow", source_path: "Path | None") -> bool:
        return self.polling.is_poll_source_stale(flow, source_path)

    def _poll_source_signature(self, flow: "CoreFlow", source_path: "Path | None"):
        return self.polling.poll_source_signature(flow, source_path)

    def _normalized_source_path(self, source_path: "Path | None"):
        return self.polling.normalized_source_path(source_path)
[docs] def register_run(self, run_id: str) -> None: """Mark one run id as active.""" self.run_stop_controller.register_run(run_id)
[docs] def unregister_run(self, run_id: str) -> None: """Clear active and requested state for one completed run id.""" self.run_stop_controller.unregister_run(run_id)
[docs] def check_run(self, run_id: str | None) -> None: """Raise when runtime-wide or run-id stop has been requested.""" if self.flow_stop_event is not None and self.flow_stop_event.is_set(): append_timing_line( self._timing_log_path, scope="runtime.step", event="stop_check_flow_event", phase="mark", fields={"run_id": run_id, "execution_mode": "continuous" if self.continuous else "oneshot"}, ) raise FlowStoppedError("Flow stop requested by operator.") self.run_stop_controller.check_run(run_id)
def _emit_status(self, message: str) -> None: if self.status_callback is not None: self.status_callback(message)
# Public API of this module.
__all__ = ["FlowRuntime", "RuntimeCacheLedgerService", "default_runtime_cache_ledger_service"]