"""DuckDB helper functions that replace persisted table slices."""
from __future__ import annotations
from pathlib import Path
import polars as pl
from data_engine.helpers.duckdb import duckdb
from data_engine.helpers.duckdb._common import FrameLike
from data_engine.helpers.duckdb._common import _existing_table_columns
from data_engine.helpers.duckdb._common import _materialize_frame
from data_engine.helpers.duckdb._common import _ordered_columns
from data_engine.helpers.duckdb._common import _quote_identifier
from data_engine.helpers.duckdb._common import _quote_table_ref
from data_engine.helpers.duckdb._common import _resolved_db_path
from data_engine.helpers.duckdb._common import _schema_ref
[docs]
def replace_rows_by_file(
    db_path: str | Path,
    table: str,
    *,
    df: FrameLike,
    file_hash: str,
    file_hash_column: str = "file_key",
    return_df: bool = True,
):
    """Atomically replace one file's fact rows and append the current batch.

    All rows of ``table`` whose ``file_hash_column`` equals ``file_hash`` are
    deleted, then the rows of ``df`` are inserted with ``file_hash`` written
    into ``file_hash_column``. Both steps run inside a single transaction. The
    target table is created from the batch's schema when it does not exist.
    Columns present in the batch but missing from the table are added before
    inserting.

    Args:
        db_path: DuckDB database location, resolved via ``_resolved_db_path``.
        table: Target table reference. When ``_schema_ref`` reports a schema
            for it, that schema is created if missing.
        df: Incoming batch, materialized via ``_materialize_frame``.
        file_hash: Identifier of the file whose rows are replaced. It is
            stringified and stripped.
        file_hash_column: Column that stores ``file_hash``. It must not already
            exist in ``df``.
        return_df: When true, return the inserted frame (including the hash
            column); otherwise return ``None``.

    Returns:
        The batch with the hash column appended, or ``None`` when
        ``return_df`` is false.

    Raises:
        ValueError: If ``file_hash`` or ``file_hash_column`` is blank, if the
            hash column already exists in ``df``, or if ``df`` has no columns.
    """
    df = _materialize_frame(df)
    normalized_file_hash = str(file_hash).strip()
    if not normalized_file_hash:
        raise ValueError("file_hash must be non-empty.")
    normalized_file_hash_column = str(file_hash_column).strip()
    if not normalized_file_hash_column:
        raise ValueError("file_hash_column must be non-empty.")
    if normalized_file_hash_column in df.columns:
        raise ValueError(f"file_hash_column {normalized_file_hash_column!r} must not already exist in df columns.")
    # Validate the caller's columns *before* appending the hash column.
    # Afterwards the frame always has at least one column, so the check
    # could never trigger.
    if not df.columns:
        raise ValueError("df must include at least one column.")
    incoming_with_hash = df.with_columns(pl.lit(normalized_file_hash).alias(normalized_file_hash_column))

    quoted_table = _quote_table_ref(table)
    quoted_schema = _schema_ref(table)
    quoted_file_hash_column = _quote_identifier(normalized_file_hash_column)
    quoted_incoming_columns = _ordered_columns(tuple(incoming_with_hash.columns))
    temp_view = "__data_engine_incremental_incoming"
    temp_table = "__data_engine_incremental_incoming_table"

    connection = duckdb.connect(_resolved_db_path(db_path))
    try:
        connection.execute("BEGIN TRANSACTION")
        if quoted_schema is not None:
            connection.execute(f"CREATE SCHEMA IF NOT EXISTS {quoted_schema}")
        connection.register(temp_view, incoming_with_hash)
        connection.execute(f"CREATE OR REPLACE TEMP TABLE {temp_table} AS SELECT * FROM {temp_view}")
        # Create an empty table with the batch's schema on first use.
        connection.execute(f"CREATE TABLE IF NOT EXISTS {quoted_table} AS SELECT * FROM {temp_table} WHERE 1 = 0")
        # DuckDB treats identifiers (even quoted ones) case-insensitively.
        # Compare names the same way; otherwise ADD COLUMN fails for a
        # column that already exists under different casing.
        existing_names = {name.lower() for _, name, *_ in _existing_table_columns(connection, table)}
        for _, name, dtype, *_ in connection.execute(f"PRAGMA table_info({temp_table})").fetchall():
            if name.lower() not in existing_names:
                connection.execute(f"ALTER TABLE {quoted_table} ADD COLUMN {_quote_identifier(name)} {dtype}")
        connection.execute(
            f"""
            DELETE FROM {quoted_table}
            WHERE {quoted_file_hash_column} = ?
            """,
            [normalized_file_hash],
        )
        connection.execute(
            f"""
            INSERT INTO {quoted_table} ({quoted_incoming_columns})
            SELECT {quoted_incoming_columns}
            FROM {temp_table}
            """
        )
        connection.execute("COMMIT")
    except Exception:
        # Best-effort rollback: the original error is what the caller needs to see.
        try:
            connection.execute("ROLLBACK")
        except Exception:
            pass
        raise
    finally:
        connection.close()
    return incoming_with_hash if return_df else None
