Source code for data_engine.helpers.duckdb._dimensions

"""DuckDB helper functions for dimension-style workflows."""

from __future__ import annotations

from pathlib import Path

from data_engine.helpers.duckdb import duckdb
from data_engine.helpers.duckdb._common import FrameLike
from data_engine.helpers.duckdb._common import _index_name
from data_engine.helpers.duckdb._common import _join_predicate
from data_engine.helpers.duckdb._common import _materialize_frame
from data_engine.helpers.duckdb._common import _normalize_key_columns
from data_engine.helpers.duckdb._common import _normalize_selected_columns
from data_engine.helpers.duckdb._common import _ordered_columns
from data_engine.helpers.duckdb._common import _qualified_columns
from data_engine.helpers.duckdb._common import _quote_identifier
from data_engine.helpers.duckdb._common import _quote_table_ref
from data_engine.helpers.duckdb._common import _resolved_db_path
from data_engine.helpers.duckdb._common import _schema_ref
from data_engine.helpers.duckdb._common import _table_column_names
from data_engine.helpers.duckdb._read import read_rows_by_values


def build_dimension(
    db_path: str | Path,
    table: str,
    *,
    df: FrameLike,
    key_column: str = "dimension_key",
    return_df: bool = True,
):
    """Create or grow a dimension table from the distinct rows of ``df``.

    Every column of ``df`` is treated as part of the natural key. The table is
    created on first use with a BIGINT surrogate key column followed by the
    natural columns, and a unique index over the natural columns is ensured.
    Distinct incoming rows that have no match in the table are inserted with
    keys numbered upward from the current maximum key, in natural-column
    order.

    All statements run inside a single transaction on a connection that is
    always closed before returning; on any error a rollback is attempted and
    the original exception is re-raised.

    Args:
        db_path: Database location, resolved through ``_resolved_db_path``.
        table: Target table reference; a schema, when ``_schema_ref`` reports
            one, is created if missing.
        df: Incoming rows; materialized via ``_materialize_frame``.
        key_column: Name of the surrogate key column (surrounding whitespace
            is stripped).
        return_df: When true, return the key mapping for the incoming rows.

    Returns:
        The result of ``.pl()`` on a query selecting the natural columns plus
        the key for every distinct incoming row, ordered by the natural
        columns; ``None`` when ``return_df`` is false.

    Raises:
        ValueError: If ``df`` has no columns, ``key_column`` is blank, or
            ``key_column`` collides with a column of ``df``.
    """
    frame = _materialize_frame(df)
    natural_cols = tuple(frame.columns)
    if not natural_cols:
        raise ValueError("df must include at least one column.")

    key_name = str(key_column).strip()
    if not key_name:
        raise ValueError("key_column must be non-empty.")
    if key_name in natural_cols:
        raise ValueError(f'key_column {key_name!r} must not already exist in df columns.')

    # Pre-render every SQL fragment once so the statements below stay readable.
    table_sql = _quote_table_ref(table)
    schema_sql = _schema_ref(table)
    key_sql = _quote_identifier(key_name)
    natural_sql = _ordered_columns(natural_cols)
    mapping_select_sql = _qualified_columns("mapping", natural_cols)
    # NOTE(review): how NULL natural values compare depends on _join_predicate,
    # which is not visible here.
    unseen_match_sql = _join_predicate(
        left_alias="candidate",
        right_alias="existing",
        columns=natural_cols,
    )
    mapping_match_sql = _join_predicate(
        left_alias="mapping",
        right_alias="incoming_distinct",
        columns=natural_cols,
    )

    incoming_view = "__data_engine_dimension_incoming"
    incoming_distinct = "__data_engine_dimension_incoming_distinct"
    pending_rows = "__data_engine_dimension_new_rows"

    index_sql = _quote_identifier(_index_name(table=table, columns=natural_cols))
    database = _resolved_db_path(db_path)

    connection = duckdb.connect(database)
    try:
        connection.execute("BEGIN TRANSACTION")
        if schema_sql is not None:
            connection.execute(f"CREATE SCHEMA IF NOT EXISTS {schema_sql}")

        # Stage the incoming frame and collapse it to distinct combinations.
        connection.register(incoming_view, frame)
        connection.execute(
            f"CREATE OR REPLACE TEMP TABLE {incoming_distinct} AS SELECT DISTINCT * FROM {incoming_view}"
        )

        # "WHERE 1 = 0" copies only the column layout, never any rows.
        connection.execute(
            f"""
            CREATE TABLE IF NOT EXISTS {table_sql} AS
            SELECT
                CAST(NULL AS BIGINT) AS {key_sql},
                *
            FROM {incoming_distinct}
            WHERE 1 = 0
            """
        )
        connection.execute(
            f"CREATE UNIQUE INDEX IF NOT EXISTS {index_sql} ON {table_sql} ({natural_sql})"
        )

        # Anti-join: keep only incoming combinations absent from the table.
        connection.execute(
            f"""
            CREATE OR REPLACE TEMP TABLE {pending_rows} AS
            SELECT candidate.*
            FROM {incoming_distinct} AS candidate
            LEFT JOIN {table_sql} AS existing
                ON {unseen_match_sql}
            WHERE existing.{key_sql} IS NULL
            """
        )

        # New keys continue from the current maximum (0 for an empty table).
        connection.execute(
            f"""
            INSERT INTO {table_sql} ({key_sql}, {natural_sql})
            SELECT
                current_keys.max_existing_key + ROW_NUMBER() OVER (ORDER BY {natural_sql}) AS {key_sql},
                new_rows.*
            FROM {pending_rows} AS new_rows
            CROSS JOIN (
                SELECT COALESCE(MAX({key_sql}), 0) AS max_existing_key
                FROM {table_sql}
            ) AS current_keys
            """
        )

        mapping = None
        if return_df:
            # Read the mapping before committing so it reflects this transaction.
            mapping = connection.execute(
                f"""
                SELECT
                    {mapping_select_sql},
                    mapping.{key_sql}
                FROM {table_sql} AS mapping
                INNER JOIN {incoming_distinct} AS incoming_distinct
                    ON {mapping_match_sql}
                ORDER BY {natural_sql}
                """
            ).pl()
        connection.execute("COMMIT")
        return mapping
    except Exception:
        # Best-effort rollback; the original failure is what the caller needs.
        try:
            connection.execute("ROLLBACK")
        except Exception:
            pass
        raise
    finally:
        connection.close()


