# Source code for data_engine.helpers.polars

"""Polars namespace helpers for Data Engine flow authoring."""

from __future__ import annotations

from collections.abc import Callable, Iterable
from datetime import date, datetime, timedelta
import os
from pathlib import Path
import time
from uuid import uuid4

import numpy as np
import polars as pl

from data_engine.helpers.duckdb import attach_dimension as _attach_dimension
from data_engine.helpers.duckdb import build_dimension as _build_dimension
from data_engine.helpers.duckdb import denormalize_columns as _denormalize_columns
from data_engine.helpers.duckdb import normalize_columns as _normalize_columns
from data_engine.helpers.duckdb import replace_rows_by_file as _replace_rows_by_file
from data_engine.helpers.duckdb import replace_rows_by_values as _replace_rows_by_values
from data_engine.helpers.duckdb import replace_table as _replace_table
from data_engine.helpers.schema import normalize_column_names as _normalize_column_names

# Accepted filesystem path inputs for the atomic write helpers.
PathLike = str | os.PathLike[str]
# A single column name or a sequence of column names.
ColumnNames = str | list[str] | tuple[str, ...]
# Return-mode selector for ``normalize_columns`` ("df", "map", or ``None``).
ReturnMode = str | None
# Monday-first seven-flag business-day mask.
WeekMask = tuple[bool, bool, bool, bool, bool, bool, bool]
# Scalar calendar inputs accepted by the business-day helpers.
DateLike = date | datetime
# Date-valued inputs: an expression, a column name, or a scalar date/datetime.
ExprLike = pl.Expr | str | DateLike
# Integer-valued inputs: an expression, a column name, or a scalar int.
IntExprLike = pl.Expr | str | int
# Excel default: Monday-Friday are business days, Saturday/Sunday excluded.
_DEFAULT_WEEK_MASK: WeekMask = (True, True, True, True, True, False, False)


def networkdays(
    start: ExprLike,
    end: ExprLike,
    *,
    holidays: list[DateLike | str] | tuple[DateLike | str, ...] | set[DateLike | str] | None = None,
    count_first_day: bool = False,
    mask: Iterable[bool] | None = None,
) -> pl.Expr:
    """Build a Polars expression with Excel ``NETWORKDAYS`` semantics.

    Both endpoints are counted when they fall on business days, weekends
    default to Saturday/Sunday, and any supplied holidays are removed from
    the count. ``count_first_day`` is a deliberate extension to Excel: when
    enabled, the start date is counted even if the weekday mask or holiday
    list would normally exclude it.

    Parameters
    ----------
    start, end : pl.Expr | str | date | datetime
        Date expressions, column names, or scalar dates. Datetime inputs are
        normalized to their calendar date before counting.
    holidays : list | tuple | set | None
        Optional holiday dates to exclude. String entries must be ISO date
        text such as ``"2026-04-15"``.
    count_first_day : bool
        Force the start date into the count even when it is masked out or a
        holiday.
    mask : Iterable[bool] | None
        Monday-first seven-item mask of real ``bool`` values. ``None`` uses
        the Excel default (Monday-Friday counted).

    Returns
    -------
    pl.Expr
        Signed ``Int64`` business-day count; null whenever either endpoint is
        null. The result is an ordinary expression and composes with
        ``cum_sum()``, window expressions, and filters.
    """
    business_mask = _coerce_week_mask(mask)
    excluded_dates = _coerce_holiday_dates(holidays)

    def _count_batch(batch: pl.Series) -> pl.Series:
        # map_batches hands us the packed struct; unpack both date fields.
        return _networkdays_batch(
            batch.struct.field("start"),
            batch.struct.field("end"),
            week_mask=business_mask,
            holiday_dates=excluded_dates,
            count_first_day=count_first_day,
        )

    packed = pl.struct(start=_as_date_expr(start), end=_as_date_expr(end))
    return packed.map_batches(_count_batch, return_dtype=pl.Int64)
def workday(
    start: ExprLike,
    days: IntExprLike,
    *,
    holidays: list[DateLike | str] | tuple[DateLike | str, ...] | set[DateLike | str] | None = None,
    count_first_day: bool = False,
    mask: Iterable[bool] | None = None,
) -> pl.Expr:
    """Build a Polars expression with Excel ``WORKDAY`` semantics.

    Mirrors Excel ``WORKDAY``: returns the business date that lies ``days``
    working days before or after ``start``, skipping masked weekdays and any
    supplied holidays. ``count_first_day`` is a deliberate extension: when
    enabled, the start date itself may count as day 1, even if it falls on a
    masked weekday or holiday.

    Parameters
    ----------
    start : pl.Expr | str | date | datetime
        Start date expression, column name, or scalar. Datetime inputs are
        normalized to their calendar date before offsetting.
    days : pl.Expr | str | int
        Signed business-day offset expression, column name, or scalar int.
    holidays : list | tuple | set | None
        Optional holiday dates to skip. String entries must be ISO date text
        such as ``"2026-04-15"``.
    count_first_day : bool
        Allow the start date itself to count as day 1 in either direction.
    mask : Iterable[bool] | None
        Monday-first seven-item mask of real ``bool`` values. ``None`` uses
        the Excel default (Monday-Friday counted).

    Returns
    -------
    pl.Expr
        Expression evaluating to a ``Date`` result.
    """
    business_mask = _coerce_week_mask(mask)
    excluded_dates = _coerce_holiday_dates(holidays)

    def _offset_batch(batch: pl.Series) -> pl.Series:
        # map_batches hands us the packed struct; unpack date and offset.
        return _workday_batch(
            batch.struct.field("start"),
            batch.struct.field("days"),
            week_mask=business_mask,
            holiday_dates=excluded_dates,
            count_first_day=count_first_day,
        )

    packed = pl.struct(
        start=_as_date_expr(start),
        days=_as_int_expr(days).cast(pl.Int64),
    )
    return packed.map_batches(_offset_batch, return_dtype=pl.Date)