[docs]
def replace_rows_by_values(
    db_path: str | Path,
    table: str,
    *,
    df: FrameLike,
    column: str,
    return_df: bool = True,
):
    """Atomically replace one value-slice of rows and append the current batch.

    Every row of ``table`` whose ``column`` matches one of the distinct values
    of ``df[column]`` is deleted, then all rows of ``df`` are inserted. Both
    steps run inside a single transaction. Matching uses
    ``IS NOT DISTINCT FROM``, so a NULL in ``df[column]`` also deletes
    existing NULL rows. Lookup values are cast to the table's column type
    before comparing. The target table is created from the batch's schema
    when missing. Columns present in the batch but missing from the table are
    added before inserting.

    Args:
        db_path: DuckDB database location, resolved via ``_resolved_db_path``.
        table: Target table reference. When ``_schema_ref`` reports a schema
            for it, that schema is created if missing.
        df: Incoming batch, materialized via ``_materialize_frame``.
        column: Column whose values define the slice being replaced. It must
            exist in ``df``.
        return_df: When true, return the materialized batch; otherwise
            return ``None``.

    Returns:
        The materialized batch, or ``None`` when ``return_df`` is false.

    Raises:
        ValueError: If ``df`` is empty, ``column`` is blank, or ``column`` is
            not a column of ``df``.
    """
    df = _materialize_frame(df)
    if df.is_empty():
        raise ValueError("df must include at least one row.")
    normalized_column = str(column).strip()
    if not normalized_column:
        raise ValueError("column must be non-empty.")
    if normalized_column not in df.columns:
        raise ValueError(f"column {normalized_column!r} must exist in df columns.")
    # Distinct values of the slice column: these define which rows are replaced.
    lookup = df.select(pl.col(normalized_column)).unique(maintain_order=True)
    if lookup.is_empty():
        raise ValueError("df must include at least one replacement value.")

    quoted_table = _quote_table_ref(table)
    quoted_schema = _schema_ref(table)
    quoted_column = _quote_identifier(normalized_column)
    quoted_df_columns = _ordered_columns(tuple(df.columns))
    temp_view = "__data_engine_replace_values_df"
    temp_table = "__data_engine_replace_values_df_table"
    temp_lookup_view = "__data_engine_replace_values_lookup"
    temp_lookup_table = "__data_engine_replace_values_lookup_table"

    connection = duckdb.connect(_resolved_db_path(db_path))
    try:
        connection.execute("BEGIN TRANSACTION")
        if quoted_schema is not None:
            connection.execute(f"CREATE SCHEMA IF NOT EXISTS {quoted_schema}")
        connection.register(temp_view, df)
        connection.execute(f"CREATE OR REPLACE TEMP TABLE {temp_table} AS SELECT * FROM {temp_view}")
        # Create an empty table with the batch's schema on first use.
        connection.execute(f"CREATE TABLE IF NOT EXISTS {quoted_table} AS SELECT * FROM {temp_table} WHERE 1 = 0")
        # Column types keyed by lowercased name. DuckDB identifiers are
        # case-insensitive (even when quoted), so a differently-cased column
        # must count as already present. Otherwise ADD COLUMN fails and the
        # type lookup below raises KeyError.
        existing_types = {name.lower(): dtype for _, name, dtype, *_ in _existing_table_columns(connection, table)}
        for _, name, dtype, *_ in connection.execute(f"PRAGMA table_info({temp_table})").fetchall():
            if name.lower() in existing_types:
                continue
            connection.execute(f"ALTER TABLE {quoted_table} ADD COLUMN {_quote_identifier(name)} {dtype}")
            existing_types[name.lower()] = dtype
        target_column_type = existing_types[normalized_column.lower()]
        connection.register(temp_lookup_view, lookup)
        # Cast lookup values to the stored column type so the comparison
        # below matches the table's representation.
        connection.execute(
            f"""
            CREATE OR REPLACE TEMP TABLE {temp_lookup_table} AS
            SELECT CAST({quoted_column} AS {target_column_type}) AS lookup_value
            FROM {temp_lookup_view}
            """
        )
        connection.execute(
            f"""
            DELETE FROM {quoted_table}
            WHERE EXISTS (
                SELECT 1
                FROM {temp_lookup_table} AS lookup
                WHERE {quoted_table}.{quoted_column} IS NOT DISTINCT FROM lookup.lookup_value
            )
            """
        )
        connection.execute(
            f"""
            INSERT INTO {quoted_table} ({quoted_df_columns})
            SELECT {quoted_df_columns}
            FROM {temp_table}
            """
        )
        connection.execute("COMMIT")
    except Exception:
        # Best-effort rollback: the original error is what the caller needs to see.
        try:
            connection.execute("ROLLBACK")
        except Exception:
            pass
        raise
    finally:
        connection.close()
    return df if return_df else None
