Flow Methods

This page covers the small author-facing Flow surface.

The method-level reference now lives in the Flow docstrings and is rendered in the API reference. Details about exact parameters, return values, and validation rules belong beside the methods in src/data_engine/core/flow.py; that keeps VS Code hover help and the packaged docs in sync. This page is the author-facing tour of when to use those methods together.

from data_engine import Flow

Flow(group)

Create a new immutable flow definition.

flow = Flow(group="Claims")

Rules:

  • group must be a non-empty string

  • the flow-module filename provides the flow identity

  • the returned object is immutable, so each fluent call returns a new Flow

Immutability matters because it keeps authoring predictable. Each chained call produces a new flow definition with explicit state.
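The immutable-builder idea can be sketched in plain Python. This is an illustration of the pattern only, not data_engine internals; TinyFlow and its fields are hypothetical:

```python
from dataclasses import dataclass, replace

# Minimal sketch of an immutable fluent builder; the real Flow carries
# much more state than this.
@dataclass(frozen=True)
class TinyFlow:
    group: str
    steps: tuple = ()

    def step(self, name):
        # Return a new object instead of mutating self.
        return replace(self, steps=self.steps + (name,))

base = TinyFlow(group="Claims")
with_step = base.step("read")

# The original definition is untouched; each chained call yields a new value.
print(base.steps)       # ()
print(with_step.steps)  # ('read',)
```

Because nothing is mutated, you can branch from any intermediate definition without affecting the flows already built from it.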

watch(...)

Configure a runtime trigger for manual, poll, or schedule execution.

flow = flow.watch(
    mode="poll",
    source="../../example_data/Input/claims_flat",
    interval="5s",
    extensions=[".xlsx", ".xlsm"],
    settle=1,
)
flow = flow.watch(
    mode="poll",
    source="../../example_data/Settings/single_watch.xlsx",
    interval="5s",
)
flow = flow.watch(mode="schedule", run_as="batch", interval="15m")
flow = flow.watch(mode="schedule", run_as="batch", interval="15m", source="../../example_data/Input/claims_flat")
flow = flow.watch(mode="schedule", run_as="batch", time="10:31", source="../../example_data/Settings/single_watch.xlsx")
flow = flow.watch(mode="schedule", run_as="batch", time=["08:15", "14:45"])
flow = flow.watch(mode="manual")

Rules:

  • mode must be one of manual, poll, or schedule

  • run_as defaults to individual

  • run_as="individual" means one run per concrete source file

  • run_as="batch" means one run at the watched root

  • poll flows require source= and interval=

  • schedule flows accept exactly one of interval= or time=

  • time accepts either one HH:MM string or a collection of HH:MM strings

  • extensions and settle are poll-only options

  • a missing or invalid path fails immediately, then recovers automatically once the path becomes valid

  • poll freshness compares the current source file signature against the runtime ledger

Practical guidance:

  • use manual for explicit button-driven flows

  • use poll when the source changing should be the trigger

  • use schedule when time should be the trigger

  • use run_as="batch" when the flow should reason about a folder or root as one unit

  • use run_as="individual" when each source file should become its own run

watch(...) is where you describe orchestration intent, not transformation logic.

mirror(root=...)

Bind a mirrored output namespace rooted at one directory.

flow = flow.mirror(root="../../example_data/Output/example_mirror")

mirror(...) defines the output namespace exposed later through context.mirror.

You can omit mirror(...) entirely if the flow has no need for a mirrored output namespace.

step(fn, use=None, save_as=None, label=None)

Add one generic callable step.

flow = flow.step(read_claims, save_as="raw_df")
flow = flow.step(clean_claims, use="raw_df", save_as="clean_df")
flow = flow.step(write_output, use="clean_df", label="Write Parquet")

Rules:

  • fn must be callable

  • fn must accept exactly one context parameter

  • use= selects a previously saved object

  • save_as= stores the returned object

  • label= overrides the UI display name

The return value always becomes context.current.

This is the default workhorse method. Most flows are easiest to read when they are mostly made of step(...) with occasional collect(...) and map(...) where batching is truly needed.
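As a sketch, a step callable is just a one-parameter function whose return value becomes the new current object. The function bodies below are hypothetical, and the stand-in context (and its current attribute) is an assumption for illustration; the real runtime supplies its own context object:

```python
from types import SimpleNamespace

# Hypothetical step callables: each accepts exactly one context parameter
# and returns the value that becomes context.current.
def read_claims(context):
    return ["claim-1", "claim-2"]

def clean_claims(context):
    # With use="raw_df", the runtime hands the previously saved object to
    # this step; reading it from context.current is an assumption here.
    return [c.upper() for c in context.current]

# Stand-in context for illustration only.
ctx = SimpleNamespace(current=read_claims(None))
print(clean_claims(ctx))  # ['CLAIM-1', 'CLAIM-2']
```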

map(fn, use=None, save_as=None, label=None)

Map one callable across the current batch.

def validate_pdf(file_ref):
    return {"name": file_ref.name, "ok": file_ref.exists()}


def validate_pdf_with_context(context, file_ref):
    return {"flow": context.flow_name, "name": file_ref.name}


flow = flow.collect(extensions=[".pdf"])
flow = flow.map(fn=validate_pdf)
flow = flow.map(fn=validate_pdf_with_context, label="Validate Pdf")

Rules:

  • map() expects the current value to be iterable

  • fn may accept either (item) or (context, item)

  • the mapped results are returned as a Batch

  • map() raises when the current batch is empty

  • use=, save_as=, and label= work the same way they do for step()

Reach for map(...) when the same callable should run once per collected item. If the callable should reason about the whole collection, switch back to a normal step(...).
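The per-item versus whole-collection distinction can be sketched in plain Python. The callables here are hypothetical, and the list comprehension stands in for what the runtime does with a Batch:

```python
# map(...) style: the callable sees one item at a time.
def validate_one(item):
    return {"name": item, "ok": True}

# step(...) style: the callable sees the whole collection at once.
def summarize_all(items):
    return {"count": len(items)}

batch = ["a.pdf", "b.pdf"]
mapped = [validate_one(item) for item in batch]   # one result per item
summary = summarize_all(batch)                    # one result for the batch
print(summary)  # {'count': 2}
```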

step_each(fn, use=None, save_as=None, label=None)

step_each(...) is an alias for map(...).

Use whichever reads better in the flow module:

flow = flow.map(fn=read_claims)
flow = flow.step_each(fn=read_claims)

collect(extensions, root=None, recursive=False, use=None, save_as=None, label=None)

Collect matching files into a Batch of FileRef items.

flow = flow.collect(extensions=[".xlsx"])
flow = flow.collect(extensions=[".pdf"], recursive=True)

Behavior:

  • uses root= when provided

  • otherwise falls back to context.source.root

  • returns a Batch of FileRef items

  • each item exposes .name, .path, .stem, .suffix, and .parent

If root= is omitted, the runtime falls back to the current source root. That is often the cleanest choice for poll or scheduled batch flows already bound to a source.
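The listed FileRef attributes follow pathlib naming. As a sketch of what each attribute yields for a sample path, with pathlib.Path standing in for FileRef (the sample path is made up):

```python
from pathlib import Path

# pathlib.Path illustrates the attribute names a collected FileRef
# exposes; FileRef itself is a data_engine type.
p = Path("example_data/Input/claims_flat/claims_2024.xlsx")
print(p.name)    # claims_2024.xlsx
print(p.stem)    # claims_2024
print(p.suffix)  # .xlsx
print(p.parent)  # example_data/Input/claims_flat
```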

run_once()

Run the flow one time and return the completed contexts.

Use this when you want a one-off Python-driven execution.

run()

Start continuous execution for watched poll or schedule flows.

This is the entrypoint behind long-lived runtime behavior.

preview(use=None)

Run the flow once for notebook inspection and return the actual object.

build().preview()
build().preview(use="raw_df").head(10)
build().preview(use="claim_frames")

Behavior:

  • without use=, returns the final context.current

  • with use="name", runs only until save_as="name" exists

  • returns the real saved object, so dataframe methods like .head(10) work naturally

  • avoids running later write/debug steps once the requested saved object is available

  • if a poll flow has several startup source files, preview uses the first deterministic source candidate for notebook inspection

preview(...) is especially useful while authoring notebook-backed flows because it lets you stop at a meaningful intermediate and inspect it directly. If you want the final one-off result itself, use preview() without use=.