def _coerce_week_mask(mask: Iterable[bool] | None) -> WeekMask:
    """Validate a Monday-first weekday mask, defaulting to Monday-Friday."""
    if mask is None:
        return _DEFAULT_WEEK_MASK
    values = tuple(mask)
    if len(values) != 7:
        raise ValueError("mask must contain exactly seven Monday-first boolean values.")
    # Require real bools so truthy ints (e.g. numpy scalars) are rejected early.
    if not all(isinstance(value, bool) for value in values):
        raise TypeError("mask must contain exactly seven Monday-first boolean values.")
    return values  # type: ignore[return-value]


def _coerce_holiday_dates(
    holidays: list[DateLike | str] | tuple[DateLike | str, ...] | set[DateLike | str] | None,
) -> tuple[date, ...]:
    """Normalize holiday inputs to a sorted, de-duplicated tuple of dates."""
    if holidays is None:
        return ()
    values: set[date] = set()
    for value in holidays:
        # datetime must be checked before date: datetime is a date subclass.
        if isinstance(value, datetime):
            values.add(value.date())
            continue
        if isinstance(value, date):
            values.add(value)
            continue
        if isinstance(value, str):
            values.add(date.fromisoformat(value))
            continue
        raise TypeError("holidays must contain date, datetime, or ISO date string values.")
    return tuple(sorted(values))


def _as_date_expr(value: ExprLike) -> pl.Expr:
    """Coerce an expression, column name, or scalar into a Date expression."""
    if isinstance(value, pl.Expr):
        return value.cast(pl.Date)
    if isinstance(value, str):
        return pl.col(value).cast(pl.Date)
    return pl.lit(value).cast(pl.Date)


def _as_int_expr(value: IntExprLike) -> pl.Expr:
    """Coerce an expression, column name, or scalar into an integer expression."""
    if isinstance(value, pl.Expr):
        return value
    if isinstance(value, str):
        return pl.col(value)
    return pl.lit(value)


def _is_business_day_expr(
    date_expr: pl.Expr,
    week_mask: WeekMask,
    holiday_dates: tuple[date, ...],
) -> pl.Expr:
    """Expression form of the business-day test (mask allows day, not a holiday)."""
    # Polars dt.weekday() is 1-based Monday-first, matching enumerate(start=1).
    weekday = date_expr.dt.weekday()
    day_allowed = pl.lit(False)
    for index, allowed in enumerate(week_mask, start=1):
        day_allowed = pl.when(weekday == index).then(pl.lit(allowed)).otherwise(day_allowed)
    holiday_expr = (
        date_expr.is_in(pl.lit(list(holiday_dates), dtype=pl.List(pl.Date)))
        if holiday_dates
        else pl.lit(False)
    )
    return day_allowed & ~holiday_expr


def _forced_first_day_adjustment(
    start_expr: pl.Expr,
    end_expr: pl.Expr,
    week_mask: WeekMask,
    holiday_dates: tuple[date, ...],
) -> pl.Expr:
    """Signed +/-1 correction applied when count_first_day forces in the start date."""
    start_already_counted = _is_business_day_expr(start_expr, week_mask, holiday_dates)
    return (
        pl.when(~start_already_counted)
        .then(pl.when(start_expr <= end_expr).then(pl.lit(1)).otherwise(pl.lit(-1)))
        .otherwise(pl.lit(0))
    )


def _networkdays_scalar(
    start_date: date | None,
    end_date: date | None,
    *,
    week_mask: WeekMask,
    holiday_dates: tuple[date, ...],
    count_first_day: bool,
) -> int | None:
    """Pure-Python NETWORKDAYS for one pair of dates; None propagates nulls."""
    if start_date is None or end_date is None:
        return None
    # Reversed ranges are counted forward and negated, mirroring Excel.
    if start_date <= end_date:
        result = _forward_networkdays(start_date, end_date, week_mask=week_mask, holiday_dates=holiday_dates)
    else:
        result = -_forward_networkdays(end_date, start_date, week_mask=week_mask, holiday_dates=holiday_dates)
    if count_first_day and not _is_business_day_scalar(start_date, week_mask=week_mask, holiday_dates=holiday_dates):
        return result + (1 if start_date <= end_date else -1)
    return result


def _forward_networkdays(
    start_date: date,
    end_date: date,
    *,
    week_mask: WeekMask,
    holiday_dates: tuple[date, ...],
) -> int:
    """Count business days in the inclusive range start_date..end_date (start <= end)."""
    delta_days = (end_date - start_date).days + 1
    full_weeks, extra_days = divmod(delta_days, 7)
    # Overcount by a whole week, then subtract the trailing days past end_date.
    business_days = (full_weeks + 1) * sum(week_mask)
    for offset in range(1, 8 - extra_days):
        trailing_day = end_date + timedelta(days=offset)
        if week_mask[trailing_day.weekday()]:
            business_days -= 1
    # Remove in-range holidays that the mask would otherwise have counted.
    for holiday in holiday_dates:
        if start_date <= holiday <= end_date and week_mask[holiday.weekday()]:
            business_days -= 1
    return business_days