[docs]
def replace_table(
    db_path: str | Path,
    table: str,
    *,
    df: FrameLike,
    return_df: bool = True,
):
    """Replace all rows of one DuckDB table with the provided dataframe.

    Inside a single transaction, every existing row is deleted and the rows
    of ``df`` are inserted. The table's schema is kept rather than recreated:
    - Columns missing from the table are added.
    - Columns present in the table but absent from ``df`` remain and receive
      their default (NULL unless the table declares otherwise) for the new
      rows.
    - Existing column types are not changed.

    The table is created from the batch's schema when it does not exist.

    Args:
        db_path: DuckDB database location, resolved via ``_resolved_db_path``.
        table: Target table reference. When ``_schema_ref`` reports a schema
            for it, that schema is created if missing.
        df: New table contents, materialized via ``_materialize_frame``.
        return_df: When true, return the materialized frame; otherwise
            return ``None``.

    Returns:
        The materialized frame, or ``None`` when ``return_df`` is false.

    Raises:
        ValueError: If ``df`` has no columns.
    """
    df = _materialize_frame(df)
    df_columns = tuple(df.columns)
    if not df_columns:
        raise ValueError("df must include at least one column.")

    quoted_table = _quote_table_ref(table)
    quoted_schema = _schema_ref(table)
    quoted_df_columns = _ordered_columns(df_columns)
    temp_view = "__data_engine_replace_table_df"
    temp_table = "__data_engine_replace_table_df_table"

    connection = duckdb.connect(_resolved_db_path(db_path))
    try:
        connection.execute("BEGIN TRANSACTION")
        if quoted_schema is not None:
            connection.execute(f"CREATE SCHEMA IF NOT EXISTS {quoted_schema}")
        connection.register(temp_view, df)
        connection.execute(f"CREATE OR REPLACE TEMP TABLE {temp_table} AS SELECT * FROM {temp_view}")
        # Create an empty table with the frame's schema on first use.
        connection.execute(f"CREATE TABLE IF NOT EXISTS {quoted_table} AS SELECT * FROM {temp_table} WHERE 1 = 0")
        # DuckDB treats identifiers (even quoted ones) case-insensitively.
        # Compare names the same way; otherwise ADD COLUMN fails for a
        # column that already exists under different casing.
        existing_names = {name.lower() for _, name, *_ in _existing_table_columns(connection, table)}
        for _, name, dtype, *_ in connection.execute(f"PRAGMA table_info({temp_table})").fetchall():
            if name.lower() not in existing_names:
                connection.execute(f"ALTER TABLE {quoted_table} ADD COLUMN {_quote_identifier(name)} {dtype}")
        connection.execute(f"DELETE FROM {quoted_table}")
        connection.execute(
            f"""
            INSERT INTO {quoted_table} ({quoted_df_columns})
            SELECT {quoted_df_columns}
            FROM {temp_table}
            """
        )
        connection.execute("COMMIT")
    except Exception:
        # Best-effort rollback: the original error is what the caller needs to see.
        try:
            connection.execute("ROLLBACK")
        except Exception:
            pass
        raise
    finally:
        connection.close()
    return df if return_df else None