Configuring Flows
Per-flow configuration lives in the fluent Flow chain, not in TOML.
That is an important design choice:
the runtime shape of a flow belongs in the authored Flow(...) definition
workspace-local TOML in config/ is for step logic and runtime parameters consumed by your code
there is no separate “expand this flow into several configured variants” layer after build()
Core fields
Flow(group="Claims")
group is author-defined. The flow-module filename provides the flow identity.
Use group to cluster related flows in the UI and runtime model. A good rule of thumb is that a group should mean “these flows belong to the same operator-facing area of work.”
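For example, two flow modules can share a group while keeping distinct identities from their filenames. A sketch; the module names here are hypothetical:

# flows/claims_intake.py — flow identity "claims_intake"
Flow(group="Claims")

# flows/claims_reporting.py — flow identity "claims_reporting", same UI cluster
Flow(group="Claims")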
Watching
Single-file polling:
Flow(group="Settings").watch(
mode="poll",
source="../../example_data/Settings/single_watch.xlsx",
interval="5s",
).mirror(
root="../../example_data/Output/example_single_watch",
)
Directory polling:
Flow(group="Claims").watch(
mode="poll",
source="../../example_data/Input/claims_flat",
interval="5s",
extensions=[".xlsx", ".xls", ".xlsm"],
settle=1,
).mirror(
root="../../example_data/Output/example_mirror",
)
Scheduled batch runs:
Flow(group="Analytics").watch(
mode="schedule",
run_as="batch",
interval="15m",
source="../../example_data/Input/claims_flat",
).mirror(
root="../../example_data/Output/example_summary",
)
Flow(group="Settings").watch(
mode="schedule",
run_as="batch",
time="10:31",
source="../../example_data/Settings/single_watch.xlsx",
).mirror(
root="../../example_data/Output/example_schedule",
)
Flow(group="Settings").watch(
mode="schedule",
run_as="batch",
time=["08:15", "14:45"],
source="../../example_data/Settings/single_watch.xlsx",
)
What watching owns:
source selection
ledger-backed source freshness tracking
extension filtering for poll directory sources
settle/debounce behavior for poll flows
whether the runtime executes per file or as one root-level batch via run_as=
What watching leaves to step code:
dataframe reads
dataframe transforms
database work
output writing
That separation keeps watch(...) readable. It tells the engine when and why to run while step code handles the underlying data work.
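As a sketch of the step side of that split (read_claims is hypothetical, and the assumption that the engine hands the fresh source path to a step as context.current is mine, not stated on this page):

import pandas as pd

def read_claims(context):
    # watch(...) decided when and why this runs; the dataframe work lives here.
    return pd.read_excel(context.current)  # assumed: the matched path arrives as context.current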
watch(mode="schedule", ...) accepts exactly one of:
interval="10m"time="HH:MM"time=["08:15", "14:45"]
It may also bind an optional source=... path for recurring jobs.
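A quick contrast (that mixing trigger specs fails validation is an assumption; this page only states the exactly-one rule):

# OK: exactly one trigger spec
Flow(group="Settings").watch(
    mode="schedule",
    run_as="batch",
    interval="10m",
    source="../../example_data/Settings/single_watch.xlsx",
)

# Not OK: interval= and time= together — expect this to be rejected
Flow(group="Settings").watch(
    mode="schedule",
    run_as="batch",
    interval="10m",
    time="08:15",
)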
run_as
run_as controls what the runtime treats as one unit of work.
Common values are:
run_as="individual": one run per concrete source filerun_as="batch": one run at the watched root
Use individual when each source file should be processed independently.
Use batch when the flow should reason about the watched source as one collection, such as “all current workbooks in this folder.”
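For instance, a per-file variant of the directory poll above (that run_as= is accepted on poll watches is an assumption; the examples on this page only show it on schedules):

Flow(group="Claims").watch(
    mode="poll",
    run_as="individual",  # one run per fresh workbook
    source="../../example_data/Input/claims_flat",
    interval="5s",
    extensions=[".xlsx"],
)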
Poll-specific options
extensions= limits which files in a polled directory participate in freshness checks and execution.
settle= adds debounce behavior so the engine reacts after a file has had time to finish writing.
Mirror bindings
Use mirror(root=...) when a flow needs source-relative output routing.
Inside steps:
context.mirror.with_suffix(".parquet")
context.mirror.file("summary.json")
context.mirror.namespaced_file("open_claims.parquet")
context.mirror.root_file("analytics.duckdb")
mirror(...) defines the output namespace available at runtime.
Use mirror(...) when the flow has natural mirrored outputs.
If a flow writes several related outputs, mirror(...) is usually the cleanest way to keep them organized without scattering path math through your steps.
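A sketch of a step that leans on those helpers (write_summary is hypothetical, and the assumption that the helpers return pathlib.Path-like objects is mine):

import json

def write_summary(context):
    df = context.current                                   # e.g. loaded via use="clean_df"
    df.to_parquet(context.mirror.with_suffix(".parquet"))  # main output, mirrored from the source path
    context.mirror.file("summary.json").write_text(
        json.dumps({"rows": len(df)})                      # sibling metadata in the same mirrored folder
    )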
Batch workflows
Use collect(...) and map(...) or step_each(...) together for folder-oriented processing:
Flow(group="Analytics") \
.watch(mode="schedule", run_as="batch", interval="15m", source="../../example_data/Input/claims_flat") \
.collect([".xlsx"], save_as="claim_files") \
.map(read_claims, use="claim_files", save_as="claim_frames")
map(...) is the per-item stage in that pipeline, and step_each(...) is the equivalent alias. Both raise immediately when the batch is empty.
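The same pipeline spelled with the alias, for completeness:

(
    Flow(group="Analytics")
    .watch(mode="schedule", run_as="batch", interval="15m", source="../../example_data/Input/claims_flat")
    .collect([".xlsx"], save_as="claim_files")
    .step_each(read_claims, use="claim_files", save_as="claim_frames")
)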
This is the standard batch shape:
watch a directory or scheduled source root
collect matching files into a Batch
map one callable across each file
switch back to step(...) once you want to reason about the combined result, as sketched below
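A sketch of that final stage (summarize_claims is hypothetical; that a plain step sees the mapped results as an iterable under context.current, and that root_file returns a writable path, are my assumptions):

import pandas as pd

def summarize_claims(context):
    frames = list(context.current)   # "claim_frames", assumed iterable of DataFrames
    combined = pd.concat(frames)
    combined.to_parquet(context.mirror.root_file("combined.parquet"))

wired in with .step(summarize_claims, use="claim_frames") at the end of the chain.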
Configuring step labels and saved objects
Flow configuration also includes the names and labels you assign in the chain.
Examples:
(
Flow(group="Claims")
.step(read_claims, save_as="raw_df")
.step(clean_claims, use="raw_df", save_as="clean_df")
.step(write_output, use="clean_df", label="Write Parquet")
)
Those fields affect the authoring experience directly:
save_as= creates stable names for later steps and notebook previews
use= loads one of those saved names into context.current
label= controls the display name in the UI
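To make those fields concrete (clean_claims is a sketch; that a step's return value is what save_as= stores is an assumption, not stated above):

def clean_claims(context):
    raw = context.current   # use="raw_df" put the saved frame here
    return raw.dropna()     # captured as "clean_df" for later steps (assumed)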
If you are deciding where a piece of information belongs:
if it shapes orchestration, put it in the Flow chain
if it shapes step logic, put it in your code or in context.config
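A closing sketch of that split, assuming context.config surfaces the workspace TOML (the claims.min_amount key is hypothetical):

def filter_small_claims(context):
    # The Flow chain decided when this runs; the tunable came from config/.
    min_amount = context.config["claims"]["min_amount"]  # hypothetical TOML key
    return context.current[context.current["amount"] >= min_amount]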