def _is_business_day_scalar(
    value: date,
    *,
    week_mask: WeekMask,
    holiday_dates: tuple[date, ...],
) -> bool:
    """Scalar business-day test: mask allows the weekday and it is not a holiday."""
    return week_mask[value.weekday()] and value not in holiday_dates


def _numpy_weekmask_text(week_mask: WeekMask) -> str:
    """Render the mask as the "1111100"-style string NumPy busday APIs accept."""
    return "".join("1" if allowed else "0" for allowed in week_mask)


def _numpy_holiday_array(holiday_dates: tuple[date, ...]) -> np.ndarray:
    """Convert holidays to a datetime64[D] array for NumPy busday APIs."""
    if not holiday_dates:
        return np.array([], dtype="datetime64[D]")
    return np.array(holiday_dates, dtype="datetime64[D]")


def _numpy_busdaycalendar(week_mask: WeekMask, holiday_dates: tuple[date, ...]) -> np.busdaycalendar:
    """Build the NumPy business-day calendar used by the batch kernels."""
    return np.busdaycalendar(
        weekmask=_numpy_weekmask_text(week_mask),
        holidays=_numpy_holiday_array(holiday_dates),
    )


def _networkdays_batch(
    start_series: pl.Series,
    end_series: pl.Series,
    *,
    week_mask: WeekMask,
    holiday_dates: tuple[date, ...],
    count_first_day: bool,
) -> pl.Series:
    """Vectorized NETWORKDAYS over two date series; nulls yield null counts."""
    calendar = _numpy_busdaycalendar(week_mask, holiday_dates)
    starts = start_series.to_numpy()
    ends = end_series.to_numpy()
    # Object array of None so null rows round-trip as Polars nulls.
    result = np.full(len(start_series), None, dtype=object)
    valid = ~(np.isnat(starts) | np.isnat(ends))
    if valid.any():
        valid_starts = starts[valid]
        valid_ends = ends[valid]
        forward = valid_starts <= valid_ends
        counts = np.empty(valid_starts.shape[0], dtype=np.int64)
        # np.busday_count excludes the end date, so extend it by one day to
        # make the range inclusive like Excel NETWORKDAYS.
        if forward.any():
            counts[forward] = np.busday_count(
                valid_starts[forward],
                valid_ends[forward] + np.timedelta64(1, "D"),
                busdaycal=calendar,
            )
        if (~forward).any():
            # Reversed ranges: count forward over the swapped range, negate.
            counts[~forward] = -np.busday_count(
                valid_ends[~forward],
                valid_starts[~forward] + np.timedelta64(1, "D"),
                busdaycal=calendar,
            )
        if count_first_day:
            # Force the start day into the count when it is not a business day.
            start_business = np.is_busday(valid_starts, busdaycal=calendar)
            counts = counts + np.where(~start_business, np.where(forward, 1, -1), 0)
        result[valid] = counts.tolist()
    return pl.Series(result.tolist(), dtype=pl.Int64)


def _busday_offset_array(
    dates: np.ndarray,
    offsets: np.ndarray,
    *,
    calendar: np.busdaycalendar,
    roll: str,
) -> np.ndarray:
    """np.busday_offset wrapper that tolerates empty input arrays."""
    if dates.size == 0:
        return np.array([], dtype="datetime64[D]")
    return np.busday_offset(dates, offsets, roll=roll, busdaycal=calendar)


