# DuckDB Helpers
`data_engine.helpers.duckdb` is the first public helper layer for common warehouse-style authoring patterns. The function-by-function reference lives in the helper docstrings and is rendered in the API reference. Keep signature details and copyable examples beside the functions in `src/data_engine/helpers/duckdb.py`; that keeps editor hover help and the packaged docs aligned. This page explains the shared design shape and when the helper family is a good fit.
These helpers are intentionally:

- one-shot
- explicit about the database path
- explicit about the target table
- responsible for their own connection lifecycle

That means each helper:

- opens DuckDB
- does one job
- commits or rolls back
- closes the connection
They are designed for flow code that wants less repeated SQL plumbing without hiding too much behavior.
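For orientation, here is a minimal sketch of that lifecycle, written against the plain `duckdb` Python API. This is not the library's implementation, just the shape each helper commits to:

```python
import duckdb

def one_shot_write(db_path, sql, params=None):
    """Sketch of the one-shot helper lifecycle: open, do one job,
    commit or roll back, close. Illustrative only."""
    con = duckdb.connect(str(db_path))
    try:
        con.execute("BEGIN")
        con.execute(sql, params or [])
        con.execute("COMMIT")
    except Exception:
        con.execute("ROLLBACK")
        raise
    finally:
        con.close()
```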
## Import style

```python
from data_engine.helpers.duckdb import build_dimension
from data_engine.helpers.duckdb import attach_dimension
from data_engine.helpers.duckdb import compact_database
from data_engine.helpers.duckdb import denormalize_columns
from data_engine.helpers.duckdb import normalize_columns
from data_engine.helpers.duckdb import read_rows_by_values
from data_engine.helpers.duckdb import read_sql
from data_engine.helpers.duckdb import read_table
from data_engine.helpers.duckdb import replace_rows_by_file
from data_engine.helpers.duckdb import replace_rows_by_values
from data_engine.helpers.duckdb import replace_table
```
The expected path pattern is:

```python
db_path = context.database("claims/analytics.duckdb")
```

You can also use a mirrored output path or any other DuckDB file path you control; the helpers work with any path you provide.
## build_dimension(...)

Use this helper when you already have a dataframe trimmed down to only the natural-key columns and want to persist or extend a surrogate-key table.

Signature:

```python
build_dimension(
    db_path,
    table,
    *,
    df,
    key_column="dimension_key",
    return_df=True,
)
```

Behavior:

- treats every column in `df` as part of the natural key
- creates the table if it does not exist
- inserts only missing unique combinations
- assigns deterministic integer surrogate keys
- returns the natural-key-to-surrogate-key mapping when `return_df=True`
Example:

```python
mapping = build_dimension(
    context.database("warehouse.duckdb"),
    "mart.dim_member",
    df=member_keys_df,
    key_column="member_key",
)
```

The returned mapping has the columns:

```text
member_id | lob | member_key
```
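If you want to apply that mapping yourself instead of calling `attach_dimension(...)`, a plain Polars join against the returned dataframe works; a minimal sketch, assuming `claims_df` carries the same natural-key columns:

```python
# Join the persisted surrogate key onto the full dataframe by hand.
# attach_dimension(...) below does the same against the stored table.
claims_with_key = claims_df.join(mapping, on=["member_id", "lob"], how="left")
```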
## attach_dimension(...)

Use this helper when the surrogate-key table already exists and you only want to join the key back onto a dataframe.

Signature:

```python
attach_dimension(
    db_path,
    table,
    *,
    df,
    on,
    key_column="dimension_key",
    drop_key=False,
)
```

Key arguments:

- `on` can be one column name or a list of column names
- `drop_key=False` keeps the natural-key columns by default
- set `drop_key=True` when you want the attached surrogate key without the original key columns
Example:

```python
attached = attach_dimension(
    context.database("warehouse.duckdb"),
    "mart.dim_member",
    df=claims_df,
    on=["member_id", "lob"],
    key_column="member_key",
)
```
## normalize_columns(...)

Use this helper when you want to build missing surrogate keys and immediately attach them back onto the full dataframe.

Signature:

```python
normalize_columns(
    db_path,
    table,
    *,
    df,
    on,
    key_column="dimension_key",
    drop_key=True,
    returns="df",
)
```

Key arguments:

- `on` can be one column name or a list of column names
- `drop_key=True` removes the natural-key columns after the surrogate key is joined back
- `returns="df"` returns the normalized dataframe
- `returns="map"` returns only the persisted mapping
- `returns=None` performs side effects only
Example:

```python
normalized = normalize_columns(
    context.database("warehouse.duckdb"),
    "mart.dim_member",
    df=claims_df,
    on=["member_id", "lob"],
    key_column="member_key",
)
```

If `claims_df` starts with:

```text
member_id | lob | amount
```

then `normalized` becomes:

```text
amount | member_key
```
This helper uses `build_dimension(...)` and `attach_dimension(...)` internally.
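Given that composition, the example above is roughly equivalent to the following two-step version (a sketch of the equivalence, not the internal code):

```python
# Persist any missing natural-key combinations, then join the key back.
mapping = build_dimension(
    context.database("warehouse.duckdb"),
    "mart.dim_member",
    df=claims_df.select(["member_id", "lob"]),  # natural-key columns only
    key_column="member_key",
)
normalized = attach_dimension(
    context.database("warehouse.duckdb"),
    "mart.dim_member",
    df=claims_df,
    on=["member_id", "lob"],
    key_column="member_key",
    drop_key=True,  # matches the normalize_columns default
)
```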
## denormalize_columns(...)

Use this helper when your dataframe already has a surrogate key and you want to attach the natural columns back from the persisted dimension table.

Signature:

```python
denormalize_columns(
    db_path,
    table,
    *,
    df,
    key_column="dimension_key",
    select="*",
    drop_key=False,
)
```

Key arguments:

- `key_column` is the surrogate key used to join from `df` into the dimension table
- `select="*"` attaches every non-key column from the dimension table
- `select=[...]` lets you attach only a subset of natural columns
- `drop_key=False` keeps the surrogate key by default
Example:

```python
denormalized = denormalize_columns(
    context.database("warehouse.duckdb"),
    "mart.dim_member",
    df=fact_df,
    key_column="member_key",
)
```
## replace_rows_by_file(...)

Use this helper when one incoming dataframe represents the full current contents for one source file.

Signature:

```python
replace_rows_by_file(
    db_path,
    table,
    *,
    df,
    file_hash,
    file_hash_column="file_key",
    return_df=True,
)
```

Behavior:

- adds a constant file-hash column to `df`
- creates the table if it does not exist
- expands the table schema when new columns appear
- deletes existing rows for that file hash
- appends the current batch
Example:

```python
updated = replace_rows_by_file(
    context.database("warehouse.duckdb"),
    "canon.claim_rows",
    df=claims_df,
    file_hash=context.metadata["file_hash"],
)
```
This is the usual pattern for canon-style “replace one file slice” loading.
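In plain DuckDB terms, the write is roughly the following delete-then-append transaction; this is a sketch of the semantics only, since the real helper also creates the table and widens its schema as needed:

```python
import duckdb

con = duckdb.connect(str(db_path))
try:
    con.register("batch", claims_df)
    con.execute("BEGIN")
    # Drop the previous slice for this file, then append the new batch
    # with the constant file-hash column attached.
    con.execute("DELETE FROM canon.claim_rows WHERE file_key = ?", [file_hash])
    con.execute(
        "INSERT INTO canon.claim_rows BY NAME SELECT *, ? AS file_key FROM batch",
        [file_hash],
    )
    con.execute("COMMIT")
except Exception:
    con.execute("ROLLBACK")
    raise
finally:
    con.close()
```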
## replace_rows_by_values(...)

Use this helper when one incoming dataframe represents the full current contents for one logical value slice.

Signature:

```python
replace_rows_by_values(
    db_path,
    table,
    *,
    df,
    column,
    return_df=True,
)
```

Behavior:

- takes the distinct values from `df[column]`
- deletes existing rows in the target table where `column` matches any of those values
- appends the current batch
- creates and expands the table as needed
Example:

```python
updated = replace_rows_by_values(
    context.database("warehouse.duckdb"),
    "mart.fact_claim",
    df=claims_for_open_status,
    column="status",
)
```
That says: “replace every persisted status slice represented by this batch, then insert this batch.”
## compact_database(...)

Use this helper for explicit maintenance flows when you want to clean one DuckDB file after a period of ingestion.

Signature:

```python
compact_database(
    db_path,
    *,
    tables=None,
    drop_all_null_columns=True,
    vacuum=True,
)
```

Behavior:

- inspects one or more user tables in the database
- drops columns whose persisted values are entirely null
- preserves at least one column per table
- optionally runs `VACUUM` after schema cleanup
- returns a Polars summary dataframe with dropped-column and file-size metadata
Example:

```python
summary = compact_database(
    context.database("warehouse.duckdb"),
    tables=["mart.fact_claim", "mart.fact_member"],
    vacuum=True,
)
```
This is a good fit for manual maintenance flows where you want a simple one-liner per database.
## read_rows_by_values(...)

Use this helper when you want a small filtered lookup out of DuckDB as a Polars dataframe.

Signature:

```python
read_rows_by_values(
    db_path,
    table,
    *,
    column,
    is_in,
    select,
)
```

Behavior:

- returns rows where `column` matches one of the provided values
- returns only the selected columns
- uses a temporary lookup table internally, which works better than manually assembling long SQL `IN (...)` strings
Example:

```python
existing = read_rows_by_values(
    context.database("warehouse.duckdb"),
    "mart.fact_claim",
    column="claim_id",
    is_in=[1001, 1002, 1003],
    select=["claim_id", "member_key", "amount"],
)
```
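The same temp-lookup idea is easy to reproduce in direct DuckDB code when you outgrow the helper; a minimal sketch assuming the `duckdb` and `polars` packages (this shows the general pattern, not the helper's exact internals):

```python
import duckdb
import polars as pl

con = duckdb.connect(str(context.database("warehouse.duckdb")))
try:
    # Register the lookup values as a relation instead of splicing a long
    # IN (...) literal into the SQL text.
    lookup = pl.DataFrame({"claim_id": [1001, 1002, 1003]})
    con.register("lookup_values", lookup)
    existing = con.execute(
        """
        SELECT t.claim_id, t.member_key, t.amount
        FROM mart.fact_claim AS t
        JOIN lookup_values AS v USING (claim_id)
        """
    ).pl()
finally:
    con.close()
```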
## read_sql(...)

Use this helper when you already have the exact DuckDB query you want and just need the result as a Polars dataframe.

Signature:

```python
read_sql(
    db_path,
    *,
    sql,
)
```
Example:

```python
result = read_sql(
    context.database("warehouse.duckdb"),
    sql="""
        SELECT claim_id, amount
        FROM mart.fact_claim
        WHERE amount >= 100
    """,
)
```
This is the most direct read helper. If you already know the SQL you want, use this.
## read_table(...)

Use this helper when you want a lightweight table reader without writing the whole SQL statement.

Signature:

```python
read_table(
    db_path,
    table,
    *,
    select="*",
    where=None,
    limit=None,
)
```
Example:

```python
result = read_table(
    context.database("warehouse.duckdb"),
    "mart.fact_claim",
    select=["claim_id", "amount"],
    where='"amount" >= 100',
    limit=100,
)
```

This helper is intentionally small:

- `select` can be `"*"` or a list of column names
- `where` is passed through as SQL
- `limit` is optional
## replace_table(...)

Use this helper when you want to replace the entire contents of one table with the current dataframe.

Signature:

```python
replace_table(
    db_path,
    table,
    *,
    df,
    return_df=True,
)
```

Behavior:

- creates the table if it does not exist
- expands the table schema when new columns appear
- deletes all existing rows
- inserts the current dataframe
Example:

```python
replace_table(
    context.database("warehouse.duckdb"),
    "mart.current_snapshot",
    df=snapshot_df,
)
```
This is the simplest full-refresh write helper in the current set.
## Design guidance

These helpers are best when:

- the database path is stable
- table ownership is clear
- the dataframe shape is already mostly what you want
- you want predictable transactional behavior
These helpers support common repeated patterns in flow code. Steps that need custom joins, custom window logic, or highly specific query behavior can use plain DuckDB directly.
## When to use direct DuckDB instead

Prefer direct DuckDB code when:

- the operation is highly custom
- you want several SQL statements in one step
- you want full manual control over relation registration, temp tables, or query flow
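For contrast with the helpers, a hedged sketch of that direct style (table and column names are illustrative):

```python
import duckdb

con = duckdb.connect(str(context.database("warehouse.duckdb")))
try:
    # Register an in-flight dataframe, stage it, and run several
    # statements over one connection with full control of query flow.
    con.register("batch", claims_df)
    con.execute("CREATE OR REPLACE TEMP TABLE staged AS SELECT * FROM batch")
    result = con.execute(
        """
        SELECT member_key, SUM(amount) AS total_amount
        FROM staged
        GROUP BY member_key
        """
    ).pl()
finally:
    con.close()
```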
The helpers remove repeated boilerplate and keep common warehouse-style operations concise.