Source code for data_engine.hosts.scheduler

"""APScheduler-backed host for scheduled flow execution."""

from __future__ import annotations

from dataclasses import dataclass
from threading import Event, Lock
from typing import TYPE_CHECKING, Protocol

from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.cron import CronTrigger
from apscheduler.triggers.interval import IntervalTrigger

from data_engine.core.primitives import WatchSpec
from data_engine.runtime.engine import RuntimeEngine

if TYPE_CHECKING:
    from data_engine.core.flow import Flow


class SchedulerPort(Protocol):
    """Small scheduler surface used by the scheduler host.

    Any object offering these four methods can stand in for the scheduler
    that the host drives.
    """

    def add_job(
        self,
        func,
        *,
        trigger,
        id: str,
        replace_existing: bool = False,
        max_instances: int = 1,
        args: tuple[object, ...] | None = None,
    ):
        """Add or replace one scheduled job.

        Args:
            func: Callable invoked each time the trigger fires.
            trigger: Trigger object deciding when the job runs.
            id: Identifier of the job within the scheduler.
            replace_existing: Whether a job already registered under ``id``
                is replaced.
            max_instances: Maximum number of concurrently running instances
                of the job.
            args: Positional arguments passed to ``func``. The scheduler
                host supplies the flow to run through this parameter, so an
                implementation has to accept it.
        """

    def remove_job(self, job_id: str) -> None:
        """Remove one scheduled job by id."""

    def start(self) -> None:
        """Start the scheduler."""

    def shutdown(self, wait: bool = True) -> None:
        """Stop the scheduler."""
@dataclass(frozen=True)
class ScheduledFlowJob:
    """Immutable record describing one scheduler job owned by the host."""

    # Identifier under which the job is registered with the scheduler.
    job_id: str
    # Name of the flow that the job runs.
    flow_name: str
    # Kind of trigger behind the job; the host emits "interval" or "daily".
    trigger_kind: str
class SchedulerHost:
    """Own APScheduler timing while delegating flow meaning to the runtime engine.

    The host turns schedule-mode flow triggers into scheduler jobs and
    remembers the ids of the jobs it registered, so that a later rebuild
    removes those jobs before adding the new set.
    """

    def __init__(
        self,
        *,
        runtime_engine: RuntimeEngine | None = None,
        scheduler: SchedulerPort | None = None,
        job_id_prefix: str = "data-engine:schedule:",
    ) -> None:
        """Create a host around a runtime engine and a scheduler.

        Args:
            runtime_engine: Engine used to run flows; a new ``RuntimeEngine``
                is created when omitted.
            scheduler: Scheduler that owns the timing; a new
                ``BackgroundScheduler`` is created when omitted.
            job_id_prefix: Text placed in front of every job id the host
                generates.
        """
        # Test against None rather than truthiness so that a supplied
        # collaborator which happens to be falsy is not silently replaced.
        self.runtime_engine = RuntimeEngine() if runtime_engine is None else runtime_engine
        self.scheduler = BackgroundScheduler() if scheduler is None else scheduler
        self.job_id_prefix = job_id_prefix
        self._lock = Lock()
        self._job_ids: set[str] = set()

    def rebuild_jobs(self, flows: tuple["Flow", ...]) -> tuple[ScheduledFlowJob, ...]:
        """Replace scheduler jobs from discovered scheduled flows.

        Jobs registered by an earlier rebuild are removed first, then one or
        more jobs are added for every flow that carries a schedule trigger.

        Args:
            flows: Flows to inspect; flows without a schedule trigger are
                skipped.

        Returns:
            One description per job that was added, in flow order.
        """
        with self._lock:
            self._remove_known_jobs()
            jobs: list[ScheduledFlowJob] = []
            for flow in flows:
                # Job ids are recorded as each job is added (see
                # ``_schedule_job``), so a failure part-way through still
                # leaves every registered job tracked for the next rebuild.
                jobs.extend(self._add_flow_jobs(flow))
            self._job_ids.update(job.job_id for job in jobs)
            return tuple(jobs)

    def start(self) -> None:
        """Start the underlying scheduler."""
        self.scheduler.start()

    def shutdown(self, *, wait: bool = True) -> None:
        """Stop the underlying scheduler.

        Args:
            wait: Forwarded unchanged to the scheduler's ``shutdown``.
        """
        self.scheduler.shutdown(wait=wait)

    def run_until_stopped(self, flows: tuple["Flow", ...], stop_event: Event) -> tuple[ScheduledFlowJob, ...]:
        """Run scheduled flow jobs until ``stop_event`` is set.

        Args:
            flows: Flows to schedule.
            stop_event: Event the caller sets to end the run.

        Returns:
            The jobs that were scheduled. When no flow produces a job the
            scheduler is never started and the empty tuple is returned
            immediately.
        """
        jobs = self.rebuild_jobs(flows)
        if not jobs:
            return jobs
        # Start outside the ``try``: if startup fails the scheduler is not
        # running, and shutting it down from ``finally`` could raise a second
        # error that hides the real cause.
        self.start()
        try:
            stop_event.wait()
        finally:
            self.shutdown()
        return jobs

    def _remove_known_jobs(self) -> None:
        """Remove every job this host registered, ignoring removal errors."""
        for job_id in tuple(self._job_ids):
            try:
                self.scheduler.remove_job(job_id)
            except Exception:
                # Deliberate best effort: a job that cannot be removed (for
                # instance because it is already gone) must not block the
                # rebuild.
                continue
        self._job_ids.clear()

    def _add_flow_jobs(self, flow: "Flow") -> tuple[ScheduledFlowJob, ...]:
        """Register the scheduler jobs for one flow and describe them.

        A flow whose trigger is not a schedule-mode ``WatchSpec`` yields no
        jobs. An interval trigger yields a single job; otherwise one daily
        job is created per ``(hour, minute)`` time slot.
        """
        trigger = flow.trigger
        if not isinstance(trigger, WatchSpec) or trigger.mode != "schedule":
            return ()
        if trigger.interval_seconds is not None:
            interval = IntervalTrigger(seconds=float(trigger.interval_seconds))
            return (self._schedule_job(flow, "interval", "interval", interval),)
        return tuple(
            self._schedule_job(
                flow,
                f"daily-{hour:02d}-{minute:02d}",
                "daily",
                CronTrigger(hour=hour, minute=minute),
            )
            for hour, minute in trigger.time_slots
        )

    def _schedule_job(self, flow: "Flow", suffix: str, trigger_kind: str, trigger: object) -> ScheduledFlowJob:
        """Register one job for ``flow`` and record its id as owned by the host."""
        job_id = self._job_id(flow, suffix)
        self.scheduler.add_job(
            self._run_flow,
            trigger=trigger,
            id=job_id,
            replace_existing=True,
            max_instances=1,
            args=(flow,),
        )
        # Track the id straight away so the job is removed by the next
        # rebuild even if a later registration raises.
        self._job_ids.add(job_id)
        return ScheduledFlowJob(job_id=job_id, flow_name=flow.name, trigger_kind=trigger_kind)

    def _run_flow(self, flow: "Flow") -> object:
        """Run ``flow`` once through the runtime engine and return its result."""
        return self.runtime_engine.run_once(flow)

    def _job_id(self, flow: "Flow", suffix: str) -> str:
        """Build the job id from the prefix, the flow name and ``suffix``."""
        return f"{self.job_id_prefix}{flow.name}:{suffix}"
# Names exported by a star-import of this module.
__all__ = ["ScheduledFlowJob", "SchedulerHost", "SchedulerPort"]