def _workday_batch(
    start_series: pl.Series,
    days_series: pl.Series,
    *,
    week_mask: WeekMask,
    holiday_dates: tuple[date, ...],
    count_first_day: bool,
) -> pl.Series:
    """Vectorized WORKDAY over a date series and an offset series.

    Each (is_business x sign-of-days) combination needs a different NumPy
    busday_offset base and roll direction, so the rows are partitioned by
    boolean masks and handled case by case.
    """
    calendar = _numpy_busdaycalendar(week_mask, holiday_dates)
    starts = start_series.to_numpy()
    days_values = days_series.to_numpy()
    result = np.full(len(start_series), np.datetime64("NaT", "D"), dtype="datetime64[D]")
    # isnan covers null offsets (Polars nulls surface as NaN in to_numpy here).
    valid = ~np.isnat(starts) & ~np.isnan(days_values)
    if valid.any():
        valid_starts = starts[valid]
        valid_days = days_values[valid].astype(np.int64, copy=False)
        valid_result = np.full(valid_starts.shape[0], np.datetime64("NaT", "D"), dtype="datetime64[D]")
        is_business = np.is_busday(valid_starts, busdaycal=calendar)
        # Nearest business day on/after and on/before each start (offset 0 + roll).
        next_business = _busday_offset_array(valid_starts, np.zeros(valid_starts.shape[0], dtype=np.int64), calendar=calendar, roll="forward")
        prev_business = _busday_offset_array(valid_starts, np.zeros(valid_starts.shape[0], dtype=np.int64), calendar=calendar, roll="backward")
        zero_mask = valid_days == 0
        pos_mask = valid_days > 0
        neg_mask = valid_days < 0
        if count_first_day:
            # Start counts as day 1, so offsets shrink by one in each direction.
            valid_result[zero_mask] = valid_starts[zero_mask]
            mask = is_business & pos_mask
            if mask.any():
                valid_result[mask] = _busday_offset_array(
                    valid_starts[mask],
                    valid_days[mask] - 1,
                    calendar=calendar,
                    roll="forward",
                )
            mask = is_business & neg_mask
            if mask.any():
                valid_result[mask] = _busday_offset_array(
                    valid_starts[mask],
                    valid_days[mask] + 1,
                    calendar=calendar,
                    roll="backward",
                )
            # Non-business start forced in as day 1: +/-1 lands on start itself.
            mask = (~is_business) & (valid_days == 1)
            valid_result[mask] = valid_starts[mask]
            mask = (~is_business) & (valid_days > 1)
            if mask.any():
                valid_result[mask] = _busday_offset_array(
                    next_business[mask],
                    valid_days[mask] - 2,
                    calendar=calendar,
                    roll="forward",
                )
            mask = (~is_business) & (valid_days == -1)
            valid_result[mask] = valid_starts[mask]
            mask = (~is_business) & (valid_days < -1)
            if mask.any():
                valid_result[mask] = _busday_offset_array(
                    prev_business[mask],
                    valid_days[mask] + 2,
                    calendar=calendar,
                    roll="backward",
                )
        else:
            # Excel default: the start date is day 0, never counted.
            mask = is_business & zero_mask
            valid_result[mask] = valid_starts[mask]
            mask = is_business & pos_mask
            if mask.any():
                valid_result[mask] = _busday_offset_array(
                    valid_starts[mask],
                    valid_days[mask],
                    calendar=calendar,
                    roll="forward",
                )
            mask = is_business & neg_mask
            if mask.any():
                valid_result[mask] = _busday_offset_array(
                    valid_starts[mask],
                    valid_days[mask],
                    calendar=calendar,
                    roll="backward",
                )
            # Non-business start with offset 0 rolls forward to a business day.
            mask = (~is_business) & zero_mask
            valid_result[mask] = next_business[mask]
            mask = (~is_business) & pos_mask
            if mask.any():
                valid_result[mask] = _busday_offset_array(
                    next_business[mask],
                    valid_days[mask] - 1,
                    calendar=calendar,
                    roll="forward",
                )
            mask = (~is_business) & neg_mask
            if mask.any():
                valid_result[mask] = _busday_offset_array(
                    prev_business[mask],
                    valid_days[mask] + 1,
                    calendar=calendar,
                    roll="backward",
                )
        result[valid] = valid_result
    return pl.Series(result.tolist(), dtype=pl.Date)


def _next_business_day(
    value: date,
    *,
    week_mask: WeekMask,
    holiday_dates: tuple[date, ...],
) -> date:
    """Smallest business day on or after ``value``."""
    current = value
    while not _is_business_day_scalar(current, week_mask=week_mask, holiday_dates=holiday_dates):
        current += timedelta(days=1)
    return current


def _previous_business_day(
    value: date,
    *,
    week_mask: WeekMask,
    holiday_dates: tuple[date, ...],
) -> date:
    """Largest business day on or before ``value``."""
    current = value
    while not _is_business_day_scalar(current, week_mask=week_mask, holiday_dates=holiday_dates):
        current -= timedelta(days=1)
    return current


def _advance_business_days(
    start_date: date,
    days: int,
    *,
    week_mask: WeekMask,
    holiday_dates: tuple[date, ...],
) -> date:
    """Walk ``days`` business days from ``start_date`` (sign gives direction)."""
    current = start_date
    remaining = days
    step = 1 if remaining >= 0 else -1
    while remaining != 0:
        current += timedelta(days=step)
        if _is_business_day_scalar(current, week_mask=week_mask, holiday_dates=holiday_dates):
            remaining -= step
    return current


def _workday_scalar(
    start_date: date | None,
    days: int | None,
    *,
    week_mask: WeekMask,
    holiday_dates: tuple[date, ...],
    count_first_day: bool,
) -> date | None:
    """Pure-Python WORKDAY for one row; mirrors the cases in _workday_batch."""
    if start_date is None or days is None:
        return None
    is_business = _is_business_day_scalar(start_date, week_mask=week_mask, holiday_dates=holiday_dates)
    if count_first_day:
        if days == 0:
            return start_date
        if is_business:
            # Start is day 1, so one fewer step is needed in either direction.
            if days > 0:
                return _advance_business_days(start_date, days - 1, week_mask=week_mask, holiday_dates=holiday_dates)
            return _advance_business_days(start_date, days + 1, week_mask=week_mask, holiday_dates=holiday_dates)
        if days > 0:
            if days == 1:
                return start_date
            first_business = _next_business_day(start_date, week_mask=week_mask, holiday_dates=holiday_dates)
            return _advance_business_days(first_business, days - 2, week_mask=week_mask, holiday_dates=holiday_dates)
        if days == -1:
            return start_date
        first_business = _previous_business_day(start_date, week_mask=week_mask, holiday_dates=holiday_dates)
        return _advance_business_days(first_business, days + 2, week_mask=week_mask, holiday_dates=holiday_dates)
    if is_business:
        if days == 0:
            return start_date
        return _advance_business_days(start_date, days, week_mask=week_mask, holiday_dates=holiday_dates)
    if days == 0:
        # Excel rolls a non-business start forward when the offset is zero.
        return _next_business_day(start_date, week_mask=week_mask, holiday_dates=holiday_dates)
    if days > 0:
        first_business = _next_business_day(start_date, week_mask=week_mask, holiday_dates=holiday_dates)
        return _advance_business_days(first_business, days - 1, week_mask=week_mask, holiday_dates=holiday_dates)
    first_business = _previous_business_day(start_date, week_mask=week_mask, holiday_dates=holiday_dates)
    return _advance_business_days(first_business, days + 1, week_mask=week_mask, holiday_dates=holiday_dates)


