"""Flow catalog loading services."""
from __future__ import annotations
from collections.abc import Callable
from pathlib import Path
from data_engine.core.flow import Flow
from data_engine.core.helpers import _title_case_words
from data_engine.domain import FlowCatalogEntry, default_flow_state, flow_category
from data_engine.core.model import FlowValidationError
from data_engine.flow_modules.flow_module_loader import FlowModuleDefinition, discover_flow_module_definitions
from data_engine.platform.paths import path_display
def _flow_paths(flow: Flow) -> tuple[str, str]:
trigger = flow.trigger
source = getattr(trigger, "source", None) if trigger is not None else None
target = getattr(flow.mirror_spec, "root", None)
return path_display(source), path_display(target)
def _flow_interval(flow: Flow) -> str:
trigger = flow.trigger
if trigger is None:
return "-"
if getattr(trigger, "interval", None) is not None:
return str(trigger.interval)
times = getattr(trigger, "times", ())
if times:
return ", ".join(str(value) for value in times)
if getattr(trigger, "time", None) is not None:
return str(trigger.time)
return "-"
def _flow_settle(flow: Flow) -> str:
trigger = flow.trigger
if trigger is None or getattr(trigger, "mode", None) != "poll":
return "-"
return str(getattr(trigger, "settle", 1))
def _flow_parallelism(flow: Flow) -> str:
trigger = flow.trigger
if trigger is None:
return "1"
return str(max(int(getattr(trigger, "max_parallel", 1)), 1))
[docs]
def flow_catalog_entry_from_flow(flow: Flow, *, description: str | None) -> FlowCatalogEntry:
source_root, target_root = _flow_paths(flow)
operation_items = tuple(step.label for step in flow.steps)
operations = " -> ".join(operation_items) or "(no steps)"
mode = flow.mode
derived_title = flow.label or _title_case_words(flow.name or "", empty="Flow")
return FlowCatalogEntry(
name=flow.name,
group=flow.group,
title=derived_title,
description=description or "",
source_root=source_root,
target_root=target_root,
mode=mode,
interval=_flow_interval(flow),
settle=_flow_settle(flow),
parallelism=_flow_parallelism(flow),
operations=operations,
operation_items=operation_items,
state=default_flow_state(mode),
valid=True,
category=flow_category(mode),
)
def _invalid_entry(definition: FlowModuleDefinition, exc: Exception) -> FlowCatalogEntry:
return FlowCatalogEntry(
name=definition.name,
group=None,
title=_title_case_words(definition.name, empty="Flow"),
description=definition.description or "",
source_root="(not set)",
target_root="(not set)",
mode="manual",
interval="-",
settle="-",
parallelism="1",
operations="Unavailable",
operation_items=(),
state="invalid",
valid=False,
category="manual",
error=str(exc),
)
[docs]
class FlowCatalogService:
"""Own flow catalog loading through an explicit discovery dependency."""
def __init__(
self,
*,
discover_definitions_func: Callable[..., tuple[FlowModuleDefinition, ...]] = discover_flow_module_definitions,
) -> None:
self._discover_definitions = discover_definitions_func
[docs]
def load_entries(self, *, workspace_root: Path | None = None) -> tuple[FlowCatalogEntry, ...]:
"""Return discovered flow catalog entries for the requested workspace root."""
entries: list[FlowCatalogEntry] = []
definitions = self._discover_definitions(data_root=workspace_root)
if not definitions:
raise FlowValidationError("No flow modules discovered.")
for definition in definitions:
try:
entries.append(flow_catalog_entry_from_flow(definition.build(), description=definition.description))
except Exception as exc:
entries.append(_invalid_entry(definition, exc))
return tuple(sorted(entries, key=lambda entry: entry.name))
__all__ = ["FlowCatalogService", "flow_catalog_entry_from_flow"]