Flow Methods
This page covers the small author-facing Flow surface.
The method-level reference now lives in the Flow docstrings and is rendered in
the API reference. Keep examples that describe exact parameters, return values,
and validation rules beside the methods in src/data_engine/core/flow.py; that
keeps VS Code hover help and the packaged docs in sync. This page is the
author-facing tour of when to use those methods together.
from data_engine import Flow
Flow(group)
Create a new immutable flow definition.
flow = Flow(group="Claims")
Rules:
- group must be a non-empty string
- the flow-module filename provides the flow identity
- the returned object is immutable, so each fluent call returns a new Flow
Immutability matters because it keeps authoring predictable. Each chained call produces a new flow definition with explicit state.
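The immutable-builder pattern above can be sketched with a tiny stand-in class (this is illustrative only, not the real Flow implementation): each fluent call returns a new frozen object, so the original definition is never mutated.

```python
from dataclasses import dataclass, replace

# Illustrative stand-in for the immutable-builder idea, NOT the real Flow:
# each fluent call builds a new frozen instance via dataclasses.replace().
@dataclass(frozen=True)
class TinyFlow:
    group: str
    steps: tuple = ()

    def step(self, name):
        # self is never mutated; a new object carries the extended state
        return replace(self, steps=self.steps + (name,))

base = TinyFlow(group="Claims")
extended = base.step("read").step("clean")

print(base.steps)      # ()
print(extended.steps)  # ('read', 'clean')
```

Because `base` is untouched, you can branch several variants off one shared prefix without surprises.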
watch(...)
Configure a runtime trigger for manual, poll, or schedule execution.
flow = flow.watch(
mode="poll",
source="../../example_data/Input/claims_flat",
interval="5s",
extensions=[".xlsx", ".xlsm"],
settle=1,
)
flow = flow.watch(
mode="poll",
source="../../example_data/Settings/single_watch.xlsx",
interval="5s",
)
flow = flow.watch(mode="schedule", run_as="batch", interval="15m")
flow = flow.watch(mode="schedule", run_as="batch", interval="15m", source="../../example_data/Input/claims_flat")
flow = flow.watch(mode="schedule", run_as="batch", time="10:31", source="../../example_data/Settings/single_watch.xlsx")
flow = flow.watch(mode="schedule", run_as="batch", time=["08:15", "14:45"])
flow = flow.watch(mode="manual")
Rules:
- mode must be one of manual, poll, or schedule
- run_as defaults to individual
- run_as="individual" means one run per concrete source file
- run_as="batch" means one run at the watched root
- poll flows require source= and interval=
- schedule flows accept exactly one of interval= or time=
- time accepts either one HH:MM string or a collection of HH:MM strings
- extensions and settle are poll-only options
- missing or bad paths fail now and recover later when the path becomes valid
- poll freshness compares the current source file signature against the runtime ledger
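The "one HH:MM string or a collection of HH:MM strings" rule can be sketched with a small hypothetical helper (`normalize_times` is an assumption for illustration, not part of the data_engine API):

```python
import re

# Hypothetical helper, not the real data_engine validation code: sketches
# how time= could be normalized into a validated list of "HH:MM" strings,
# accepting either one string or any collection of strings.
_HHMM = re.compile(r"^(?:[01]\d|2[0-3]):[0-5]\d$")

def normalize_times(time):
    times = [time] if isinstance(time, str) else list(time)
    for t in times:
        if not _HHMM.match(t):
            raise ValueError(f"expected HH:MM, got {t!r}")
    return times

print(normalize_times("10:31"))             # ['10:31']
print(normalize_times(["08:15", "14:45"]))  # ['08:15', '14:45']
```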
Practical guidance:
- use manual for explicit button-driven flows
- use poll when the source changing should be the trigger
- use schedule when time should be the trigger
- use run_as="batch" when the flow should reason about a folder or root as one unit
- use run_as="individual" when each source file should become its own run
watch(...) is where you describe orchestration intent, not transformation logic.
mirror(root=...)
Bind a mirrored output namespace rooted at one directory.
flow = flow.mirror(root="../../example_data/Output/example_mirror")
mirror(...) defines the output namespace exposed later through context.mirror.
You can omit mirror(...) entirely if the flow has no need for a mirrored output namespace.
step(fn, use=None, save_as=None, label=None)
Add one generic callable step.
flow = flow.step(read_claims, save_as="raw_df")
flow = flow.step(clean_claims, use="raw_df", save_as="clean_df")
flow = flow.step(write_output, use="clean_df", label="Write Parquet")
Rules:
- fn must be callable
- fn must accept exactly one context parameter
- use= selects a previously saved object
- save_as= stores the returned object
- label= overrides the UI display name
The return value always becomes context.current.
This is the default workhorse method. Most flows are easiest to read when they are mostly made of step(...) with occasional collect(...) and map(...) where batching is truly needed.
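How save_as=/use= thread state through a chain can be sketched with an assumed minimal runner (the `Ctx` class and `run_steps` function here are illustrative names, not the real runtime):

```python
# Minimal sketch of step semantics, NOT the real data_engine runtime:
# use= selects a previously saved object, the return value becomes
# context.current, and save_as= stores it for later steps.
class Ctx:
    def __init__(self):
        self.current = None
        self.saved = {}

def run_steps(steps):
    ctx = Ctx()
    for fn, use, save_as in steps:
        if use is not None:
            ctx.current = ctx.saved[use]   # use= selects a saved object
        ctx.current = fn(ctx)              # return value becomes context.current
        if save_as is not None:
            ctx.saved[save_as] = ctx.current
    return ctx

ctx = run_steps([
    (lambda c: [3, 1, 2], None, "raw"),
    (lambda c: sorted(c.current), "raw", "clean"),
])
print(ctx.saved["clean"])  # [1, 2, 3]
```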
map(fn, use=None, save_as=None, label=None)
Map one callable across the current batch.
flow = flow.collect(extensions=[".pdf"])
flow = flow.map(fn=validate_pdf)
flow = flow.map(fn=validate_pdf_with_context, label="Validate Pdf")
def validate_pdf(file_ref):
return {"name": file_ref.name, "ok": file_ref.exists()}
def validate_pdf_with_context(context, file_ref):
return {"flow": context.flow_name, "name": file_ref.name}
Rules:
- map() expects the current value to be iterable
- fn may accept either (item) or (context, item)
- the mapped results are returned as a Batch
- map() raises when the current batch is empty
- use=, save_as=, and label= work the same way they do for step()
Reach for map(...) when the same callable should run once per collected item. If the callable should reason about the whole collection, switch back to a normal step(...).
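The rule that fn may accept either (item) or (context, item) implies some signature dispatch in the runner. One plausible sketch, using the standard library's inspect module (the dispatch rule shown is an assumption, not the documented implementation):

```python
import inspect

# Sketch of dual-signature dispatch for map() callables: two parameters
# means fn(context, item), one parameter means fn(item). Assumed logic.
def call_mapped(fn, context, item):
    params = inspect.signature(fn).parameters
    if len(params) == 2:
        return fn(context, item)
    return fn(item)

print(call_mapped(lambda x: x * 2, None, 5))         # 10
print(call_mapped(lambda ctx, x: (ctx, x), "c", 5))  # ('c', 5)
```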
step_each(fn, use=None, save_as=None, label=None)
step_each(...) is an alias for map(...).
Use whichever reads better in the flow module:
flow = flow.map(fn=read_claims)
flow = flow.step_each(fn=read_claims)
collect(extensions, root=None, recursive=False, use=None, save_as=None, label=None)
Collect matching files into a Batch of FileRef items.
flow = flow.collect(extensions=[".xlsx"])
flow = flow.collect(extensions=[".pdf"], recursive=True)
Behavior:
- uses root= when provided
- otherwise falls back to context.source.root
- returns a Batch of FileRef items
- each item exposes .name, .path, .stem, .suffix, and .parent
If root= is omitted, the runtime falls back to the current source root. That is often the cleanest choice for poll or scheduled batch flows already bound to a source.
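The attributes each collected item exposes map naturally onto Python's pathlib vocabulary, which is one way to picture them (this is an analogy only; FileRef itself may be backed differently):

```python
from pathlib import Path

# Illustrative analogy: the FileRef attributes from collect() mirror the
# standard pathlib.Path properties of the same names.
p = Path("example_data/Input/claims_flat/claims_2024.xlsx")

print(p.name)    # claims_2024.xlsx
print(p.stem)    # claims_2024
print(p.suffix)  # .xlsx
print(p.parent)  # example_data/Input/claims_flat
```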
run_once()
Run the flow one time and return the completed contexts.
Use this when you want a one-off Python-driven execution.
run()
Start continuous execution for watched poll or schedule flows.
This is the entrypoint behind long-lived runtime behavior.
preview(use=None)
Run one flow for notebook inspection and return a real object.
build().preview()
build().preview(use="raw_df").head(10)
build().preview(use="claim_frames")
Behavior:
- without use=, returns the final context.current
- with use="name", runs only until save_as="name" exists
- returns the real saved object, so dataframe methods like .head(10) work naturally
- avoids running later write/debug steps once the requested saved object is available
- if a poll flow has several startup source files, preview uses the first deterministic source candidate for notebook inspection
preview(...) is especially useful while authoring notebook-backed flows because it lets you stop at a meaningful intermediate and inspect it directly.
If you want the final one-off result itself, use preview() without use=.
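The early-stop behavior behind preview(use=...) can be sketched as follows (assumed names and logic, not the real implementation): run steps in order, but return as soon as the requested saved object exists, so later write/debug steps never run.

```python
# Sketch of preview(use=...) early-stop semantics; names are illustrative.
def preview(steps, use=None):
    current, saved = None, {}
    for fn, save_as in steps:
        current = fn(current)
        if save_as is not None:
            saved[save_as] = current
        if use is not None and use in saved:
            return saved[use]   # stop early; later steps never run
    return current              # no use=: return the final current

ran = []
steps = [
    (lambda _: ran.append("read") or [2, 1], "raw_df"),
    (lambda cur: ran.append("clean") or sorted(cur), "clean_df"),
    (lambda cur: ran.append("write") or cur, None),
]
print(preview(steps, use="raw_df"))  # [2, 1] -- "clean" and "write" never ran
```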