def _workday_result(
    start_expr: pl.Expr,
    days_expr: pl.Expr,
    week_mask: WeekMask,
    holiday_dates: tuple[date, ...],
    *,
    count_first_day: bool,
    is_business: pl.Expr,
) -> pl.Expr:
    """Expression-level WORKDAY built on Polars dt.add_business_days.

    Encodes the same case table as _workday_batch: business vs non-business
    start, positive vs negative offsets, with/without count_first_day.
    """
    kwargs = {"week_mask": week_mask, "holidays": holiday_dates}
    if count_first_day:
        business_result = (
            pl.when(days_expr > 0)
            .then(start_expr.dt.add_business_days(days_expr - 1, roll="forward", **kwargs))
            .when(days_expr < 0)
            .then(start_expr.dt.add_business_days(days_expr + 1, roll="backward", **kwargs))
            .otherwise(start_expr)
        )
        nonbusiness_result = (
            pl.when(days_expr > 0)
            .then(
                pl.when(days_expr == 1)
                .then(start_expr)
                .otherwise(start_expr.dt.add_business_days(days_expr - 2, roll="forward", **kwargs))
            )
            .when(days_expr < 0)
            .then(
                pl.when(days_expr == -1)
                .then(start_expr)
                .otherwise(start_expr.dt.add_business_days(days_expr + 2, roll="backward", **kwargs))
            )
            .otherwise(start_expr)
        )
    else:
        business_result = (
            pl.when(days_expr >= 0)
            .then(start_expr.dt.add_business_days(days_expr, roll="forward", **kwargs))
            .otherwise(start_expr.dt.add_business_days(days_expr, roll="backward", **kwargs))
        )
        nonbusiness_result = (
            pl.when(days_expr > 0)
            .then(start_expr.dt.add_business_days(days_expr - 1, roll="forward", **kwargs))
            .when(days_expr < 0)
            .then(start_expr.dt.add_business_days(days_expr + 1, roll="backward", **kwargs))
            .otherwise(start_expr.dt.add_business_days(pl.lit(0), roll="forward", **kwargs))
        )
    return pl.when(is_business).then(business_result).otherwise(nonbusiness_result)
def write_parquet_atomic(df: pl.DataFrame, path: PathLike, **write_options: object) -> Path:
    """Atomically write ``df`` to a parquet file at ``path``.

    The frame is serialized to a unique temporary file in the target's
    directory and then swapped in with ``os.replace``, so readers never see
    a half-written parquet file.

    Parameters
    ----------
    df : pl.DataFrame
        Eager dataframe to serialize.
    path : PathLike
        Destination parquet path.
    **write_options : object
        Forwarded verbatim to ``pl.DataFrame.write_parquet``.

    Returns
    -------
    Path
        Absolute destination path after the replacement.
    """
    def _emit(temporary_path: Path) -> None:
        df.write_parquet(temporary_path, **write_options)

    return _write_atomic(path, _emit)
def write_excel_atomic(
    df: pl.DataFrame,
    path: PathLike,
    worksheet: str | None = None,
    **write_options: object,
) -> Path:
    """Atomically write ``df`` to an Excel workbook at ``path``.

    The workbook is built in a unique temporary file beside the target and
    then swapped in with ``os.replace``, so readers never observe a partial
    file.

    Parameters
    ----------
    df : pl.DataFrame
        Eager dataframe to serialize.
    path : PathLike
        Destination workbook path.
    worksheet : str | None
        Optional worksheet name forwarded to ``pl.DataFrame.write_excel``.
    **write_options : object
        Forwarded verbatim to ``pl.DataFrame.write_excel``.

    Returns
    -------
    Path
        Absolute destination path after the replacement.
    """
    def _emit(temporary_path: Path) -> None:
        df.write_excel(temporary_path, worksheet=worksheet, **write_options)

    return _write_atomic(path, _emit)
def sink_parquet_atomic(lf: pl.LazyFrame, path: PathLike, **sink_options: object) -> Path:
    """Atomically sink ``lf`` to a parquet file at ``path``.

    The lazy query is executed into a unique temporary file beside the
    target and then swapped in with ``os.replace``. Eager sink mode is
    required so the replacement can happen inside this call.

    Parameters
    ----------
    lf : pl.LazyFrame
        Lazy frame to execute and write.
    path : PathLike
        Destination parquet path.
    **sink_options : object
        Forwarded verbatim to ``pl.LazyFrame.sink_parquet``.

    Returns
    -------
    Path
        Absolute destination path after the replacement.

    Raises
    ------
    ValueError
        If ``lazy=True`` is requested.
    """
    if sink_options.get("lazy") is True:
        raise ValueError("Atomic LazyFrame parquet writes require eager sink execution; pass lazy=False or omit lazy.")

    def _emit(temporary_path: Path) -> None:
        lf.sink_parquet(temporary_path, **sink_options)

    return _write_atomic(path, _emit)
