Source code for data_engine.services.flow_execution

"""Executable flow loading services."""

from __future__ import annotations

from collections.abc import Callable
from pathlib import Path
from typing import TYPE_CHECKING

from data_engine.flow_modules.flow_module_loader import discover_flow_module_definitions, load_flow_module_definition

if TYPE_CHECKING:
    from data_engine.core.flow import Flow as CoreFlow


def _default_load_flow(name: str, *, data_root: Path | None = None) -> "CoreFlow":
    return load_flow_module_definition(name, data_root=data_root).build()


def _default_discover_flows(*, data_root: Path | None = None) -> tuple["CoreFlow", ...]:
    return tuple(definition.build() for definition in discover_flow_module_definitions(data_root=data_root))


[docs] class FlowExecutionService: """Own executable flow loading through an explicit loader dependency.""" def __init__( self, *, load_flow_func: Callable[..., "CoreFlow"] = _default_load_flow, discover_flows_func: Callable[..., tuple["CoreFlow", ...]] = _default_discover_flows, ) -> None: self._load_flow = load_flow_func self._discover_flows = discover_flows_func
[docs] def load_flow(self, name: str, *, workspace_root: Path | None = None) -> "CoreFlow": """Return one executable flow definition by name.""" return self._load_flow(name, data_root=workspace_root)
[docs] def load_flows(self, names: tuple[str, ...], *, workspace_root: Path | None = None) -> tuple["CoreFlow", ...]: """Return executable flow definitions for the requested names.""" return tuple(self.load_flow(name, workspace_root=workspace_root) for name in names)
[docs] def discover_flows(self, *, workspace_root: Path | None = None) -> tuple["CoreFlow", ...]: """Return all executable flow definitions for the requested workspace root.""" return self._discover_flows(data_root=workspace_root)
__all__ = ["FlowExecutionService"]