Core Concepts

Flow

A Flow is an immutable definition with:

  • a group label shown in the UI

  • an optional trigger via watch(...)

  • an optional mirrored output binding via mirror(...)

  • ordered generic steps

from data_engine import Flow

flow = Flow(group="Docs")

The flow-module filename is the flow identity used for discovery and runtime bookkeeping. group is the author-controlled grouping visible in the UI.

Runtime modes

Manual:

  • no trigger configured

  • run_once() executes the steps once with context.current = None

  • useful for button-driven operator runs or preview-oriented flows
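To make the manual contract concrete, here is a sketch of a first step handling `context.current = None`. The function name, return value, and the `SimpleNamespace` stand-in for the real context are all illustrative:

```python
from types import SimpleNamespace

def export_snapshot(context):
    # In a manual flow, run_once() invokes the first step with
    # context.current = None, so the step must handle the empty case.
    if context.current is None:
        return {"rows": 0, "note": "fresh manual run"}
    return context.current

# Stand-in for the context a manual run_once() would pass to the first step.
result = export_snapshot(SimpleNamespace(current=None))
```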

Poll:

  • source-driven execution over either one file or a directory of files

  • the runtime compares the current source file signature against the persisted runtime ledger

  • the first step sees the active input through context.source

  • startup backlog handling is based on persisted ledger state for each source version

  • intermediate saved objects do not participate in staleness checks
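The freshness check can be pictured as a signature comparison against the ledger. The sketch below is illustrative only: it uses a `(size, mtime)` signature and a plain dict as the ledger, whereas the real runtime's signature scheme and persisted ledger format are internal:

```python
from pathlib import Path

def file_signature(path):
    # Illustrative signature: (size, mtime in ns). The real runtime's
    # signature scheme may differ.
    st = Path(path).stat()
    return (st.st_size, st.st_mtime_ns)

def is_stale(path, ledger):
    # A source is due for a run when its current signature does not
    # match the one persisted in the ledger.
    return ledger.get(str(path)) != file_signature(path)
```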

Schedule:

  • interval-driven via watch(mode="schedule", interval="15m")

  • or wall-clock via watch(mode="schedule", time="10:31")

  • time may also be a collection such as ["08:15", "14:45"]

  • may optionally bind a source=... path for recurring jobs

The distinction between poll and schedule is important:

  • poll is source freshness driven

  • schedule is time driven

You can combine scheduled execution with a source binding when the flow should run on a schedule but still read from a known source root.
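As an illustration of how an interval string such as "15m" might be normalized into seconds, here is a hypothetical parser; the units the real runtime accepts are not specified here:

```python
def parse_interval(spec):
    # Hypothetical parser for interval strings like "15m" or "2h".
    # Which unit suffixes the real runtime accepts is not documented here.
    units = {"s": 1, "m": 60, "h": 3600}
    return int(spec[:-1]) * units[spec[-1]]
```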

Step

Each step(...) is one callable:

def step(context) -> object:
    ...

The return value always becomes context.current.

use= on a step loads a saved object from context.objects into context.current before the callable runs. save_as= stores the returned value back into context.objects for later steps, previews, or notebook inspection. The runtime owns current, so save_as="current" is rejected.

This is the main design boundary:

  • the fluent API orchestrates runtime behavior

  • native libraries perform the actual data and file work

That means Data Engine coordinates Polars, DuckDB, pathlib, and your Python helper code through one runtime model.

Saved objects

Steps can save and reuse values:

(
    Flow(group="Docs")
    .step(read_docs, save_as="raw_df")
    .step(clean_docs, use="raw_df", save_as="clean_df")
    .step(write_output, use="clean_df")
)

  • use="name" loads context.objects["name"] into context.current

  • save_as="name" stores the returned value into context.objects["name"]

  • use="current" leaves the current runtime value in place

  • save_as="current" is invalid
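The bookkeeping above can be simulated in a few lines of plain Python. This is not the real runtime, just a sketch of how use= and save_as= wrap one step:

```python
from types import SimpleNamespace

objects = {}
current = None

def run_step(fn, use=None, save_as=None):
    # Illustrative simulation of the use=/save_as= bookkeeping.
    global current
    if use is not None and use != "current":
        current = objects[use]            # use= loads a saved object
    result = fn(SimpleNamespace(current=current, objects=objects))
    current = result                      # return value always becomes current
    if save_as == "current":
        raise ValueError("save_as='current' is rejected")
    if save_as is not None:
        objects[save_as] = result         # save_as= stores a named waypoint
    return result
```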

In an external notebook or REPL, those saved names are also the easiest way to inspect intermediates:

build().preview(use="clean_df").head(10)

This is one of the most useful parts of the authoring model:

  • current gives you the current object in the pipeline

  • objects gives you stable named waypoints

That makes it easy to structure flows around a few explicit intermediate states and readable named waypoints.

Batch mapping

collect(...) and map(...) are the batch-oriented authoring tools.

def read_docs(file_ref):
    return pl.read_excel(file_ref.path)


def combine_docs(context):
    return pl.concat(context.current, how="vertical_relaxed")


flow = (
    Flow(group="Analytics")
    .watch(mode="schedule", run_as="batch", interval="15m", source="../../example_data/Input/docs_flat")
    .collect([".xlsx"], save_as="doc_files")
    .map(read_docs, use="doc_files", save_as="doc_frames")
    .step(combine_docs, use="doc_frames")
)

collect(...) gathers matching files into a Batch of FileRef items. map(...) runs the same callable once per batch item, and step_each(...) is an equivalent alias. The mapped callable may accept either (item) or (context, item); map(...) raises immediately when the batch is empty or not iterable.

Batch mapping is especially useful when you want to:

  • read many files into many dataframes

  • validate one file at a time

  • emit one lightweight record per source item before combining

Use a normal step(...) when the callable should reason about the batch as a whole.
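To make the per-item versus whole-batch distinction concrete, the sketch below simulates what a map(...) callable and a step(...) callable each see. The FileRef stand-ins, field names, and callables are illustrative, not the library's actual types:

```python
from types import SimpleNamespace

def validate_doc(file_ref):
    # map(...)-style callable: sees one batch item at a time.
    return {"name": file_ref.path, "ok": file_ref.path.endswith(".xlsx")}

def summarize(context):
    # step(...)-style callable: reasons about the whole batch at once.
    return sum(1 for rec in context.current if rec["ok"])

# Simulating the runtime: map applies the callable once per item,
# then a normal step receives the mapped results as context.current.
batch = [SimpleNamespace(path=p) for p in ["a.xlsx", "b.csv"]]
records = [validate_doc(item) for item in batch]
total_ok = summarize(SimpleNamespace(current=records))
```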

Source and mirror namespaces

The runtime exposes two structured path namespaces:

  • context.source

  • context.mirror

Examples:

context.source.path
context.source.with_extension(".json")
context.source.with_suffix(".json")
context.source.file("notes.json")
context.source.namespaced_file("notes.json")
context.source.root_file("lookup.csv")

context.mirror.with_extension(".parquet")
context.mirror.with_suffix(".parquet")
context.mirror.file("open_docs.parquet")
context.mirror.namespaced_file("open_docs.parquet")
context.mirror.root_file("analytics.duckdb")

context.source resolves read-side paths. context.mirror resolves write-ready output paths.

The important difference is:

  • source is about where the active input lives

  • mirror is about where outputs for that input should go

That lets you keep path logic readable and source-aware without hand-building relative paths in every step.

Examples of common patterns:

  • read a sidecar file beside the current source with context.source.file("notes.json")

  • write one mirrored parquet beside the source shape with context.mirror.with_suffix(".parquet")

  • write multiple outputs for the same source with context.mirror.namespaced_file(...)

  • write a stable root-level artifact such as a snapshot or DuckDB file with context.mirror.root_file(...)
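As an illustration of what mirror-side resolution means, here is a pathlib sketch that mirrors a source file's relative layout under an output root while swapping the extension. The real context.mirror API performs this resolution for you; the helper name and its arguments are hypothetical:

```python
from pathlib import Path

def mirror_with_suffix(source_path, source_root, mirror_root, suffix):
    # Illustrative: keep the source's relative layout under the mirror
    # root and swap the file extension, roughly what a mirrored
    # with_suffix-style resolution produces.
    rel = Path(source_path).relative_to(source_root)
    return Path(mirror_root) / rel.with_suffix(suffix)
```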

Discovery

The desktop UI and Python entrypoints discover flows from compiled .py and .ipynb flow modules.

Each discovered flow module contributes:

  • a module name

  • optional DESCRIPTION

  • build() -> Flow

The flow module's filename (its module name) is the flow identity surfaced in discovery and execution. The UI uses Flow.label when present, otherwise it derives a readable title from that internal name.
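A minimal flow-module skeleton therefore looks roughly like this. The module filename, DESCRIPTION text, and label are illustrative, and the data_engine import is elided so only the contract's shape is shown:

```python
# flow_modules/docs_refresh.py  (hypothetical module; the filename is the flow identity)
# from data_engine import Flow   # real flow modules import Flow here

DESCRIPTION = "Refreshes the docs mirror."  # optional, surfaced in discovery

def build():
    # Real modules return a configured Flow; sketched here as the
    # contract shape only, without a real Flow import.
    return Flow(group="Docs", label="Docs refresh")
```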

That discovered Flow object is what the UI inspects for:

  • grouping

  • step labels

  • runtime mode

  • source and mirror bindings

The authored Flow is the contract the runtime and UI inspect after discovery.

Workspaces

Flows are discovered from the currently selected authored workspace.

An authored workspace typically contains:

  • flow_modules/

  • flow_modules/flow_helpers/

  • config/

  • databases/

Notebook-authored flows are compiled into the same discovery surface as Python-authored flows, so both authoring styles participate in the same workspace layout and runtime rules.

The desktop app binds to one workspace at a time. When the selected workspace changes, the app reloads:

  • discovered flows

  • local runtime state

  • daemon control state

  • visible runs and logs

For the control and state model behind that, see App Runtime and Workspaces.