def _write_atomic(path: PathLike, write: Callable[[Path], object]) -> Path: target_path = Path(path).expanduser().resolve() target_path.parent.mkdir(parents=True, exist_ok=True) temporary_path = target_path.with_name(f".{target_path.name}.{uuid4().hex}.tmp") try: write(temporary_path) _replace_atomic(temporary_path, target_path) except BaseException: _remove_temporary_file(temporary_path) raise return target_path def _replace_atomic(source_path: Path, target_path: Path) -> None: backoff_seconds = (0.0, 0.02, 0.05, 0.1, 0.2) last_error: BaseException | None = None for delay_seconds in backoff_seconds: if delay_seconds > 0.0: time.sleep(delay_seconds) try: os.replace(source_path, target_path) return except PermissionError as exc: if os.name != "nt" or getattr(exc, "winerror", None) != 5: raise last_error = exc continue if last_error is not None: raise last_error os.replace(source_path, target_path) def _remove_temporary_file(path: Path) -> None: try: path.unlink() except FileNotFoundError: pass except OSError: pass
@pl.api.register_dataframe_namespace("de")
class DataEngineDataFrameNamespace:
    """Data Engine helpers available from ``pl.DataFrame.de``."""

    def __init__(self, df: pl.DataFrame) -> None:
        # Dataframe the namespace was opened on; every method delegates with it.
        self._df = df

    def normalize_column_names(self, columns: Iterable[object] | None = None) -> pl.DataFrame:
        """Return this dataframe with normalized column names.

        Parameters
        ----------
        columns : Iterable[object] | None
            Optional subset of columns to normalize; all columns when omitted.

        Returns
        -------
        pl.DataFrame
            Dataframe with normalized column names.
        """
        return _normalize_column_names(self._df, columns)

    def networkdays(
        self,
        start: ExprLike,
        end: ExprLike,
        *,
        holidays: list[DateLike | str] | tuple[DateLike | str, ...] | set[DateLike | str] | None = None,
        count_first_day: bool = False,
        mask: Iterable[bool] | None = None,
    ) -> pl.Expr:
        """Excel-style business-day count expression for this dataframe.

        Thin wrapper over :func:`data_engine.helpers.networkdays`; the result
        is an ordinary ``pl.Expr`` that chains into windows and cumulative
        expressions.
        """
        return networkdays(
            start,
            end,
            holidays=holidays,
            count_first_day=count_first_day,
            mask=mask,
        )

    def workday(
        self,
        start: ExprLike,
        days: IntExprLike,
        *,
        holidays: list[DateLike | str] | tuple[DateLike | str, ...] | set[DateLike | str] | None = None,
        count_first_day: bool = False,
        mask: Iterable[bool] | None = None,
    ) -> pl.Expr:
        """Excel-style workday offset expression for this dataframe.

        Thin wrapper over :func:`data_engine.helpers.workday`.
        """
        return workday(
            start,
            days,
            holidays=holidays,
            count_first_day=count_first_day,
            mask=mask,
        )

    def write_parquet_atomic(self, path: PathLike, **write_options: object) -> Path:
        """Atomically write this dataframe to parquet; returns the target path.

        ``**write_options`` are forwarded to ``pl.DataFrame.write_parquet``.
        """
        return write_parquet_atomic(self._df, path, **write_options)

    def write_excel_atomic(
        self,
        path: PathLike,
        worksheet: str | None = None,
        **write_options: object,
    ) -> Path:
        """Atomically write this dataframe to Excel; returns the target path.

        ``worksheet`` and ``**write_options`` are forwarded to
        ``pl.DataFrame.write_excel``.
        """
        return write_excel_atomic(self._df, path, worksheet=worksheet, **write_options)

    def build_dimension(
        self,
        db_path: PathLike,
        table: str,
        *,
        key_column: str = "dimension_key",
        return_df: bool = True,
    ) -> pl.DataFrame | None:
        """Build or extend a DuckDB dimension table from this dataframe.

        Parameters
        ----------
        db_path : PathLike
            DuckDB database file.
        table : str
            Dimension table name, optionally schema-qualified.
        key_column : str
            Surrogate key column created in the dimension table.
        return_df : bool
            When true, return the key mapping for this dataframe's rows.
        """
        return _build_dimension(db_path, table, df=self._df, key_column=key_column, return_df=return_df)

    def attach_dimension(
        self,
        db_path: PathLike,
        table: str,
        *,
        on: ColumnNames,
        key_column: str = "dimension_key",
        drop_key: bool = False,
    ) -> pl.DataFrame:
        """Join an existing DuckDB dimension's surrogate key onto this dataframe.

        Parameters
        ----------
        db_path : PathLike
            DuckDB database file.
        table : str
            Dimension table name, optionally schema-qualified.
        on : ColumnNames
            Natural key column(s) used to join against the dimension.
        key_column : str
            Surrogate key column to attach.
        drop_key : bool
            When true, drop the natural key columns after the join.
        """
        return _attach_dimension(
            db_path,
            table,
            df=self._df,
            on=on,
            key_column=key_column,
            drop_key=drop_key,
        )

    def denormalize_columns(
        self,
        db_path: PathLike,
        table: str,
        *,
        key_column: str = "dimension_key",
        select: ColumnNames = "*",
        drop_key: bool = False,
    ) -> pl.DataFrame:
        """Join natural columns from a DuckDB dimension back onto this dataframe.

        Parameters
        ----------
        db_path : PathLike
            DuckDB database file.
        table : str
            Dimension table name, optionally schema-qualified.
        key_column : str
            Surrogate key column used for the join.
        select : ColumnNames
            Natural columns to attach, or ``"*"`` for all non-key columns.
        drop_key : bool
            When true, drop ``key_column`` after attaching the columns.
        """
        return _denormalize_columns(
            db_path,
            table,
            df=self._df,
            key_column=key_column,
            select=select,
            drop_key=drop_key,
        )

    def normalize_columns(
        self,
        db_path: PathLike,
        table: str,
        *,
        on: ColumnNames,
        key_column: str = "dimension_key",
        drop_key: bool = True,
        returns: ReturnMode = "df",
    ) -> pl.DataFrame | None:
        """Build dimension keys from this dataframe and attach them back.

        Parameters
        ----------
        db_path : PathLike
            DuckDB database file.
        table : str
            Dimension table name, optionally schema-qualified.
        on : ColumnNames
            Natural key column(s) that define the dimension.
        key_column : str
            Surrogate key column to create and attach.
        drop_key : bool
            When true, drop the natural key columns after attaching the key.
        returns : ReturnMode
            ``"df"`` for normalized rows, ``"map"`` for the key mapping only,
            or ``None`` to persist the dimension without returning anything.
        """
        return _normalize_columns(
            db_path,
            table,
            df=self._df,
            on=on,
            key_column=key_column,
            drop_key=drop_key,
            returns=returns,
        )

    def replace_rows_by_file(
        self,
        db_path: PathLike,
        table: str,
        *,
        file_hash: str,
        file_hash_column: str = "file_key",
        return_df: bool = True,
    ) -> pl.DataFrame | None:
        """Replace a source file's DuckDB rows with this dataframe's rows.

        Parameters
        ----------
        db_path : PathLike
            DuckDB database file.
        table : str
            Destination table name, optionally schema-qualified.
        file_hash : str
            Stable source-file identifier whose previous batch is deleted.
        file_hash_column : str
            Column storing ``file_hash`` in the destination table.
        return_df : bool
            When true, return the inserted rows with the hash column attached.
        """
        return _replace_rows_by_file(
            db_path,
            table,
            df=self._df,
            file_hash=file_hash,
            file_hash_column=file_hash_column,
            return_df=return_df,
        )

    def replace_rows_by_values(
        self,
        db_path: PathLike,
        table: str,
        *,
        column: str,
        return_df: bool = True,
    ) -> pl.DataFrame | None:
        """Replace DuckDB rows that match this dataframe's values in one column.

        Parameters
        ----------
        db_path : PathLike
            DuckDB database file.
        table : str
            Destination table name, optionally schema-qualified.
        column : str
            Column whose incoming values select the rows to replace.
        return_df : bool
            When true, return the inserted dataframe.
        """
        return _replace_rows_by_values(db_path, table, df=self._df, column=column, return_df=return_df)

    def replace_table(
        self,
        db_path: PathLike,
        table: str,
        *,
        return_df: bool = True,
    ) -> pl.DataFrame | None:
        """Replace an entire DuckDB table with this dataframe's contents.

        Parameters
        ----------
        db_path : PathLike
            DuckDB database file.
        table : str
            Destination table name, optionally schema-qualified.
        return_df : bool
            When true, return the inserted dataframe.
        """
        return _replace_table(db_path, table, df=self._df, return_df=return_df)
