"""Pure flow definitions and fluent builder methods."""
from __future__ import annotations
from dataclasses import dataclass, replace
import inspect
from pathlib import Path
from typing import Callable
from data_engine.core.helpers import (
_callable_identifier,
_callable_name,
_normalize_extensions,
_normalize_watch_times,
_parse_duration,
_parse_schedule_at,
_resolve_flow_path,
_validate_label,
_validate_slot_name,
)
from data_engine.core.model import FlowValidationError
from data_engine.core.primitives import Batch, FlowContext, MirrorSpec, StepSpec, WatchSpec, collect_files
@dataclass(frozen=True)
class Flow:
    """Immutable fluent builder for generic runtime flows.

    ``Flow`` is the authoring object most flow modules return. Builder methods
    never mutate the existing instance; each method returns a new flow so
    chained definitions stay predictable and easy to review.

    Examples
    --------
    .. code-block:: python

        from data_engine import Flow

        flow = (
            Flow(group="Claims")
            .watch(mode="poll", source="incoming", interval="30s", extensions=[".xlsx"])
            .mirror(root="processed")
        )
        assert flow.mode == "poll"

    Attributes
    ----------
    group : str
        Display group used by the GUI, TUI, and runtime summaries.
    name : str | None
        Stable flow identifier. When omitted in a flow module, the module
        loader can derive it from the module name.
    label : str | None
        Optional human-readable display label.
    trigger : WatchSpec | None
        Runtime trigger configuration.
    mirror_spec : MirrorSpec | None
        Mirrored output path configuration.
    steps : tuple[StepSpec, ...]
        Ordered callable steps to run.

    Notes
    -----
    Builder methods return a new ``Flow`` instance, so calls can be chained.
    """

    group: str
    name: str | None = None
    label: str | None = None
    trigger: WatchSpec | None = None
    mirror_spec: MirrorSpec | None = None
    steps: tuple[StepSpec, ...] = ()
    # Internal: optional workspace root; leading underscore keeps it out of the public API.
    _workspace_root: Path | None = None

    def __post_init__(self) -> None:
        # Validate eagerly so a malformed flow fails at definition time rather
        # than when the runtime first touches it. Clones made via _clone() are
        # re-validated because dataclasses.replace() re-runs __post_init__.
        if self.name is not None and (not isinstance(self.name, str) or not self.name.strip()):
            raise FlowValidationError("Flow name must be a non-empty string when provided.")
        if self.label is not None and (not isinstance(self.label, str) or not self.label.strip()):
            raise FlowValidationError("Flow label must be a non-empty string when provided.")
        if not isinstance(self.group, str) or not self.group.strip():
            raise FlowValidationError("Flow group must be a non-empty string.")

    def _clone(self, **kwargs) -> "Flow":
        # replace() builds a new frozen instance, preserving immutability.
        return replace(self, **kwargs)

    def _append(self, step: StepSpec) -> "Flow":
        return self._clone(steps=(*self.steps, step))

    def watch(
        self,
        *,
        mode: str,
        run_as: str = "individual",
        max_parallel: int = 1,
        source: str | Path | None = None,
        interval: str | None = None,
        time: str | tuple[str, ...] | list[str] | set[str] | None = None,
        extensions: tuple[str, ...] | list[str] | set[str] | None = None,
        settle: int = 1,
    ) -> "Flow":
        """Configure how this flow is triggered.

        Use ``manual`` for explicit operator-driven runs, ``poll`` when source
        file changes should trigger work, and ``schedule`` when time should
        trigger work. ``run_as="individual"`` runs once per concrete source
        file; ``run_as="batch"`` runs once for the watched source root.

        Examples
        --------
        .. code-block:: python

            from data_engine import Flow

            manual_flow = Flow(group="Claims").watch(mode="manual")
            poll_flow = Flow(group="Claims").watch(mode="poll", source="incoming", interval="5s")
            schedule_flow = Flow(group="Claims").watch(mode="schedule", interval="15m")
            assert manual_flow.mode == "manual"
            assert poll_flow.mode == "poll"
            assert schedule_flow.mode == "schedule"

        Parameters
        ----------
        mode : str
            Trigger mode: ``"manual"``, ``"poll"``, or ``"schedule"``.
        run_as : str
            ``"individual"`` to run once per source file, or ``"batch"`` to run
            once for the full source set.
        max_parallel : int
            Maximum number of concurrent source-file runs for one flow when
            ``run_as="individual"``. Defaults to ``1``.
        source : str | Path | None
            File or directory watched by poll/schedule triggers.
        interval : str | None
            Duration string such as ``"10s"`` or ``"5m"`` for poll intervals or
            recurring schedules.
        time : str | tuple[str, ...] | list[str] | set[str] | None
            One or more ``HH:MM`` daily schedule times.
        extensions : tuple[str, ...] | list[str] | set[str] | None
            Optional file extensions used when discovering source files.
        settle : int
            Polling settle window in seconds before a changed file is queued.

        Returns
        -------
        Flow
            A new flow with the trigger configuration attached.

        Raises
        ------
        FlowValidationError
            If the trigger mode, source, schedule, or polling options are
            inconsistent.
        """
        normalized_mode = str(mode).strip().lower()
        if normalized_mode not in {"manual", "poll", "schedule"}:
            raise FlowValidationError("watch() mode must be one of 'manual', 'poll', or 'schedule'.")
        normalized_run_as = str(run_as).strip().lower()
        if normalized_run_as not in {"individual", "batch"}:
            raise FlowValidationError("watch() run_as must be either 'individual' or 'batch'.")
        if not isinstance(max_parallel, int) or max_parallel <= 0:
            raise FlowValidationError("watch() max_parallel must be an integer greater than or equal to one.")
        if not isinstance(settle, int) or settle < 0:
            raise FlowValidationError("watch() settle must be an integer greater than or equal to zero.")
        resolved_source = _resolve_flow_path(source) if source is not None else None
        normalized_extensions = _normalize_extensions(extensions)
        if normalized_mode == "manual":
            if interval is not None or time is not None:
                raise FlowValidationError("watch(mode='manual') does not accept interval= or time=.")
            # settle defaults to 1, so any other value means the caller passed settle=.
            if settle != 1:
                raise FlowValidationError("watch(mode='manual') does not accept settle=.")
            return self._clone(
                trigger=WatchSpec(
                    mode="manual",
                    run_as=normalized_run_as,
                    max_parallel=max_parallel,
                    source=resolved_source,
                    extensions=normalized_extensions,
                )
            )
        if normalized_mode == "poll":
            if resolved_source is None:
                raise FlowValidationError("watch(mode='poll') requires source=.")
            if interval is None:
                raise FlowValidationError("watch(mode='poll') requires interval=.")
            if time is not None:
                raise FlowValidationError("watch(mode='poll') does not accept time=.")
            return self._clone(
                trigger=WatchSpec(
                    mode="poll",
                    run_as=normalized_run_as,
                    max_parallel=max_parallel,
                    source=resolved_source,
                    interval=interval,
                    interval_seconds=_parse_duration(interval),
                    extensions=normalized_extensions,
                    settle=settle,
                )
            )
        # mode == "schedule": exactly one of interval= / time= must be given.
        if (interval is None) == (time is None):
            raise FlowValidationError("watch(mode='schedule') accepts exactly one of interval= or time=.")
        if settle != 1:
            raise FlowValidationError("watch(mode='schedule') does not accept settle=.")
        if interval is not None:
            return self._clone(
                trigger=WatchSpec(
                    mode="schedule",
                    run_as=normalized_run_as,
                    max_parallel=max_parallel,
                    source=resolved_source,
                    interval=interval,
                    interval_seconds=_parse_duration(interval),
                    extensions=normalized_extensions,
                )
            )
        assert time is not None  # narrowed by the exactly-one check above
        time_values = _normalize_watch_times(time)
        return self._clone(
            trigger=WatchSpec(
                mode="schedule",
                run_as=normalized_run_as,
                max_parallel=max_parallel,
                source=resolved_source,
                time=time_values[0] if len(time_values) == 1 else time_values,
                times=time_values,
                time_slots=tuple(_parse_schedule_at(value) for value in time_values),
                extensions=normalized_extensions,
            )
        )

    def mirror(self, *, root: str | Path) -> "Flow":
        """Bind a mirrored output namespace rooted at one directory.

        ``mirror`` only configures paths. It does not write files by itself; the
        runtime later exposes write-ready paths through ``context.mirror``.

        Examples
        --------
        .. code-block:: python

            from data_engine import Flow

            flow = Flow(group="Claims").mirror(root="processed")
            assert str(flow.mirror_spec.root).endswith("processed")

        Parameters
        ----------
        root : str | Path
            Directory used by ``context.mirror`` helpers when writing outputs.

        Returns
        -------
        Flow
            A new flow with mirror output helpers enabled.
        """
        return self._clone(mirror_spec=MirrorSpec(root=_resolve_flow_path(root)))

    def step(
        self,
        fn: Callable[[FlowContext], object],
        *,
        use: str | None = None,
        save_as: str | None = None,
        label: str | None = None,
    ) -> "Flow":
        """Append one callable step to the flow.

        ``step`` is the default workhorse for flow authoring. The callable
        receives one ``FlowContext`` and its return value becomes the next
        ``context.current``. Use ``save_as`` to keep an intermediate result under
        ``context.objects`` and ``use`` to load a saved object before a later
        step runs.

        Examples
        --------
        .. code-block:: python

            from data_engine import Flow

            def read_claims(context):
                return context.current

            flow = Flow(group="Claims").step(read_claims, save_as="raw_df")
            assert flow.steps[0].save_as == "raw_df"

        Parameters
        ----------
        fn : Callable[[FlowContext], object]
            Callable that accepts a single ``FlowContext`` and returns the next
            value for ``context.current``.
        use : str | None
            Optional named object slot to load into ``context.current`` before
            the step runs.
        save_as : str | None
            Optional named object slot to store the step result in.
        label : str | None
            Optional display label for logs and UI step summaries.

        Returns
        -------
        Flow
            A new flow with the step appended.

        Raises
        ------
        FlowValidationError
            If ``fn`` is not callable or does not accept exactly one argument.
        """
        if not callable(fn):
            raise FlowValidationError("step() fn must be callable")
        normalized_use = _validate_slot_name(method_name="step", slot_name="use", value=use)
        normalized_save_as = _validate_slot_name(method_name="step", slot_name="save_as", value=save_as)
        normalized_label = _validate_label(method_name="step", label=label)
        signature = inspect.signature(fn)
        if len(signature.parameters) != 1:
            raise FlowValidationError("step() callables must accept exactly one context parameter.")
        return self._append(
            StepSpec(
                fn=fn,
                use=normalized_use,
                save_as=normalized_save_as,
                label=normalized_label or _callable_name(fn),
                function_name=_callable_identifier(fn),
            )
        )

    def map(
        self,
        fn: Callable[..., object],
        *,
        use: str | None = None,
        save_as: str | None = None,
        label: str | None = None,
    ) -> "Flow":
        """Append a step that maps a callable over the current iterable value.

        ``map`` is best when the same callable should run once per collected
        item. The callable may accept either ``item`` or ``context, item`` and
        the mapped values are returned as a ``Batch``.

        Examples
        --------
        .. code-block:: python

            from data_engine import Flow

            def summarize(file_ref):
                return file_ref.name

            flow = Flow(group="Claims").collect(extensions=[".xlsx"]).map(fn=summarize)
            assert len(flow.steps) == 2

        Parameters
        ----------
        fn : Callable[..., object]
            Callable accepting either ``item`` or ``context, item``.
        use : str | None
            Optional named object slot to map instead of the current value.
        save_as : str | None
            Optional named object slot to store the mapped ``Batch`` result in.
        label : str | None
            Optional display label for logs and UI step summaries.

        Returns
        -------
        Flow
            A new flow with the mapping step appended.

        Raises
        ------
        FlowValidationError
            If ``fn`` is not callable, has an unsupported signature, or the
            mapped current value is not iterable.
        """
        if not callable(fn):
            raise FlowValidationError("map() fn must be callable")
        normalized_use = _validate_slot_name(method_name="map", slot_name="use", value=use)
        normalized_save_as = _validate_slot_name(method_name="map", slot_name="save_as", value=save_as)
        normalized_label = _validate_label(method_name="map", label=label)
        signature = inspect.signature(fn)
        parameter_count = len(signature.parameters)
        if parameter_count not in {1, 2}:
            raise FlowValidationError("map() callables must accept either (item) or (context, item).")

        def _run_each(context: FlowContext):
            current = context.current
            if isinstance(current, Batch):
                items = current.items
            elif current is None or isinstance(current, (str, bytes, dict)):
                # str/bytes/dict are iterable but almost never what authors
                # intend to map over, so they are rejected explicitly.
                raise FlowValidationError("map() requires an iterable current value.")
            else:
                try:
                    items = tuple(current)
                except TypeError as exc:
                    raise FlowValidationError("map() requires an iterable current value.") from exc
            if not items:
                raise FlowValidationError("map() requires at least one item.")
            if parameter_count == 1:
                return Batch(tuple(fn(item) for item in items))
            return Batch(tuple(fn(context, item) for item in items))

        return self._append(
            StepSpec(
                fn=_run_each,
                use=normalized_use,
                save_as=normalized_save_as,
                label=normalized_label or _callable_name(fn),
                function_name=_callable_identifier(fn),
            )
        )

    def collect(
        self,
        extensions: tuple[str, ...] | list[str] | set[str],
        *,
        root: str | Path | None = None,
        recursive: bool = False,
        use: str | None = None,
        save_as: str | None = None,
        label: str | None = None,
    ) -> "Flow":
        """Append a step that collects source files into a ``Batch``.

        If ``root`` is omitted, collection uses the active source root from the
        runtime context. This keeps scheduled or poll-driven batch flows concise
        because the watched source directory can also be the collection root.

        Examples
        --------
        .. code-block:: python

            from data_engine import Flow

            flow = Flow(group="Docs").collect(extensions=[".pdf"], recursive=True)
            assert flow.steps[0].label == "Collect Files"

        Parameters
        ----------
        extensions : tuple[str, ...] | list[str] | set[str]
            File extensions to include, such as ``(".xlsx", ".csv")``.
        root : str | Path | None
            Optional search root. Defaults to the active source root.
        recursive : bool
            Whether to search child directories.
        use : str | None
            Optional named object slot to load before collecting.
        save_as : str | None
            Optional named object slot to store the collected batch in.
        label : str | None
            Optional display label for logs and UI step summaries.

        Returns
        -------
        Flow
            A new flow with the collection step appended.
        """
        normalized_use = _validate_slot_name(method_name="collect", slot_name="use", value=use)
        normalized_save_as = _validate_slot_name(method_name="collect", slot_name="save_as", value=save_as)
        normalized_label = _validate_label(method_name="collect", label=label)
        return self.step(
            collect_files(extensions, root=root, recursive=recursive),
            use=normalized_use,
            save_as=normalized_save_as,
            label=normalized_label or "Collect Files",
        )

    def step_each(
        self,
        fn: Callable[..., object],
        *,
        use: str | None = None,
        save_as: str | None = None,
        label: str | None = None,
    ) -> "Flow":
        """Alias for ``map`` that reads naturally in step chains.

        Parameters
        ----------
        fn : Callable[..., object]
            Callable accepting either ``item`` or ``context, item``.
        use : str | None
            Optional named object slot to map instead of the current value.
        save_as : str | None
            Optional named object slot to store the mapped ``Batch`` result in.
        label : str | None
            Optional display label for logs and UI step summaries.

        Returns
        -------
        Flow
            A new flow with the per-item step appended.
        """
        return self.map(fn, use=use, save_as=save_as, label=label)

    @property
    def mode(self) -> str:
        """Return the trigger mode or ``manual`` for unconfigured flows."""
        if isinstance(self.trigger, WatchSpec):
            return self.trigger.mode
        return "manual"
# Public API of this module: only the Flow builder is exported.
__all__ = ["Flow"]