Authoring Flow Modules
Flow modules live in:
workspaces/<workspace_id>/flow_modules/<name>.ipynb
workspaces/<workspace_id>/flow_modules/<name>.py
Reusable helper modules can live in:
workspaces/<workspace_id>/flow_modules/flow_helpers/<name>.py
Each flow module should export:
DESCRIPTION (optional)
build() -> Flow (required)
The flow-module filename is the durable flow identity used by discovery and runtime state. If you rename the file, you are effectively creating a different flow as far as the system is concerned.
Required contract
from data_engine import Flow

DESCRIPTION = "Reads workbook inputs and writes mirrored parquet outputs."

def build():
    return Flow(group="Claims")
In flow modules, the module filename owns the flow identity. Use label= on the returned Flow(...) when you want a custom UI title. When label= is omitted, Data Engine derives a readable label from the flow-module filename automatically.
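A minimal sketch of overriding the derived label, using the same Flow constructor shown above (the label text is illustrative):

# flow_modules/claims_report.py (the filename still owns the flow identity)
from data_engine import Flow

def build():
    # label= changes only the UI title; discovery and runtime state
    # keep using the "claims_report" filename.
    return Flow(group="Claims", label="Claims Completion Report")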
build() must not accept any parameters.
Keep module import-time code side-effect free. The app needs to discover flows safely and repeatedly, so top-level code should not:
run queries
write files
start background work
depend on interactive state
Do that work inside steps instead.
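A hedged sketch of moving import-time work into a step; the DuckDB query and table name are illustrative:

import duckdb

# Avoid at module top level: this would run a query during every discovery pass
# claims_df = duckdb.sql("select * from claims").pl()

def read_claims(context):
    # Safe: the query runs only when the step actually executes
    return duckdb.sql("select * from claims").pl()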
Step style
Every step(...) callable receives one context argument:
def read_claims(context):
    ...

def clean_claims(context):
    ...
map(...) and step_each(...) are the batch-oriented exception. They accept either:
def validate_pdf(file_ref):
    ...

def validate_pdf_with_context(context, file_ref):
    ...
map(...) always returns a Batch, and step_each(...) is an equivalent alias. Both raise immediately when the current batch is empty.
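As a hedged sketch, either signature can be handed to the batch operations; the chaining below reuses the style of the full example later on this page, and the step names, extension, and saved-object names are illustrative:

from data_engine import Flow

def build():
    return (
        Flow(group="Claims")
        .collect([".pdf"], save_as="pdf_files")
        # one-argument form: the callable receives each FileRef
        .map(validate_pdf, use="pdf_files", save_as="validated")
        # two-argument form: context first, then the batch item
        .step_each(validate_pdf_with_context, use="pdf_files", save_as="validated_ctx")
    )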
Use native libraries directly inside those steps:
Polars for dataframe reads, transforms, and writes
DuckDB for SQL and database work
pathlib and normal Python for filesystem logic
That simplicity is the intended authoring experience. Flow modules should feel like normal Python modules with a small orchestration surface.
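A hedged sketch of a step that leans on DuckDB directly; it assumes context.current holds a Polars frame produced by an earlier step, as in the example below, and the output filename is illustrative:

import duckdb

def summarize_claims(context):
    df = context.current  # Polars frame from an earlier step
    # DuckDB can scan the local Polars frame by name via replacement scans
    summary = duckdb.sql("select count(*) as n_rows from df").pl()
    output = context.mirror.file("claims_summary.parquet")
    summary.write_parquet(output)
    return output  # returning the written Path enables Inspect in the UI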
Good patterns
keep import-time code side-effect free
keep expensive work inside steps
use save_as= and use= to preserve intermediate objects
use build().preview(use="name") in notebooks when you want to inspect one saved intermediate object quickly (see the sketch after this list)
use collect(...) when you want a batch of files
use map(...) or step_each(...) when the same callable should run once per batch item
use context.source for source-relative paths
use context.mirror for write-ready output paths
return the written Path from output steps so the UI can enable Inspect
move shared parsing, SQL, and utility code into flow_modules/flow_helpers/*.py and import it from flows with from flow_helpers.<name> import ...
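A hedged notebook sketch of the preview pattern above; it assumes an intermediate saved as "raw_df", matching the full example later on this page, and that the saved object is a Polars frame:

# In a notebook cell, after the flow module's definitions
flow = build()
raw_df = flow.preview(use="raw_df")  # inspect one saved intermediate quickly
raw_df.head()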
Also good:
use context.config.require("name") for required TOML config (see the sketch after this list)
use context.database("analytics/db.duckdb") for workspace-local database paths
record useful UI/runtime details in context.metadata
keep writer steps narrow and explicit
split “build data” and “write data” into separate steps when you want a previewable intermediate
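A hedged sketch combining the config, database, and metadata hooks from this list; the table name, the config key, and the dict-style context.metadata write are assumptions, not confirmed API:

import duckdb

def load_reference_data(context):
    # require() is expected to fail fast when the TOML key is missing
    table_name = context.config.require("reference_table")
    db_path = context.database("analytics/db.duckdb")
    with duckdb.connect(str(db_path)) as con:
        df = con.sql(f"select * from {table_name}").pl()
    # record a useful runtime detail for the UI (assumed dict-like)
    context.metadata["reference_rows"] = df.height
    return df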
Usually worth avoiding:
monolithic steps that read, transform, and write everything at once
hand-built relative path logic when context.source or context.mirror already models it (see the contrast sketch after this list)
hidden global state in helper modules
returning a path that was never actually written
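A hedged contrast for the path item above; file(...) on context.source is an assumption, made by analogy with context.mirror.file in the example below:

import polars as pl

def read_input(context):
    # Avoid hand-building relative paths such as:
    #   Path(__file__).parent / ".." / ".." / "Input" / "claims.xlsx"
    # Prefer letting context.source model the source-relative location:
    return pl.read_excel(context.source.file("claims.xlsx"))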
Helper modules
Helper modules are regular Python files under flow_modules/flow_helpers/. They are compiled into workspace-local runtime artifacts and are importable from both notebook-authored and Python-authored flows.
Example:
# flow_modules/flow_helpers/claims_sql.py
LATEST_CLAIMS_SQL = "select * from claims where is_latest = true"

# flow_modules/claims_report.py
from flow_helpers.claims_sql import LATEST_CLAIMS_SQL
from data_engine import Flow

def build():
    return Flow(group="Claims")
Files in flow_modules/flow_helpers/ support authored flow modules and stay out of runnable flow discovery.
Helper imports are resolved against the currently selected workspace during flow loading. That isolation matters when two workspaces use the same helper module names, because one workspace’s helper cache should never leak into another workspace’s flow import.
Flow-module compilation is content-aware. If you save a source file twice in quick succession on a filesystem with coarse mtimes, Data Engine still recompiles when the rendered module text changed.
This is the right home for:
shared SQL strings
parsing helpers
file naming utilities
common dataframe transforms
shared constants
Code that should run independently and appear in the app belongs in its own flow module with its own build().
Example
from data_engine import Flow
import polars as pl

def read_claims(file_ref):
    return pl.read_excel(file_ref.path)

def concat_claims(context):
    return pl.concat(context.current, how="vertical_relaxed")

def keep_completed(context):
    return context.current.filter(pl.col("Step TO") == "COMPLETED")

def write_target(context):
    output = context.mirror.file("example_completed.parquet")
    context.current.write_parquet(output)
    return output

def build():
    return (
        Flow(group="Claims")
        .watch(mode="schedule", run_as="batch", interval="15m", source="../../example_data/Input/claims_flat")
        .mirror(root="../../example_data/Output/example_completed")
        .collect([".xlsx"], save_as="claim_files")
        .map(read_claims, use="claim_files", save_as="claim_frames")
        .step(concat_claims, use="claim_frames", save_as="raw_df")
        .step(keep_completed, use="raw_df", save_as="clean_df")
        .step(write_target, use="clean_df")
    )
That example shows map(...) in context:
collect(...) gathers a batch of FileRef items
map(...) reads each file into one dataframe
later step(...) callables operate on the whole batch result
There is no separate config layer that turns one flow module into multiple named flow variants after build time.
Notebook-authored vs Python-authored modules
Both notebook and Python flow modules participate in the same discovery model:
they export one build() -> Flow
they can import helper modules
they compile into runtime-ready Python modules
Python modules are usually better for:
shared flows
helper-heavy logic
larger code review surfaces
Notebooks are usually better for:
exploratory authoring
iterative preview-driven development
flows that benefit from inline inspection while being built
A practical authoring checklist
Before calling a flow module “done,” it is worth checking:
build() returns one Flow
the module imports cleanly with no side effects
the step labels are readable in the UI
saved object names are meaningful
required config is documented or obvious
writer steps return actual existing paths when you want inspectability
any helper modules sit under flow_modules/flow_helpers/