def attach_dimension(
    db_path: str | Path,
    table: str,
    *,
    df: FrameLike,
    on: str | list[str] | tuple[str, ...],
    key_column: str = "dimension_key",
    drop_key: bool = False,
):
    """Attach an existing surrogate key mapping table to an input dataframe.

    Rows of ``table`` are fetched through ``read_rows_by_values``, pre-filtered
    on the distinct values of the first ``on`` column only, reduced to one row
    per ``on`` combination (first occurrence wins), and left-joined onto
    ``df``. Rows of ``df`` without a match keep a null key.

    Args:
        db_path: Database location, forwarded to ``read_rows_by_values``.
        table: Dimension table to read the mapping from.
        df: Input rows; materialized via ``_materialize_frame``.
        on: Natural-key column name(s) shared by ``df`` and ``table``.
        key_column: Surrogate key column in ``table``. Surrounding whitespace
            is stripped, matching ``build_dimension`` and
            ``denormalize_columns``.
        drop_key: When true, drop the ``on`` (natural key) columns from the
            result so only the surrogate key remains in their place.

    Returns:
        ``df`` with ``key_column`` joined on, minus the ``on`` columns when
        ``drop_key`` is true.

    Raises:
        ValueError: If any ``on`` column is missing from ``df`` or
            ``key_column`` is blank.
    """
    df = _materialize_frame(df)
    join_columns = _normalize_key_columns(on)
    missing_columns = [column for column in join_columns if column not in df.columns]
    if missing_columns:
        raise ValueError(f"on columns must exist in df: {missing_columns!r}")

    # Normalize exactly like build_dimension() so a padded name such as
    # " dimension_key " resolves to the column that was actually created.
    normalized_key_column = str(key_column).strip()
    if not normalized_key_column:
        raise ValueError("key_column must be non-empty.")

    first_join_column = join_columns[0]
    mapping = read_rows_by_values(
        db_path,
        table,
        column=first_join_column,
        is_in=df.get_column(first_join_column).unique().to_list(),
        select=[*join_columns, normalized_key_column],
    ).unique(subset=list(join_columns), maintain_order=True)

    # validate="m:1" asserts the mapping is unique per natural-key combination.
    normalized = df.join(mapping, on=list(join_columns), how="left", validate="m:1")
    if drop_key:
        normalized = normalized.drop(list(join_columns))
    return normalized


