Database Methods

There is no first-class database sub-chain. Use DuckDB directly inside step callables, usually with a workspace-local database path from context.database(...).

If you want common warehouse-style shortcuts, see DuckDB Helpers. That helper layer covers several repeated patterns without taking over general SQL authoring.

The exact context.database(...) reference lives in the FlowContext.database docstring and is rendered in the API reference. Keep parameter details and copyable method examples beside the code in src/data_engine/core/primitives.py so editor help and packaged docs stay aligned.

The absence of a database sub-chain is intentional. The core API keeps connection ownership, transactions, and query semantics explicit in step code.

In practice, that means:

  • Data Engine gives you a conventional database path through context.database(...)

  • your step opens and closes the database connection

  • normal DuckDB and Python rules apply

context.database(...)

context.database(name) returns a path beneath the current authored workspace’s databases/ folder.

Examples:

context.database("analytics.duckdb")
context.database("claims/analytics.duckdb")

Those resolve to:

  • workspaces/<workspace_id>/databases/analytics.duckdb

  • workspaces/<workspace_id>/databases/claims/analytics.duckdb

Rules:

  • the path must be relative

  • parent directories are created automatically

  • the helper is only available for authored workspace flows

  • it returns the database path only; your step opens the connection

That last point is important. Returning the path keeps connection lifetime explicit and easy to reason about.
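
The rules above can be sketched in plain Python. This is not the Data Engine implementation; resolve_database_path, workspace_root, and the choice of ValueError are hypothetical, introduced only to illustrate the documented behavior. A temporary directory stands in for a real workspace folder.

```python
import tempfile
from pathlib import Path


def resolve_database_path(workspace_root, name):
    """Hypothetical stand-in for context.database(name); not the real helper."""
    relative = Path(name)
    # Rule: the path must be relative.
    if relative.is_absolute():
        raise ValueError(f"database path must be relative: {name!r}")
    target = Path(workspace_root) / "databases" / relative
    # Rule: parent directories are created automatically.
    target.parent.mkdir(parents=True, exist_ok=True)
    # Rule: return the path only; the caller opens the database.
    return target


def demo():
    # Returns the resolved location, whether the parent folder now exists,
    # and whether a database file exists.
    with tempfile.TemporaryDirectory() as tmp:
        path = resolve_database_path(tmp, "claims/analytics.duckdb")
        return path.relative_to(tmp).as_posix(), path.parent.is_dir(), path.exists()
```

In this sketch the claims/ folder exists after the call, but no database file does: nothing is created on disk until a step opens the path with DuckDB.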

Example

import duckdb
import polars as pl

from data_engine import Flow


def read_claims(file_ref):
    return pl.read_excel(file_ref.path)


def build_source(context):
    return pl.concat(context.current, how="vertical_relaxed")


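def summarize(context):
    # context.database(...) only returns a path; this step owns the connection.
    # str() keeps connect() working whether that path is a str or a pathlib.Path.
    conn = duckdb.connect(str(context.database("analytics.duckdb")))
    try:
        # Expose the upstream Polars frame to SQL under the name "input".
        conn.register("input", context.current)
        return conn.sql(
            """
            select workflow, count(*) as row_count
            from input
            group by workflow
            """
        ).pl()
    finally:
        # Always release the database file, even if the query fails.
        conn.close()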


def build():
    return (
        Flow(group="Analytics")
        .watch(
            mode="schedule",
            run_as="batch",
            interval="15m",
            source="../../example_data/Input/claims_flat",
        )
        .mirror(root="../../example_data/Output/example_summary")
        .collect([".xlsx"], save_as="claim_files")
        .map(read_claims, use="claim_files", save_as="claim_frames")
        .step(build_source, use="claim_frames", save_as="raw_df")
        .step(summarize, use="raw_df", save_as="summary_df")
    )

This keeps the flow API small while still letting flow modules use native SQL and native DuckDB connections.

Good patterns

  • open the connection inside the step that needs it

  • close the connection in a finally: block so it is released even when the query fails

  • keep the path stable when you want incremental or append-oriented databases

  • use subfolders such as claims/analytics.duckdb when one workspace owns several related databases

A note on mirror vs database paths

If the database is a durable workspace-local asset, prefer context.database(...).

If the database is really just another output artifact produced by one mirrored source flow, context.mirror.root_file("analytics.duckdb") can still be appropriate.

The difference is mostly semantic:

  • context.database(...) says “this belongs to the workspace as a local database”

  • context.mirror... says “this belongs to this flow’s output namespace”