@pl.api.register_lazyframe_namespace("de")
class DataEngineLazyFrameNamespace:
    """Data Engine helpers available from ``pl.LazyFrame.de``."""

    def __init__(self, lf: pl.LazyFrame) -> None:
        # Polars constructs this namespace per access; keep a handle to the
        # wrapped lazy frame so every helper method can delegate with it.
        self._lf = lf
[docs] def normalize_column_names(self, columns: Iterable[object] | None = None) -> pl.LazyFrame: """Normalize column names on this lazy frame. Parameters ---------- columns : Iterable[object] | None Optional subset of column names to normalize. When omitted, all lazy-frame columns are normalized. Returns ------- pl.LazyFrame Lazy frame with normalized column names. """ return _normalize_column_names(self._lf, columns)
[docs] def networkdays( self, start: ExprLike, end: ExprLike, *, holidays: list[DateLike | str] | tuple[DateLike | str, ...] | set[DateLike | str] | None = None, count_first_day: bool = False, mask: Iterable[bool] | None = None, ) -> pl.Expr: """Return an Excel-style business-day count expression for this lazy frame. This is a convenience wrapper around :func:`data_engine.helpers.networkdays`. The returned value stays lazy and can be chained into window expressions before ``collect()``. """ return networkdays( start, end, holidays=holidays, count_first_day=count_first_day, mask=mask, )
[docs] def workday( self, start: ExprLike, days: IntExprLike, *, holidays: list[DateLike | str] | tuple[DateLike | str, ...] | set[DateLike | str] | None = None, count_first_day: bool = False, mask: Iterable[bool] | None = None, ) -> pl.Expr: """Return an Excel-style workday offset expression for this lazy frame. This is a convenience wrapper around :func:`data_engine.helpers.workday`. """ return workday( start, days, holidays=holidays, count_first_day=count_first_day, mask=mask, )
[docs] def sink_parquet_atomic(self, path: PathLike, **sink_options: object) -> Path: """Execute this lazy frame to parquet with atomic target replacement. Parameters ---------- path : PathLike Target parquet path. **sink_options : object Keyword options forwarded to ``pl.LazyFrame.sink_parquet``. Returns ------- Path Absolute target path that was replaced. """ return sink_parquet_atomic(self._lf, path, **sink_options)
[docs] def build_dimension( self, db_path: PathLike, table: str, *, key_column: str = "dimension_key", return_df: bool = True, ) -> pl.DataFrame | None: """Build or extend one DuckDB dimension table from this lazy frame. Parameters ---------- db_path : PathLike DuckDB database file path. table : str Dimension table name, optionally schema-qualified. key_column : str Surrogate key column to create in the dimension table. return_df : bool Whether to return the mapping dataframe for this lazy frame's natural keys. Returns ------- pl.DataFrame | None Mapping dataframe when ``return_df`` is true; otherwise ``None``. """ return _build_dimension(db_path, table, df=self._lf, key_column=key_column, return_df=return_df)
[docs] def attach_dimension( self, db_path: PathLike, table: str, *, on: ColumnNames, key_column: str = "dimension_key", drop_key: bool = False, ) -> pl.DataFrame: """Attach an existing DuckDB dimension key to this lazy frame. Parameters ---------- db_path : PathLike DuckDB database file path. table : str Dimension table name, optionally schema-qualified. on : ColumnNames Natural key column or columns used to join to the dimension table. key_column : str Surrogate key column to attach. drop_key : bool Whether to drop the natural key columns after attaching the surrogate key. Returns ------- pl.DataFrame Collected dataframe with the surrogate key column attached. """ return _attach_dimension( db_path, table, df=self._lf, on=on, key_column=key_column, drop_key=drop_key, )
[docs] def denormalize_columns( self, db_path: PathLike, table: str, *, key_column: str = "dimension_key", select: ColumnNames = "*", drop_key: bool = False, ) -> pl.DataFrame: """Attach natural columns from an existing DuckDB dimension table. Parameters ---------- db_path : PathLike DuckDB database file path. table : str Dimension table name, optionally schema-qualified. key_column : str Surrogate key column used to join to the dimension table. select : ColumnNames Natural columns to attach, or ``"*"`` for all non-key columns. drop_key : bool Whether to drop ``key_column`` after attaching the natural columns. Returns ------- pl.DataFrame Collected dataframe with selected dimension columns attached. """ return _denormalize_columns( db_path, table, df=self._lf, key_column=key_column, select=select, drop_key=drop_key, )
[docs] def normalize_columns( self, db_path: PathLike, table: str, *, on: ColumnNames, key_column: str = "dimension_key", drop_key: bool = True, returns: ReturnMode = "df", ) -> pl.DataFrame | None: """Build dimension keys and attach them back onto this lazy frame. Parameters ---------- db_path : PathLike DuckDB database file path. table : str Dimension table name, optionally schema-qualified. on : ColumnNames Natural key column or columns used to build the dimension. key_column : str Surrogate key column to create and attach. drop_key : bool Whether to drop natural key columns after attaching the surrogate key. returns : ReturnMode ``"df"`` for normalized input rows, ``"map"`` for only the key mapping, or ``None`` to only persist dimension rows. Returns ------- pl.DataFrame | None Normalized dataframe, mapping dataframe, or ``None`` according to ``returns``. """ return _normalize_columns( db_path, table, df=self._lf, on=on, key_column=key_column, drop_key=drop_key, returns=returns, )
[docs] def replace_rows_by_file( self, db_path: PathLike, table: str, *, file_hash: str, file_hash_column: str = "file_key", return_df: bool = True, ) -> pl.DataFrame | None: """Replace one file's DuckDB rows and append this lazy frame. Parameters ---------- db_path : PathLike DuckDB database file path. table : str Destination table name, optionally schema-qualified. file_hash : str Stable source-file identifier used to delete the previous batch. file_hash_column : str Column name used to store ``file_hash`` in the destination table. return_df : bool Whether to return the collected frame with the file hash column attached. Returns ------- pl.DataFrame | None Inserted rows with ``file_hash_column`` when ``return_df`` is true; otherwise ``None``. """ return _replace_rows_by_file( db_path, table, df=self._lf, file_hash=file_hash, file_hash_column=file_hash_column, return_df=return_df, )
[docs] def replace_rows_by_values( self, db_path: PathLike, table: str, *, column: str, return_df: bool = True, ) -> pl.DataFrame | None: """Replace DuckDB rows matching this lazy frame's values for one column. Parameters ---------- db_path : PathLike DuckDB database file path. table : str Destination table name, optionally schema-qualified. column : str Column whose incoming values define the rows to replace. return_df : bool Whether to return the inserted dataframe. Returns ------- pl.DataFrame | None Inserted dataframe when ``return_df`` is true; otherwise ``None``. """ return _replace_rows_by_values(db_path, table, df=self._lf, column=column, return_df=return_df)
[docs] def replace_table( self, db_path: PathLike, table: str, *, return_df: bool = True, ) -> pl.DataFrame | None: """Replace one DuckDB table wholesale from this lazy frame. Parameters ---------- db_path : PathLike DuckDB database file path. table : str Destination table name, optionally schema-qualified. return_df : bool Whether to return the inserted dataframe. Returns ------- pl.DataFrame | None Inserted dataframe when ``return_df`` is true; otherwise ``None``. """ return _replace_table(db_path, table, df=self._lf, return_df=return_df)