"""Executable flow loading services."""
from __future__ import annotations
from collections.abc import Callable
from pathlib import Path
from typing import TYPE_CHECKING
from data_engine.flow_modules.flow_module_loader import discover_flow_module_definitions, load_flow_module_definition
if TYPE_CHECKING:
from data_engine.core.flow import Flow as CoreFlow
def _default_load_flow(name: str, *, data_root: Path | None = None) -> "CoreFlow":
    """Load the flow module definition called ``name`` and build its flow.

    ``data_root`` is forwarded unchanged to ``load_flow_module_definition``.
    """
    flow_definition = load_flow_module_definition(name, data_root=data_root)
    return flow_definition.build()
def _default_discover_flows(*, data_root: Path | None = None) -> tuple["CoreFlow", ...]:
    """Build one flow per discovered flow module definition.

    ``data_root`` is forwarded unchanged to
    ``discover_flow_module_definitions``; the returned tuple keeps the order
    in which that call yields its definitions.
    """
    built_flows = []
    for flow_definition in discover_flow_module_definitions(data_root=data_root):
        built_flows.append(flow_definition.build())
    return tuple(built_flows)
class FlowExecutionService:
    """Own executable flow loading through an explicit loader dependency.

    The service contains no loading logic of its own: each public method
    forwards to one of the two callables supplied at construction time,
    passing the public ``workspace_root`` keyword on as the ``data_root``
    keyword those callables receive.
    """

    def __init__(
        self,
        *,
        load_flow_func: Callable[..., "CoreFlow"] = _default_load_flow,
        discover_flows_func: Callable[..., tuple["CoreFlow", ...]] = _default_discover_flows,
    ) -> None:
        """Store the loader callables used by the public methods.

        Args:
            load_flow_func: Invoked as ``load_flow_func(name, data_root=...)``
                to produce a single flow.
            discover_flows_func: Invoked as
                ``discover_flows_func(data_root=...)`` to produce every flow.
        """
        self._load_flow = load_flow_func
        self._discover_flows = discover_flows_func

    def load_flow(self, name: str, *, workspace_root: Path | None = None) -> "CoreFlow":
        """Return one executable flow definition by name.

        Args:
            name: Flow name handed to the configured loader.
            workspace_root: Forwarded to the loader as ``data_root``.
        """
        return self._load_flow(name, data_root=workspace_root)

    def load_flows(self, names: tuple[str, ...], *, workspace_root: Path | None = None) -> tuple["CoreFlow", ...]:
        """Return executable flow definitions for the requested names.

        Each name is loaded through :meth:`load_flow` with the same
        ``workspace_root``; the result preserves the order of ``names`` and is
        an empty tuple when ``names`` is empty.
        """
        return tuple(self.load_flow(name, workspace_root=workspace_root) for name in names)

    def discover_flows(self, *, workspace_root: Path | None = None) -> tuple["CoreFlow", ...]:
        """Return all executable flow definitions for the requested workspace root.

        Args:
            workspace_root: Forwarded to the discovery callable as
                ``data_root``.
        """
        return self._discover_flows(data_root=workspace_root)
# Names exported by ``from <this module> import *``; the underscore-prefixed
# default loader helpers are intentionally not listed.
__all__ = ["FlowExecutionService"]