Recipes
This page collects complete end-to-end examples.
When a recipe matches a shipped starter flow, the starter flow name is called out explicitly.
Recipe: Mirror every workbook
Starter flow: example_mirror
from data_engine import Flow
import polars as pl

def read_claims(context):
    return pl.read_excel(context.source.path)

def write_target(context):
    # Write alongside the mirrored location, swapping the extension to .parquet.
    output = context.mirror.with_suffix(".parquet")
    context.current.write_parquet(output)
    return output

def build():
    return (
        Flow(group="Claims")
        .watch(
            mode="poll",
            source="../../example_data/Input/claims_flat",
            interval="5s",
            extensions=[".xlsx", ".xlsm"],
        )
        .mirror(root="../../example_data/Output/example_mirror")
        .step(read_claims, label="Read Excel")
        .step(write_target, label="Write Parquet")
    )
Why this pattern is useful:
poll reacts to new or changed source files
mirror.with_suffix(...) preserves source-relative output naming
returning the parquet path makes the output inspectable in the UI
Recipe: Filter rows and write a cleaned output
Starter flow: example_completed
from data_engine import Flow
import polars as pl

def read_claims(context):
    return pl.read_excel(context.source.path)

def keep_completed(context):
    return context.current.filter(pl.col("Step TO") == "COMPLETED")

def write_target(context):
    output = context.mirror.with_suffix(".parquet")
    context.current.write_parquet(output)
    return output

def build():
    return (
        Flow(group="Claims")
        .watch(
            mode="poll",
            source="../../example_data/Input/claims_flat",
            interval="5s",
            extensions=[".xlsx", ".xlsm"],
        )
        .mirror(root="../../example_data/Output/example_completed")
        .step(read_claims, save_as="raw_df")
        .step(keep_completed, use="raw_df", save_as="clean_df")
        .step(write_target, use="clean_df")
    )
This is the classic “read -> filter -> write” shape, and it is a good default when you want clear previewable intermediates.
Recipe: Capture source metadata during processing
Starter flow: example_metadata
import polars as pl

def read_claims(context):
    return pl.read_excel(context.source.path)

def capture_source_info(context):
    metadata = context.source_metadata()
    if metadata is not None:
        context.metadata["source_name"] = metadata.name
        context.metadata["source_size_bytes"] = metadata.size_bytes
    return context.current
This is useful when you want provenance details recorded in context.metadata without changing the main pipeline object.
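Wiring capture_source_info into a running flow follows the same poll-and-mirror pattern as the earlier recipes. A minimal sketch, assuming the same claims_flat watch source and an example_metadata mirror root (not taken verbatim from the shipped starter flow):

from data_engine import Flow

def build():
    return (
        Flow(group="Claims")
        .watch(
            mode="poll",
            source="../../example_data/Input/claims_flat",  # assumed source, copied from the earlier recipes
            interval="5s",
            extensions=[".xlsx", ".xlsm"],
        )
        .mirror(root="../../example_data/Output/example_metadata")  # assumed mirror root
        .step(read_claims, save_as="raw_df")
        .step(capture_source_info, use="raw_df", label="Capture metadata")
    )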
Recipe: Produce a stable latest snapshot
Starter flow: example_snapshot
def write_latest_snapshot(context):
    snapshot = context.mirror.root_file("artifacts/example_snapshot.parquet")
    context.current.write_parquet(snapshot)
    return snapshot
Use mirror.root_file(...) when the result should be one stable artifact for the whole flow.
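A compact sketch of where write_latest_snapshot could sit in a chain, reusing read_claims from the earlier recipes; the watch and mirror settings below are assumptions, not the starter flow verbatim:

def build():
    return (
        Flow(group="Claims")
        .watch(mode="poll", source="../../example_data/Input/claims_flat", interval="5s", extensions=[".xlsx"])  # assumed
        .mirror(root="../../example_data/Output/example_snapshot")  # assumed mirror root
        .step(read_claims, save_as="raw_df")
        .step(write_latest_snapshot, use="raw_df", label="Write snapshot")
    )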
Recipe: Read selected worksheets from a multi-sheet workbook
Starter flow: example_multisheet
import polars as pl

def read_selected_sheets(context):
    return pl.read_excel(context.source.path, sheet_name=["Claims", "Summary"])
This is a good reminder that step code stays native and can call the underlying dataframe library directly.
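Passing several sheet names this way makes polars return a dict of DataFrames keyed by sheet name rather than a single frame, so a follow-up step normally picks out or combines the pieces. A minimal sketch, assuming the rest of the pipeline only needs the Claims sheet (the helper name is made up for illustration):

def keep_claims_sheet(context):
    # read_selected_sheets is expected to hand back {"Claims": ..., "Summary": ...};
    # pull out the one frame the downstream steps actually use.
    return context.current["Claims"]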
Recipe: Single-file settings workflow
Starter flows: example_single_watch and example_schedule
from data_engine import Flow
import polars as pl

def read_settings(context):
    return pl.read_excel(context.source.path)

def write_settings(context):
    output = context.mirror.with_suffix(".parquet")
    context.current.write_parquet(output)
    return output

def build():
    return (
        Flow(group="Settings")
        .watch(
            mode="schedule",
            run_as="batch",
            interval="15m",
            source="../../example_data/Settings/single_watch.xlsx",
        )
        .mirror(root="../../example_data/Output/example_schedule")
        .step(read_settings, save_as="settings_df")
        .step(write_settings, use="settings_df", label="Write Parquet")
    )
This is the right shape when the flow should rerun on a schedule against one well-known source file.
Recipe: Batch read with map(...) or step_each(...)
Starter flow shape: example_summary
from data_engine import Flow
import polars as pl

def read_claims(file_ref):
    return pl.read_excel(file_ref.path)

def combine_claims(context):
    return pl.concat(context.current, how="vertical_relaxed")

def build():
    return (
        Flow(group="Analytics")
        .watch(mode="schedule", run_as="batch", interval="15m", source="../../example_data/Input/claims_flat")
        .collect([".xlsx"], save_as="claim_files")
        .map(read_claims, use="claim_files", save_as="claim_frames")
        .step(combine_claims, use="claim_frames")
    )
map(...) is the right tool when the same callable should run once per collected file, and step_each(...) is the equivalent alias. Both raise immediately when the batch is empty.
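Since step_each(...) is described as an equivalent alias, the same batch chain can be spelled with it; a sketch reusing read_claims and combine_claims from the snippet above:

def build():
    return (
        Flow(group="Analytics")
        .watch(mode="schedule", run_as="batch", interval="15m", source="../../example_data/Input/claims_flat")
        .collect([".xlsx"], save_as="claim_files")
        .step_each(read_claims, use="claim_files", save_as="claim_frames")
        .step(combine_claims, use="claim_frames")
    )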
Recipe: Load into DuckDB and export a summary
Starter flow: example_summary
import duckdb
import polars as pl

from data_engine import Flow

def read_claims(file_ref):
    return pl.read_excel(file_ref.path)

def combine_claims(context):
    return pl.concat(context.current, how="vertical_relaxed")

def build_summary(context):
    conn = duckdb.connect(context.database("analytics.duckdb"))
    try:
        conn.register("input", context.current)
        return conn.sql(
            """
            select
                workflow,
                count(*) as row_count
            from input
            group by workflow
            order by row_count desc
            """
        ).pl()
    finally:
        conn.close()

def write_summary(context):
    output = context.mirror.file("workflow_summary.parquet")
    context.current.write_parquet(output)
    return output

def build():
    return (
        Flow(group="Analytics")
        .watch(mode="schedule", run_as="batch", interval="15m", source="../../example_data/Input/claims_flat")
        .mirror(root="../../example_data/Output/example_summary")
        .collect([".xlsx"], save_as="claim_files")
        .map(read_claims, use="claim_files", save_as="claim_frames")
        .step(combine_claims, use="claim_frames", save_as="raw_df")
        .step(build_summary, use="raw_df", save_as="summary_df")
        .step(write_summary, use="summary_df")
    )
That last example is a natural place to prefer context.database(...), because the DuckDB file behaves like a workspace-local database asset and deserves a stable home in the workspace.
Recipe: Use TOML workspace config
import polars as pl

def apply_threshold(context):
    cfg = context.config.require("claims")
    threshold = cfg.get("filters", {}).get("minimum_amount", 0)
    return context.current.filter(pl.col("amount") >= threshold)
This is a clean way to keep operator-tunable values out of the flow chain while still making the dependency explicit.
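The lookup in apply_threshold implies a claims table with a nested filters group in the workspace config. A hedged sketch of the wiring, with the implied TOML layout shown in a comment; the numeric value, mirror root, and chain are illustrative assumptions, and read_claims and write_target are the helpers from the earlier recipes:

# Implied workspace config shape (values are illustrative):
#
#   [claims.filters]
#   minimum_amount = 250

def build():
    return (
        Flow(group="Claims")
        .watch(
            mode="poll",
            source="../../example_data/Input/claims_flat",  # assumed source
            interval="5s",
            extensions=[".xlsx", ".xlsm"],
        )
        .mirror(root="../../example_data/Output/example_config")  # assumed mirror root
        .step(read_claims, save_as="raw_df")
        .step(apply_threshold, use="raw_df", save_as="clean_df")
        .step(write_target, use="clean_df")
    )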
Recipe: Calculate business days and keep a grouped running total
from datetime import date

import data_engine.helpers
import polars as pl

df = (
    df.sort(["claim_id", "sequence_number"])
    .with_columns(
        cumulative_business_days=pl.when(pl.col("use_days"))
        .then(
            data_engine.helpers.networkdays(
                "start_date",
                "end_date",
                holidays=[date(2026, 4, 15)],
            )
        )
        .otherwise(pl.lit(0))
        .cum_sum()
        .over("claim_id")
    )
)
This keeps the per-row business-day increment conditional, while the running total continues to accumulate within each group.
Recipe: Offset to the next business due date
from datetime import date

import data_engine.helpers

df = df.with_columns(
    due_date=data_engine.helpers.workday(
        "received_date",
        "sla_days",
        holidays=[date(2026, 4, 15)],
        count_first_day=True,
    )
)
Use count_first_day=True when the received day itself should count as day 1
for SLA-style deadlines.
Recipe: Write several outputs for one source
import polars as pl

def write_outputs(context):
    open_path = context.mirror.namespaced_file("open_claims.parquet")
    closed_path = context.mirror.namespaced_file("closed_claims.parquet")
    context.current.filter(pl.col("status") == "OPEN").write_parquet(open_path)
    context.current.filter(pl.col("status") == "CLOSED").write_parquet(closed_path)
    return open_path
Use namespaced_file(...) when one source item naturally produces several derived outputs.
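For completeness, a hedged sketch of the chain around write_outputs; the watch source, mirror root, and read step are assumptions borrowed from the earlier recipes:

def build():
    return (
        Flow(group="Claims")
        .watch(mode="poll", source="../../example_data/Input/claims_flat", interval="5s", extensions=[".xlsx"])  # assumed
        .mirror(root="../../example_data/Output/example_outputs")  # assumed mirror root
        .step(read_claims, save_as="raw_df")
        .step(write_outputs, use="raw_df", label="Write open/closed outputs")
    )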