def denormalize_columns(
    db_path: str | Path,
    table: str,
    *,
    df: FrameLike,
    key_column: str = "dimension_key",
    select: str | list[str] | tuple[str, ...] = "*",
    drop_key: bool = False,
):
    """Join natural columns from a dimension table onto a keyed dataframe.

    The table's column names are read first so ``key_column`` and ``select``
    can be validated. Matching rows are then fetched through
    ``read_rows_by_values`` for the distinct key values in ``df``, reduced to
    one row per key, and left-joined onto ``df``.

    Args:
        db_path: Database location, resolved through ``_resolved_db_path``.
        table: Dimension table to read from.
        df: Input rows containing ``key_column``; materialized via
            ``_materialize_frame``.
        key_column: Surrogate key column present in both ``df`` and ``table``
            (surrounding whitespace is stripped).
        select: Columns to bring over; ``"*"`` means every table column except
            the key.
        drop_key: When true, remove ``key_column`` from the result.

    Returns:
        ``df`` with the selected columns joined on, minus ``key_column`` when
        ``drop_key`` is true.

    Raises:
        ValueError: If ``key_column`` is blank or missing from ``df`` or
            ``table``, the table has no columns, ``select`` names unknown
            columns or the key itself, or no non-key column remains.
    """
    frame = _materialize_frame(df)

    key_name = str(key_column).strip()
    if not key_name:
        raise ValueError("key_column must be non-empty.")
    if key_name not in frame.columns:
        raise ValueError(f"key_column {key_name!r} must exist in df.")

    # A short-lived connection is opened only to inspect the table's columns.
    connection = duckdb.connect(_resolved_db_path(db_path))
    try:
        available = _table_column_names(connection, table)
    finally:
        connection.close()

    if not available:
        raise ValueError(f"Table {table!r} does not exist or has no columns.")
    if key_name not in available:
        raise ValueError(f"key_column {key_name!r} must exist in table {table!r}.")

    if select == "*":
        wanted = tuple(name for name in available if name != key_name)
    else:
        wanted = _normalize_selected_columns(select)
        unknown = [name for name in wanted if name not in available]
        if unknown:
            raise ValueError(f"select columns must exist in table {table!r}: {unknown!r}")
        if key_name in wanted:
            raise ValueError(f"select must not include key_column {key_name!r}.")
    if not wanted:
        raise ValueError("select must include at least one non-key column.")

    key_values = frame.get_column(key_name).unique().to_list()
    lookup = read_rows_by_values(
        db_path,
        table,
        column=key_name,
        is_in=key_values,
        select=[key_name, *wanted],
    )
    # Keep the first row per key so the m:1 join validation can hold.
    lookup = lookup.unique(subset=[key_name], maintain_order=True)

    denormalized = frame.join(lookup, on=[key_name], how="left", validate="m:1")
    if not drop_key:
        return denormalized
    return denormalized.drop([key_name])


def normalize_columns(
    db_path: str | Path,
    table: str,
    *,
    df: FrameLike,
    on: str | list[str] | tuple[str, ...],
    key_column: str = "dimension_key",
    drop_key: bool = True,
    returns: str | None = "df",
):
    """Build missing surrogate keys and attach them back onto the input dataframe.

    The distinct ``on`` combinations of ``df`` are passed to
    ``build_dimension`` so any unseen combination receives a key. What is
    returned afterwards depends on ``returns``.

    Args:
        db_path: Database location, forwarded to the helpers.
        table: Dimension table to build or extend.
        df: Input rows; materialized via ``_materialize_frame``.
        on: Natural-key column name(s) in ``df``.
        key_column: Surrogate key column name. Whitespace is stripped once
            here so the build step and the attach step use the same name.
        drop_key: Forwarded to ``attach_dimension``; when true the ``on``
            columns are dropped from the returned dataframe.
        returns: ``"df"`` for the input with keys attached, ``"map"`` for the
            mapping produced by ``build_dimension``, or ``None`` to only
            update the table.

    Returns:
        The keyed dataframe, the mapping, or ``None``, as selected by
        ``returns``.

    Raises:
        ValueError: If ``returns`` is not one of the accepted values (other
            validation errors propagate from the helpers).
        RuntimeError: If ``build_dimension`` returns no mapping although one
            was requested.
    """
    if returns not in {"df", "map", None}:
        raise ValueError('returns must be "df", "map", or None.')

    df = _materialize_frame(df)
    join_columns = _normalize_key_columns(on)
    # build_dimension() strips the name internally; strip it here as well so
    # attach_dimension() looks up the very same column.
    normalized_key_column = str(key_column).strip()
    natural_key_df = df.select(list(join_columns)).unique(maintain_order=True)

    # The mapping is only consumed by returns="map"; the "df" path re-reads it
    # through attach_dimension(), so skip the extra query otherwise.
    wants_mapping = returns == "map"
    mapping = build_dimension(
        db_path,
        table,
        df=natural_key_df,
        key_column=normalized_key_column,
        return_df=wants_mapping,
    )

    if wants_mapping:
        if mapping is None:
            raise RuntimeError("build_dimension() unexpectedly returned no mapping.")
        return mapping
    if returns is None:
        return None
    return attach_dimension(
        db_path,
        table,
        df=df,
        on=join_columns,
        key_column=normalized_key_column,
        drop_key=drop_key,
    )