API Reference
The package entrypoints most users will import are:
- data_engine.Flow
- data_engine.FlowContext
- data_engine.discover_flows
- data_engine.load_flow
- data_engine.run
Core Flow Model
Pure flow definitions and fluent builder methods.
- class Flow(group, name=None, label=None, trigger=None, mirror_spec=None, steps=(), _workspace_root=None)[source]
Bases: object
Immutable fluent builder for generic runtime flows.
`Flow` is the authoring object most flow modules return. Builder methods never mutate the existing instance; each method returns a new flow, so chained definitions stay predictable and easy to review.
Examples
```python
from data_engine import Flow

flow = (
    Flow(group="Claims")
    .watch(mode="poll", source="incoming", interval="30s", extensions=[".xlsx"])
    .mirror(root="processed")
)
assert flow.mode == "poll"
```
- Variables:
group (str) – Display group used by the GUI, TUI, and runtime summaries.
name (str | None) – Stable flow identifier. When omitted in a flow module, the module loader can derive it from the module name.
label (str | None) – Optional human-readable display label.
trigger (WatchSpec | None) – Runtime trigger configuration.
mirror_spec (MirrorSpec | None) – Mirrored output path configuration.
steps (tuple[StepSpec, ...]) – Ordered callable steps to run.
- Parameters:
group (str)
name (str | None)
label (str | None)
trigger (WatchSpec | None)
mirror_spec (MirrorSpec | None)
steps (tuple[StepSpec, ...])
_workspace_root (Path | None)
Notes
Builder methods return a new `Flow` instance, so calls can be chained.
- group: str
- name: str | None = None
- label: str | None = None
- mirror_spec: MirrorSpec | None = None
- watch(*, mode, run_as='individual', max_parallel=1, source=None, interval=None, time=None, extensions=None, settle=1)[source]
Configure how this flow is triggered.
Use `manual` for explicit operator-driven runs, `poll` when source file changes should trigger work, and `schedule` when time should trigger work. `run_as="individual"` runs once per concrete source file; `run_as="batch"` runs once for the watched source root.
Examples
```python
from data_engine import Flow

manual_flow = Flow(group="Claims").watch(mode="manual")
poll_flow = Flow(group="Claims").watch(mode="poll", source="incoming", interval="5s")
schedule_flow = Flow(group="Claims").watch(mode="schedule", interval="15m")
assert manual_flow.mode == "manual"
assert poll_flow.mode == "poll"
assert schedule_flow.mode == "schedule"
```
- Parameters:
mode (str) – Trigger mode: `"manual"`, `"poll"`, or `"schedule"`.
run_as (str) – `"individual"` to run once per source file, or `"batch"` to run once for the full source set.
max_parallel (int) – Maximum number of concurrent source-file runs for one flow when `run_as="individual"`. Defaults to `1`.
source (str | Path | None) – File or directory watched by poll/schedule triggers.
interval (str | None) – Duration string such as `"10s"` or `"5m"` for poll intervals or recurring schedules.
time (str | tuple[str, ...] | list[str] | set[str] | None) – One or more `HH:MM` daily schedule times.
extensions (tuple[str, ...] | list[str] | set[str] | None) – Optional file extensions used when discovering source files.
settle (int) – Polling settle window in seconds before a changed file is queued.
- Returns:
A new flow with the trigger configuration attached.
- Return type:
Flow
- Raises:
FlowValidationError – If the trigger mode, source, schedule, or polling options are inconsistent.
- mirror(*, root)[source]
Bind a mirrored output namespace rooted at one directory.
`mirror` only configures paths. It does not write files by itself; the runtime later exposes write-ready paths through `context.mirror`.
Examples
```python
from data_engine import Flow

flow = Flow(group="Claims").mirror(root="processed")
assert str(flow.mirror_spec.root).endswith("processed")
```
- Parameters:
root (str | Path) – Directory used by `context.mirror` helpers when writing outputs.
- Returns:
A new flow with mirror output helpers enabled.
- Return type:
Flow
- step(fn, *, use=None, save_as=None, label=None)[source]
Append one callable step to the flow.
`step` is the default workhorse for flow authoring. The callable receives one `FlowContext`, and its return value becomes the next `context.current`. Use `save_as` to keep an intermediate result under `context.objects` and `use` to load a saved object before a later step runs.
Examples
```python
from data_engine import Flow

def read_claims(context):
    return context.current

flow = Flow(group="Claims").step(read_claims, save_as="raw_df")
assert flow.steps[0].save_as == "raw_df"
```
- Parameters:
fn (Callable[[FlowContext], object]) – Callable that accepts a single `FlowContext` and returns the next value for `context.current`.
use (str | None) – Optional named object slot to load into `context.current` before the step runs.
save_as (str | None) – Optional named object slot to store the step result in.
label (str | None) – Optional display label for logs and UI step summaries.
- Returns:
A new flow with the step appended.
- Return type:
Flow
- Raises:
FlowValidationError – If `fn` is not callable or does not accept exactly one argument.
- map(fn, *, use=None, save_as=None, label=None)[source]
Append a step that maps a callable over the current iterable value.
`map` is best when the same callable should run once per collected item. The callable may accept either `item` or `context, item`, and the mapped values are returned as a `Batch`.
Examples
```python
from data_engine import Flow

def summarize(file_ref):
    return file_ref.name

flow = Flow(group="Claims").collect(extensions=[".xlsx"]).map(fn=summarize)
assert len(flow.steps) == 2
```
- Parameters:
fn (Callable[..., object]) – Callable accepting either `item` or `context, item`.
use (str | None) – Optional named object slot to map instead of the current value.
save_as (str | None) – Optional named object slot to store the mapped `Batch` result in.
label (str | None) – Optional display label for logs and UI step summaries.
- Returns:
A new flow with the mapping step appended.
- Return type:
Flow
- Raises:
FlowValidationError – If `fn` is not callable, has an unsupported signature, or the mapped current value is not iterable.
- collect(extensions, *, root=None, recursive=False, use=None, save_as=None, label=None)[source]
Append a step that collects source files into a `Batch`.
If `root` is omitted, collection uses the active source root from the runtime context. This keeps scheduled or poll-driven batch flows concise because the watched source directory can also be the collection root.
Examples
```python
from data_engine import Flow

flow = Flow(group="Docs").collect(extensions=[".pdf"], recursive=True)
assert flow.steps[0].label == "Collect Files"
```
- Parameters:
extensions (tuple[str, ...] | list[str] | set[str]) – File extensions to include, such as `(".xlsx", ".csv")`.
root (str | Path | None) – Optional search root. Defaults to the active source root.
recursive (bool) – Whether to search child directories.
use (str | None) – Optional named object slot to load before collecting.
save_as (str | None) – Optional named object slot to store the collected batch in.
label (str | None) – Optional display label for logs and UI step summaries.
- Returns:
A new flow with the collection step appended.
- Return type:
Flow
- step_each(fn, *, use=None, save_as=None, label=None)[source]
Alias for `map` that reads naturally in step chains.
- Parameters:
fn (Callable[..., object]) – Callable accepting either `item` or `context, item`.
use (str | None) – Optional named object slot to map instead of the current value.
save_as (str | None) – Optional named object slot to store the mapped `Batch` result in.
label (str | None) – Optional display label for logs and UI step summaries.
- Returns:
A new flow with the per-item step appended.
- Return type:
Flow
- property mode: str
Return the trigger mode, or `manual` for unconfigured flows.
Core Primitives
Core flow specs, contexts, and small containers.
- class Batch(items)[source]
Bases: Generic[T]
Small iterable runtime container used instead of exposing raw lists by default.
- Parameters:
items (tuple[T, ...])
- items: tuple[T, ...]
- class FileRef(path)[source]
Bases: object
Thin runtime wrapper for one filesystem path in a batch-oriented flow.
- Parameters:
path (Path)
- path: Path
- property name: str
Return the file name including extension.
- property stem: str
Return the file name without extension.
- property suffix: str
Return the file extension.
- property parent: Path
Return the parent directory.
- class FlowContext(flow_name, group, source=None, mirror=None, current=None, objects=<factory>, metadata=<factory>, config=<factory>)[source]
Bases: object
Mutable runtime state shared across steps during one flow execution.
Steps receive a `FlowContext` object. `current` is the active value, `objects` stores named intermediate values created with `save_as`, `metadata` holds runtime annotations, and `source`/`mirror` expose source and output path helpers when the flow configuration provides them.
- Variables:
flow_name (str) – Stable flow name for the current execution.
group (str) – Flow group used by operator surfaces.
source (SourceContext | None) – Source path helper for source-backed executions.
mirror (MirrorContext | None) – Write-ready mirrored output helper when the flow configured a mirror.
current (object | None) – Active value passed between steps.
objects (dict[str, object]) – Named intermediate values saved by `save_as`.
metadata (dict[str, object]) – Runtime metadata attached to the execution.
config (WorkspaceConfigContext) – Lazy workspace config reader.
- Parameters:
flow_name (str)
group (str)
source (SourceContext | None)
mirror (MirrorContext | None)
current (object | None)
objects (dict[str, object])
metadata (dict[str, object])
config (WorkspaceConfigContext)
Examples
```python
from data_engine.core.primitives import FlowContext

context = FlowContext(flow_name="claims", group="Claims", current=1)
context.objects["raw"] = context.current
assert context.current == 1
assert context.objects["raw"] == 1
```
- flow_name: str
- group: str
- source: SourceContext | None = None
- mirror: MirrorContext | None = None
- current: object | None = None
- objects: dict[str, object]
- metadata: dict[str, object]
- config: WorkspaceConfigContext
- source_metadata()[source]
Return filesystem metadata for the current source file when available.
- Return type:
SourceMetadata | None
- database(name)[source]
Return a write-ready path beneath the workspace databases directory.
Use this for workspace-owned DuckDB files and other durable database artifacts. The returned path is rooted under `<workspace>/databases/`, and parent directories are created for you.
- Parameters:
name (str | Path) – Relative database file name, such as `"analytics.duckdb"` or `"claims/analytics.duckdb"`.
- Returns:
Absolute write-ready database path.
- Return type:
Path
- Raises:
FlowValidationError – If the flow is not running from an authored workspace, or if `name` is absolute or empty.
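Taken together with the namespace helpers documented below, a minimal sketch of resolving a database path inside a step; the table name, file name, and the dataframe assumed in `context.current` are illustrative:
```python
import data_engine.helpers  # Registers the pl.DataFrame.de namespace.
from data_engine import Flow

def persist(context):
    # Hypothetical step: resolve a workspace-owned DuckDB path, then replace
    # one table from the dataframe currently held in context.current.
    db_path = context.database("claims/analytics.duckdb")
    context.current.de.replace_table(db_path, "claims_raw")
    return context.current

flow = Flow(group="Claims").step(persist, label="Persist claims")
```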
- class MirrorContext(root, source_path=None, relative_path=None)[source]
Bases: object
Write-ready mirrored output namespace for one runtime source.
`context.mirror` is available when a flow was configured with `Flow.mirror(root=...)`. The helpers return paths and create parent directories as needed, but they do not write file contents.
- Parameters:
root (Path)
source_path (Path | None)
relative_path (Path | None)
- root: Path
- source_path: Path | None = None
- relative_path: Path | None = None
- property dir: Path
Return a write-ready namespace directory for derived files.
- property folder: Path
Return the mirrored parent folder for the current source file.
- with_suffix(suffix)[source]
Return the canonical mirrored source path with a replaced suffix.
- Parameters:
suffix (str)
- Return type:
Path
- with_extension(suffix)[source]
Return the canonical mirrored source path with a replaced extension.
- Parameters:
suffix (str)
- Return type:
Path
- file(name)[source]
Return a write-ready file path in the mirrored source folder.
- Parameters:
name (str | Path)
- Return type:
Path
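A minimal sketch of the mirror helpers inside a step, assuming `context.current` holds a Polars dataframe; the file names are illustrative:
```python
import data_engine.helpers  # Registers the pl.DataFrame.de namespace.

def export(context):
    # Hypothetical step body: derive write-ready output paths for the current
    # source file. The helpers create parent directories but write nothing.
    parquet_path = context.mirror.with_suffix(".parquet")
    summary_path = context.mirror.file("summary.xlsx")
    context.current.de.write_parquet_atomic(parquet_path)
    context.current.de.write_excel_atomic(summary_path, worksheet="Summary")
    return context.current
```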
- class MirrorSpec(root)[source]
Bases: object
Static flow-level mirror binding.
- Parameters:
root (Path)
- root: Path
- class SourceContext(root, path=None, relative_path=None)[source]
Bases: object
Resolved source namespace for one runtime source.
`context.source` points at the watched source root and, for individual file runs, the concrete source file. Its helpers are read-oriented path conveniences; unlike `MirrorContext`, they do not create directories.
- Parameters:
root (Path)
path (Path | None)
relative_path (Path | None)
- root: Path
- path: Path | None = None
- relative_path: Path | None = None
- property dir: Path
Return the namespace directory for files derived from the active source.
- property folder: Path
Return the parent folder for the active source file.
- with_suffix(suffix)[source]
Return the source path with a replaced suffix.
- Parameters:
suffix (str)
- Return type:
Path
- with_extension(suffix)[source]
Return the source path with a replaced extension.
- Parameters:
suffix (str)
- Return type:
Path
- file(name)[source]
Return a derived file path in the active source folder.
- Parameters:
name (str | Path)
- Return type:
Path
- class SourceMetadata(path, name, size_bytes, modified_at_utc)[source]
Bases: object
Resolved filesystem metadata for the current source file.
- Parameters:
path (Path)
name (str)
size_bytes (int)
modified_at_utc (datetime)
- path: Path
- name: str
- size_bytes: int
- modified_at_utc: datetime
- class StepSpec(fn, use, save_as, label, function_name)[source]
Bases: object
One generic callable step in a flow.
- Parameters:
fn (Callable[[...], object])
use (str | None)
save_as (str | None)
label (str)
function_name (str)
- fn: Callable[[...], object]
- use: str | None
- save_as: str | None
- label: str
- function_name: str
- class WatchSpec(mode, run_as, max_parallel=1, source=None, interval=None, interval_seconds=None, time=None, times=(), time_slots=(), extensions=None, settle=1)[source]
Bases: object
Normalized runtime watch configuration.
- Parameters:
mode (str)
run_as (str)
max_parallel (int)
source (Path | None)
interval (str | None)
interval_seconds (float | None)
time (str | tuple[str, ...] | None)
times (tuple[str, ...])
time_slots (tuple[tuple[int, int], ...])
extensions (tuple[str, ...] | None)
settle (int)
- mode: str
- run_as: str
- max_parallel: int = 1
- source: Path | None = None
- interval: str | None = None
- interval_seconds: float | None = None
- time: str | tuple[str, ...] | None = None
- times: tuple[str, ...] = ()
- time_slots: tuple[tuple[int, int], ...] = ()
- extensions: tuple[str, ...] | None = None
- settle: int = 1
- class WorkspaceConfigContext(workspace_root=None, _cache=<factory>, _names=None)[source]
Bases: object
Lazy read-only access to workspace-local TOML config files.
`context.config` reads files from `<workspace>/config/*.toml` on demand. It returns dictionaries so flows can keep environment-specific settings out of Python modules without introducing a larger configuration framework.
- Variables:
workspace_root (Path | None) – Authored workspace root. When omitted, config lookup is unavailable and returns no names.
- Parameters:
workspace_root (Path | None)
_cache (dict[str, dict[str, object]])
_names (tuple[str, ...] | None)
Examples
```python
from data_engine.core.primitives import WorkspaceConfigContext

config = WorkspaceConfigContext()
assert config.names() == ()
```
- workspace_root: Path | None = None
- property config_dir: Path | None
Return the conventional config directory for the authored workspace.
- get(name)[source]
Return one parsed config mapping when available.
- Parameters:
name (str)
- Return type:
dict[str, object] | None
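A minimal sketch of reading one config file inside a step; the file name `settings` and its keys are illustrative:
```python
def load_settings(context):
    # Hypothetical step: lazily read <workspace>/config/settings.toml.
    settings = context.config.get("settings")
    if settings is None:
        return context.current  # No such config file in this workspace.
    context.metadata["api_url"] = settings.get("api_url")
    return context.current
```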
- collect_files(extensions, *, root=None, recursive=False)[source]
Return a step callable that collects matching files into a Batch of FileRef items.
- Parameters:
extensions (tuple[str, ...] | list[str] | set[str])
root (str | Path | None)
recursive (bool)
- Return type:
Callable[[FlowContext], Batch[FileRef]]
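A minimal sketch of wiring this callable into a flow, assuming `collect_files` is importable from the core primitives module (import path assumed):
```python
from data_engine import Flow
from data_engine.core.primitives import collect_files  # Import path assumed.

# collect_files returns a plain step callable, so it can be attached with
# Flow.step when the higher-level Flow.collect builder is not a good fit.
flow = Flow(group="Docs").step(
    collect_files((".csv", ".xlsx"), root="incoming", recursive=True),
    save_as="source_files",
)
```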
Core Runtime Models
Core errors for Data Engine flow definitions and execution.
- exception FlowExecutionError(*, flow_name, phase, detail, step_label=None, function_name=None, source_path=None)[source]
Bases: FlowValidationError
Raised when a flow module fails during import, build, or runtime execution.
- Parameters:
flow_name (str)
phase (str)
detail (str)
step_label (str | None)
function_name (str | None)
source_path (Path | str | None)
- Return type:
None
Runtime Engine
Command-shaped runtime engine for executing core flows.
- class RuntimeEngine(*, runtime_ledger=None, runtime_stop_event=None, flow_stop_event=None, status_callback=None, flow_runtime_type=<class 'data_engine.runtime.execution.single.FlowRuntime'>, grouped_runtime_type=<class 'data_engine.runtime.execution.grouped.GroupedFlowRuntime'>, run_stop_controller=None)[source]
Bases: object
Execute flows through explicit runtime commands.
The engine does not know about GUI, TUI, CLI, local settings, or daemon wiring. Hosts pass state/event collaborators in explicitly; the current implementation adapts the existing sequential and grouped runtimes while giving higher layers a command-shaped seam to target.
- Parameters:
runtime_ledger (RuntimeCacheLedger | None)
runtime_stop_event (Event | None)
flow_stop_event (Event | None)
status_callback (Callable[[str], None] | None)
flow_runtime_type (type[FlowRuntime])
grouped_runtime_type (type[GroupedFlowRuntime])
run_stop_controller (RuntimeStopController | None)
- run_once(flow)[source]
Run one flow once using its configured startup sources.
- Parameters:
flow (CoreFlow)
- Return type:
list[FlowContext]
- run_source(flow, source_path)[source]
Run one flow for a specific source path.
- Parameters:
flow (CoreFlow)
source_path (str | Path)
- Return type:
- run_batch(flow)[source]
Run one flow once in batch mode using the configured source root.
- Parameters:
flow (CoreFlow)
- Return type:
- preview(flow, *, use=None)[source]
Preview one flow through the one-shot runtime path.
- Parameters:
flow (CoreFlow)
use (str | None)
- Return type:
object
- run_continuous(flow)[source]
Run one flow continuously according to its trigger.
- Parameters:
flow (CoreFlow)
- Return type:
list[FlowContext]
- run_grouped(flows, *, continuous=True)[source]
Run flows grouped by flow group with sequential execution inside each group.
- Parameters:
flows (tuple['CoreFlow', ...])
continuous (bool)
- Return type:
list[FlowContext]
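A minimal sketch of driving one flow through the engine; the flow name and the `RuntimeEngine` import path are assumptions:
```python
from data_engine import load_flow
from data_engine.runtime import RuntimeEngine  # Import path assumed.

flow = load_flow("claims_intake")  # Hypothetical flow name.
engine = RuntimeEngine()

# run_once executes the flow against its configured startup sources and
# returns the final FlowContext objects for inspection.
contexts = engine.run_once(flow)
for context in contexts:
    print(context.flow_name, context.current)
```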
Runtime execution internals for authored flows.
- class FlowRuntime(flows, *, continuous, runtime_stop_event=None, flow_stop_event=None, status_callback=None, runtime_ledger=None, runtime_ledger_service=None, runtime_ledger_factory=None, run_stop_controller=None)[source]
Bases: object
Sequential runtime that executes one or more configured flows.
- Parameters:
flows (tuple['CoreFlow', ...])
continuous (bool)
runtime_stop_event (threading.Event | None)
flow_stop_event (threading.Event | None)
status_callback (Callable[[str], None] | None)
runtime_ledger (RuntimeCacheLedger | None)
runtime_ledger_service (RuntimeCacheLedgerService | None)
runtime_ledger_factory (Callable[[], RuntimeCacheLedger] | None)
run_stop_controller (RuntimeStopController | None)
- run()[source]
- Return type:
list[FlowContext]
- preview(*, use=None)[source]
Run exactly one flow for notebook-style inspection and return one object.
- Parameters:
use (str | None)
- run_source(flow, source_path)[source]
Run one flow for a specific source path.
- Parameters:
flow (CoreFlow)
source_path (str | Path)
- Return type:
- run_batch(flow)[source]
Run one flow once in batch mode using the configured source root.
- Parameters:
flow (CoreFlow)
- Return type:
- max_parallel_for_flow(flow)[source]
Return the allowed per-flow source concurrency for one flow.
- Parameters:
flow (CoreFlow)
- Return type:
int
- dispatch_queued_jobs(queue, queued_keys, pending_futures, executor, *, results)[source]
Submit queued source jobs up to each flow’s bounded concurrency and drain completions.
- Parameters:
queued_keys (set[tuple[str, str | None]])
pending_futures (dict[Future[FlowContext], tuple[QueuedRunJob, int]])
executor (ThreadPoolExecutor)
results (list[FlowContext])
- Return type:
None
- wait_for_dispatched_jobs(pending_futures, *, results)[source]
Wait for all pending queued jobs to complete.
- Parameters:
pending_futures (dict[Future[FlowContext], tuple[QueuedRunJob, int]])
results (list[FlowContext])
- Return type:
None
- class GroupedFlowRuntime(flows, *, continuous, runtime_stop_event=None, flow_stop_event=None, status_callback=None, runtime_ledger=None, runtime_ledger_service=None, runtime_ledger_factory=None, run_stop_controller=None)[source]
Bases: object
Grouped orchestrator: sequential within a group, parallel across groups.
- Parameters:
flows (tuple['Flow', ...])
continuous (bool)
runtime_stop_event (threading.Event | None)
flow_stop_event (threading.Event | None)
status_callback (Callable[[str], None] | None)
runtime_ledger (RuntimeCacheLedger | None)
runtime_ledger_service (RuntimeCacheLedgerService | None)
runtime_ledger_factory (Callable[[], RuntimeCacheLedger] | None)
run_stop_controller (RuntimeStopController | None)
- run()[source]
- Return type:
list[FlowContext]
Run-id-aware runtime stop control.
- class RuntimeStopController[source]
Bases: object
Track stop requests for specific active runtime run ids.
- request_stop(run_id)[source]
Request that one active or future run id stop.
- Parameters:
run_id (str)
- Return type:
None
- unregister_run(run_id)[source]
Clear active and requested state for one completed run id.
- Parameters:
run_id (str)
- Return type:
None
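A minimal sketch of the stop-request lifecycle; the import path and run id are illustrative:
```python
from data_engine.runtime import RuntimeStopController  # Import path assumed.

controller = RuntimeStopController()

# A host asks one active (or soon-to-start) run to stop...
controller.request_stop("run-2026-04-13-001")  # Illustrative run id.

# ...and clears both the active and requested state once the run finishes.
controller.unregister_run("run-2026-04-13-001")
```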
File Watching
Filesystem discovery and polling services used by the flow runtime.
- class IFileWatcher(*args, **kwargs)[source]
Bases: Protocol
Common interface for runtime file watchers.
- class PollingWatcher(input_root, *, recursive=True, extensions=None, settle=1)[source]
Bases: object
Filesystem polling watcher for one file or directory root.
- Parameters:
input_root (Path)
recursive (bool)
extensions (tuple[str, ...] | None)
settle (int)
- iter_candidate_paths(input_root, *, extensions=None, recursive=True, allow_missing=False)[source]
Yield candidate files from one root using optional extension filters.
- Parameters:
input_root (Path)
extensions (tuple[str, ...] | None)
recursive (bool)
allow_missing (bool)
- Return type:
Iterable[Path]
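A minimal sketch of enumerating candidate files; the import path and directory layout are assumptions:
```python
from pathlib import Path

from data_engine.runtime.watching import iter_candidate_paths  # Import path assumed.

# Enumerate candidate source files beneath one root, filtered by extension.
for path in iter_candidate_paths(
    Path("workspaces/example/incoming"),
    extensions=(".csv", ".xlsx"),
    recursive=True,
):
    print(path)
```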
Authoring Helpers
Small schema helpers for Polars-oriented flow authoring.
- class ColumnCasts[source]
Bases: dict[str, object]
Dict-like dtype mapping with a Polars `apply` helper.
`TableSchema.dtypes` returns this type. Values are passed to `polars.Expr.cast`, so callers can use normal Polars dtype objects such as `pl.String`, `pl.Int64`, and `pl.Datetime`. `apply` casts remaining frame columns to `pl.String`.
- class ColumnSelection(columns)[source]
Bases: tuple[str, ...]
Tuple-like column projection with Polars convenience methods.
`TableSchema.columns` returns this type so schema definitions can be used directly in dataframe chains while still behaving like a normal tuple.
Examples
```python
import polars as pl
from data_engine.helpers import TableSchema

schema = TableSchema(columns=("Claim Id",), dtypes={"Claim Id": pl.Int64})
df = pl.DataFrame({"Claim Id": [1], "SSN": ["123"]})
assert schema.columns.apply(df).columns == ["Claim Id"]
```
- Parameters:
columns (Iterable[str])
- Return type:
- class DropColumns(columns)[source]
Bases: tuple[str, ...]
Tuple-like drop list with a Polars `apply` helper.
`TableSchema.drop` returns this type. Empty drop lists are no-ops, which keeps chained cleanup code simple.
- Parameters:
columns (Iterable[str])
- Return type:
- class RenameColumns[source]
Bases: dict[str, str]
Dict-like rename mapping with a Polars `apply` helper.
`TableSchema.rename` returns this type. Empty mappings are no-ops, so the same cleanup chain can be used whether a schema currently renames columns or not.
- class TableSchema(columns=(), dtypes=(), rename=(), drop=())[source]
Bases: object
Column cleanup helper for compact Polars dataframe chains.
`TableSchema` is intentionally small: it stores an explicit column projection, a source-column dtype map, a rename map, and a drop list. Each attribute exposes an `apply` method so flow code can decide the cleanup order explicitly instead of relying on a magical all-in-one schema operation.
- Variables:
columns (Iterable[str] | ColumnSelection) – Explicit projection columns. Use `schema.columns.apply(df)` wherever that projection belongs in your chain.
dtypes (ColumnDtypes | ColumnCasts) – Source column names mapped to Polars dtype objects. Use `schema.dtypes.apply(df)` to cast them. Remaining frame columns are cast to `pl.String`.
rename (ColumnRenames) – Source-to-target column names. Use `schema.rename.apply(df)` to rename them.
drop (Iterable[str]) – Source columns to remove. Use `schema.drop.apply(df)` to drop them.
- Parameters:
columns (Iterable[str] | ColumnSelection)
dtypes (Mapping[str, object] | Iterable[tuple[str, object]] | ColumnCasts)
rename (Mapping[str, str] | Iterable[tuple[str, str]])
drop (Iterable[str])
Notes
`columns` is an explicit projection applied at the point you call `schema.columns.apply(df)`. For example, you might cast all incoming columns, drop private fields before persistence, write to DuckDB, and then select the columns to return.
Examples
```python
import polars as pl
from data_engine.helpers import TableSchema

schema = TableSchema(
    columns=("step", "time", "workflow"),
    dtypes={"step_to": pl.String, "time": pl.Time},
    rename={"step_to": "step", "workflow_to": "workflow"},
    drop=("workflow_from", "ssn"),
)
df = pl.DataFrame(
    {
        "step_to": ["review"],
        "time": ["09:30:00"],
        "workflow_to": ["claims"],
        "workflow_from": ["intake"],
        "ssn": ["000-00-0000"],
    }
).with_columns(pl.col("time").str.to_time())
df = schema.dtypes.apply(df)
df = schema.drop.apply(df)
df = schema.rename.apply(df)
df = schema.columns.apply(df)
assert df.columns == ["step", "time", "workflow"]
assert df.schema["workflow"] == pl.String
```
Normalize all incoming names first when source files use inconsistent spacing or capitalization:
```python
df = pl.DataFrame({"Workflow\tTo": ["claims"]})
df = schema.normalize_column_names(df)
assert df.columns == ["workflow_to"]
```
- columns: Iterable[str] | ColumnSelection = ()
- dtypes: Mapping[str, object] | Iterable[tuple[str, object]] | ColumnCasts = ()
- rename: Mapping[str, str] | Iterable[tuple[str, str]] = ()
- drop: Iterable[str] = ()
- normalize_column_name(name)[source]
Return a normalized column name.
- Parameters:
name (object) – Source column name to normalize.
- Returns:
Lowercase column name with separator-adjacent spaces removed, remaining whitespace collapsed, and spaces replaced with underscores.
- Return type:
str
- normalize_column_names(df, columns=None)[source]
Normalize column names on a Polars frame.
- Parameters:
df (pl.DataFrame | pl.LazyFrame) – Eager or lazy Polars frame to rename.
columns (Iterable[object] | None) – Optional subset of column names to normalize. When omitted, all frame columns are normalized.
- Returns:
Frame with normalized column names.
- Return type:
pl.DataFrame | pl.LazyFrame
- normalized_column_renames(columns)[source]
Return a Polars rename mapping for normalized column names.
- Parameters:
columns (Iterable[object]) – Source column names to normalize.
- Returns:
Mapping from original column names to normalized names, excluding names that are already normalized.
- Return type:
dict[str, str]
Polars namespace helpers for Data Engine flow authoring.
- networkdays(start, end, *, holidays=None, count_first_day=False, mask=None)[source]
Return Excel-style business-day counts as a Polars expression.
This helper matches Excel `NETWORKDAYS` semantics by counting both endpoints when they are business days. Weekends default to Saturday/Sunday, and optional holidays are excluded from the count.
The one intentional extension is `count_first_day`. When enabled, the start date is still counted even if it falls on a masked weekday or one of the supplied holidays.
- Parameters:
start (pl.Expr | str | date | datetime) – Start date expression, column name, or scalar date/datetime.
end (pl.Expr | str | date | datetime) – End date expression, column name, or scalar date/datetime.
holidays (list[date | datetime | str] | tuple[...] | set[...] | None) – Optional holiday dates removed from the business-day count. String values must use ISO date text such as `"2026-04-15"`.
count_first_day (bool) – Whether to force the first day into the count when it would normally be excluded by the weekday mask or holiday list.
mask (Iterable[bool] | None) – Monday-first seven-item business-day mask. Every item must be a real `bool`. `None` uses the Excel default: Monday-Friday counted, Saturday-Sunday excluded.
- Returns:
Expression that evaluates to the signed business-day count. Datetime inputs are normalized to their calendar date before counting.
- Return type:
pl.Expr
Examples
Add a row-level business-day count:
```python
from datetime import date

import polars as pl

import data_engine.helpers

df = pl.DataFrame(
    {
        "received_date": [date(2026, 4, 13), date(2026, 4, 14)],
        "due_date": [date(2026, 4, 17), date(2026, 4, 21)],
    }
).with_columns(
    business_days=data_engine.helpers.networkdays(
        "received_date",
        "due_date",
        holidays=[date(2026, 4, 15)],
    )
)
```
Use scalar datetimes and count the first day:
```python
from datetime import datetime

df = df.with_columns(
    sla_days=data_engine.helpers.networkdays(
        datetime(2026, 4, 13, 8, 30),
        pl.col("resolved_at"),
        count_first_day=True,
    )
)
```
Chain the expression into a grouped cumulative total:
```python
df = (
    df.sort(["claim_id", "sequence_number"])
    .with_columns(
        cumulative_business_days=pl.when(pl.col("use_days"))
        .then(
            data_engine.helpers.networkdays(
                "start_date",
                "end_date",
                holidays=[date(2026, 4, 15)],
            )
        )
        .otherwise(pl.lit(0))
        .cum_sum()
        .over("claim_id")
    )
)
```
Notes
`networkdays(...)` returns a normal `pl.Expr`. You can chain it into `cum_sum()`, window expressions, filters, and any other Polars expression pipeline.
- workday(start, days, *, holidays=None, count_first_day=False, mask=None)[source]
Return Excel-style workday offsets as a Polars expression.
This helper mirrors Excel `WORKDAY` by returning the business date that falls the requested number of working days before or after `start`.
The one intentional extension is `count_first_day`. When enabled, the start date itself can be day 1, even if it falls on a masked weekday or one of the supplied holidays.
- Parameters:
start (pl.Expr | str | date | datetime) – Start date expression, column name, or scalar date/datetime.
days (pl.Expr | str | int) – Signed business-day offset expression, column name, or scalar integer.
holidays (list[date | datetime | str] | tuple[...] | set[...] | None) – Optional holiday dates skipped while calculating the result. String values must use ISO date text such as `"2026-04-15"`.
count_first_day (bool) – Whether the start date itself can count as day 1 when moving forward or backward through business days.
mask (Iterable[bool] | None) – Monday-first seven-item business-day mask. Every item must be a real `bool`. `None` uses the Excel default: Monday-Friday counted, Saturday-Sunday excluded.
- Returns:
Expression that evaluates to a `Date` result. Datetime inputs are normalized to their calendar date before offsetting.
- Return type:
pl.Expr
Examples
Add one target business date column:
```python
from datetime import date

import polars as pl

import data_engine.helpers

df = pl.DataFrame(
    {
        "received_date": [date(2026, 4, 13), date(2026, 4, 14)],
        "sla_days": [3, 5],
    }
).with_columns(
    due_date=data_engine.helpers.workday(
        "received_date",
        "sla_days",
        holidays=[date(2026, 4, 15)],
    )
)
```
Count the start date as day 1:
```python
df = df.with_columns(
    due_date=data_engine.helpers.workday(
        "received_date",
        "sla_days",
        holidays=[date(2026, 4, 15)],
        count_first_day=True,
    )
)
```
Use a custom weekday mask where Saturday is also a business day:
```python
df = df.with_columns(
    due_date=data_engine.helpers.workday(
        "received_date",
        "sla_days",
        mask=(True, True, True, True, True, True, False),
    )
)
```
- write_parquet_atomic(df, path, **write_options)[source]
Write a Polars dataframe to parquet with same-directory atomic replacement.
The dataframe is first written to a unique temporary file beside the target, then moved into place with `os.replace`. This keeps readers from seeing a partially written parquet file while preserving normal Polars write options.
df (pl.DataFrame) – Eager Polars dataframe to write.
path (PathLike) – Target parquet path.
**write_options (object) – Keyword options forwarded to `pl.DataFrame.write_parquet`.
- Returns:
Absolute target path that was replaced.
- Return type:
Path
Examples
```python
import polars as pl
from data_engine.helpers import write_parquet_atomic

target = write_parquet_atomic(
    pl.DataFrame({"claim_id": [1, 2]}),
    "workspaces/example/output/claims.parquet",
)
df = pl.DataFrame({"claim_id": [3]})
df.de.write_parquet_atomic(target)
```
- write_excel_atomic(df, path, worksheet=None, **write_options)[source]
Write a Polars dataframe to Excel with same-directory atomic replacement.
The dataframe is first written to a unique temporary workbook beside the target, then moved into place with `os.replace`. All keyword options are forwarded to `pl.DataFrame.write_excel`.
- Parameters:
df (pl.DataFrame) – Eager Polars dataframe to write.
path (PathLike) – Target Excel workbook path.
worksheet (str | None) – Optional worksheet name forwarded to `pl.DataFrame.write_excel`.
**write_options (object) – Keyword options forwarded to `pl.DataFrame.write_excel`.
- Returns:
Absolute target path that was replaced.
- Return type:
Path
Examples
```python
import polars as pl
from data_engine.helpers import write_excel_atomic

target = write_excel_atomic(
    pl.DataFrame({"claim_id": [1, 2]}),
    "workspaces/example/output/claims.xlsx",
    worksheet="Claims",
    table_name="claims",
    autofit=True,
)
df = pl.DataFrame({"claim_id": [3]})
df.de.write_excel_atomic(target, worksheet="Claims")
```
- sink_parquet_atomic(lf, path, **sink_options)[source]
Sink a Polars lazy frame to parquet with same-directory atomic replacement.
The lazy query is executed into a unique temporary file beside the target, then moved into place with `os.replace`. Use the default eager sink mode so the helper can complete the replacement in the same call.
lf (pl.LazyFrame) – Lazy Polars frame to execute and write.
path (PathLike) – Target parquet path.
**sink_options (object) – Keyword options forwarded to `pl.LazyFrame.sink_parquet`.
- Returns:
Absolute target path that was replaced.
- Return type:
Path
- Raises:
ValueError – If `lazy=True` is passed.
Examples
```python
import polars as pl

import data_engine.helpers

lf = pl.DataFrame({"claim_id": [1, 2]}).lazy()
lf.de.sink_parquet_atomic("workspaces/example/output/claims.parquet")
```
- class DataEngineDataFrameNamespace(df)[source]
Bases: object
Data Engine helpers available from `pl.DataFrame.de`.
- Parameters:
df (pl.DataFrame)
- normalize_column_names(columns=None)[source]
Normalize column names on this dataframe.
- Parameters:
columns (Iterable[object] | None) – Optional subset of column names to normalize. When omitted, all dataframe columns are normalized.
- Returns:
Dataframe with normalized column names.
- Return type:
pl.DataFrame
- networkdays(start, end, *, holidays=None, count_first_day=False, mask=None)[source]
Return an Excel-style business-day count expression for this dataframe.
This is a convenience wrapper around `data_engine.helpers.networkdays()`. The returned value is still a normal `pl.Expr`, so it can be chained into cumulative windows and other Polars expressions.
Example
```python
df = df.with_columns(
    business_days=df.de.networkdays(
        "start_date",
        "end_date",
        holidays=[date(2026, 4, 15)],
    )
)
df = df.sort(["claim_id", "sequence_number"]).with_columns(
    cumulative_business_days=pl.when(pl.col("use_days"))
    .then(df.de.networkdays("start_date", "end_date"))
    .otherwise(pl.lit(0))
    .cum_sum()
    .over("claim_id")
)
```
- Parameters:
start (Expr | str | date | datetime)
end (Expr | str | date | datetime)
holidays (list[date | datetime | str] | tuple[date | datetime | str, ...] | set[date | datetime | str] | None)
count_first_day (bool)
mask (Iterable[bool] | None)
- Return type:
Expr
- workday(start, days, *, holidays=None, count_first_day=False, mask=None)[source]
Return an Excel-style workday offset expression for this dataframe.
This is a convenience wrapper around `data_engine.helpers.workday()`.
Example
```python
df = df.with_columns(
    due_date=df.de.workday(
        "received_date",
        "sla_days",
        holidays=[date(2026, 4, 15)],
    )
)
```
- Parameters:
start (Expr | str | date | datetime)
days (Expr | str | int)
holidays (list[date | datetime | str] | tuple[date | datetime | str, ...] | set[date | datetime | str] | None)
count_first_day (bool)
mask (Iterable[bool] | None)
- Return type:
Expr
- write_parquet_atomic(path, **write_options)[source]
Write this dataframe to parquet with atomic target replacement.
- Parameters:
path (PathLike) – Target parquet path.
**write_options (object) – Keyword options forwarded to `pl.DataFrame.write_parquet`.
- Returns:
Absolute target path that was replaced.
- Return type:
Path
- write_excel_atomic(path, worksheet=None, **write_options)[source]
Write this dataframe to Excel with atomic target replacement.
- Parameters:
path (PathLike) – Target Excel workbook path.
worksheet (str | None) – Optional worksheet name forwarded to `pl.DataFrame.write_excel`.
**write_options (object) – Keyword options forwarded to `pl.DataFrame.write_excel`.
- Returns:
Absolute target path that was replaced.
- Return type:
Path
- build_dimension(db_path, table, *, key_column='dimension_key', return_df=True)[source]
Build or extend one DuckDB dimension table from this dataframe.
- Parameters:
db_path (PathLike) – DuckDB database file path.
table (str) – Dimension table name, optionally schema-qualified.
key_column (str) – Surrogate key column to create in the dimension table.
return_df (bool) – Whether to return the mapping dataframe for this dataframe’s natural keys.
- Returns:
Mapping dataframe when `return_df` is true; otherwise `None`.
- Return type:
pl.DataFrame | None
- attach_dimension(db_path, table, *, on, key_column='dimension_key', drop_key=False)[source]
Attach an existing DuckDB dimension key to this dataframe.
- Parameters:
db_path (PathLike) – DuckDB database file path.
table (str) – Dimension table name, optionally schema-qualified.
on (ColumnNames) – Natural key column or columns used to join to the dimension table.
key_column (str) – Surrogate key column to attach.
drop_key (bool) – Whether to drop the natural key columns after attaching the surrogate key.
- Returns:
Dataframe with the surrogate key column attached.
- Return type:
pl.DataFrame
- denormalize_columns(db_path, table, *, key_column='dimension_key', select='*', drop_key=False)[source]
Attach natural columns from an existing DuckDB dimension table.
- Parameters:
db_path (PathLike) – DuckDB database file path.
table (str) – Dimension table name, optionally schema-qualified.
key_column (str) – Surrogate key column used to join to the dimension table.
select (ColumnNames) – Natural columns to attach, or `"*"` for all non-key columns.
drop_key (bool) – Whether to drop `key_column` after attaching the natural columns.
- Returns:
Dataframe with selected dimension columns attached.
- Return type:
pl.DataFrame
- normalize_columns(db_path, table, *, on, key_column='dimension_key', drop_key=True, returns='df')[source]
Build dimension keys and attach them back onto this dataframe.
- Parameters:
db_path (PathLike) – DuckDB database file path.
table (str) – Dimension table name, optionally schema-qualified.
on (ColumnNames) – Natural key column or columns used to build the dimension.
key_column (str) – Surrogate key column to create and attach.
drop_key (bool) – Whether to drop natural key columns after attaching the surrogate key.
returns (ReturnMode) – `"df"` for normalized input rows, `"map"` for only the key mapping, or `None` to only persist dimension rows.
- Returns:
Normalized dataframe, mapping dataframe, or `None` according to `returns`.
- Return type:
pl.DataFrame | None
- replace_rows_by_file(db_path, table, *, file_hash, file_hash_column='file_key', return_df=True)[source]
Replace one file’s DuckDB rows and append this dataframe.
- Parameters:
db_path (PathLike) – DuckDB database file path.
table (str) – Destination table name, optionally schema-qualified.
file_hash (str) – Stable source-file identifier used to delete the previous batch.
file_hash_column (str) – Column name used to store `file_hash` in the destination table.
return_df (bool) – Whether to return this dataframe with the file hash column attached.
- Returns:
Inserted rows with `file_hash_column` when `return_df` is true; otherwise `None`.
- Return type:
pl.DataFrame | None
- replace_rows_by_values(db_path, table, *, column, return_df=True)[source]
Replace DuckDB rows matching this dataframe’s values for one column.
- Parameters:
db_path (PathLike) – DuckDB database file path.
table (str) – Destination table name, optionally schema-qualified.
column (str) – Column whose incoming values define the rows to replace.
return_df (bool) – Whether to return the inserted dataframe.
- Returns:
Inserted dataframe when `return_df` is true; otherwise `None`.
- Return type:
pl.DataFrame | None
- replace_table(db_path, table, *, return_df=True)[source]
Replace one DuckDB table wholesale from this dataframe.
- Parameters:
db_path (PathLike) – DuckDB database file path.
table (str) – Destination table name, optionally schema-qualified.
return_df (bool) – Whether to return the inserted dataframe.
- Returns:
Inserted dataframe when `return_df` is true; otherwise `None`.
- Return type:
pl.DataFrame | None
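The dimension helpers above ship without standalone examples; a minimal sketch of one normalize/denormalize round trip, with an illustrative database path, table name, and natural key column:
```python
import polars as pl

import data_engine.helpers  # Registers the pl.DataFrame.de namespace.

df = pl.DataFrame(
    {"claimant_name": ["Ada", "Ada", "Grace"], "amount": [10, 20, 30]}
)

# Build surrogate keys for the natural column and attach them in one call;
# drop_key=True (the default) removes the natural column afterwards.
normalized = df.de.normalize_columns(
    "workspaces/example/databases/analytics.duckdb",
    "dim_claimant",
    on="claimant_name",
)

# Later, reattach the natural columns from the stored dimension table.
denormalized = normalized.de.denormalize_columns(
    "workspaces/example/databases/analytics.duckdb",
    "dim_claimant",
)
```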
- class DataEngineLazyFrameNamespace(lf)[source]
Bases: object
Data Engine helpers available from `pl.LazyFrame.de`.
- Parameters:
lf (pl.LazyFrame)
- normalize_column_names(columns=None)[source]
Normalize column names on this lazy frame.
- Parameters:
columns (Iterable[object] | None) – Optional subset of column names to normalize. When omitted, all lazy-frame columns are normalized.
- Returns:
Lazy frame with normalized column names.
- Return type:
pl.LazyFrame
- networkdays(start, end, *, holidays=None, count_first_day=False, mask=None)[source]
Return an Excel-style business-day count expression for this lazy frame.
This is a convenience wrapper around `data_engine.helpers.networkdays()`. The returned value stays lazy and can be chained into window expressions before `collect()`.
start (Expr | str | date | datetime)
end (Expr | str | date | datetime)
holidays (list[date | datetime | str] | tuple[date | datetime | str, ...] | set[date | datetime | str] | None)
count_first_day (bool)
mask (Iterable[bool] | None)
- Return type:
Expr
- workday(start, days, *, holidays=None, count_first_day=False, mask=None)[source]
Return an Excel-style workday offset expression for this lazy frame.
This is a convenience wrapper around `data_engine.helpers.workday()`.
start (Expr | str | date | datetime)
days (Expr | str | int)
holidays (list[date | datetime | str] | tuple[date | datetime | str, ...] | set[date | datetime | str] | None)
count_first_day (bool)
mask (Iterable[bool] | None)
- Return type:
Expr
- sink_parquet_atomic(path, **sink_options)[source]
Execute this lazy frame to parquet with atomic target replacement.
- Parameters:
path (PathLike) – Target parquet path.
**sink_options (object) – Keyword options forwarded to `pl.LazyFrame.sink_parquet`.
- Returns:
Absolute target path that was replaced.
- Return type:
Path
- build_dimension(db_path, table, *, key_column='dimension_key', return_df=True)[source]
Build or extend one DuckDB dimension table from this lazy frame.
- Parameters:
db_path (PathLike) – DuckDB database file path.
table (str) – Dimension table name, optionally schema-qualified.
key_column (str) – Surrogate key column to create in the dimension table.
return_df (bool) – Whether to return the mapping dataframe for this lazy frame’s natural keys.
- Returns:
Mapping dataframe when `return_df` is true; otherwise `None`.
- Return type:
pl.DataFrame | None
- attach_dimension(db_path, table, *, on, key_column='dimension_key', drop_key=False)[source]
Attach an existing DuckDB dimension key to this lazy frame.
- Parameters:
db_path (PathLike) – DuckDB database file path.
table (str) – Dimension table name, optionally schema-qualified.
on (ColumnNames) – Natural key column or columns used to join to the dimension table.
key_column (str) – Surrogate key column to attach.
drop_key (bool) – Whether to drop the natural key columns after attaching the surrogate key.
- Returns:
Collected dataframe with the surrogate key column attached.
- Return type:
pl.DataFrame
- denormalize_columns(db_path, table, *, key_column='dimension_key', select='*', drop_key=False)[source]
Attach natural columns from an existing DuckDB dimension table.
- Parameters:
db_path (PathLike) – DuckDB database file path.
table (str) – Dimension table name, optionally schema-qualified.
key_column (str) – Surrogate key column used to join to the dimension table.
select (ColumnNames) – Natural columns to attach, or `"*"` for all non-key columns.
drop_key (bool) – Whether to drop `key_column` after attaching the natural columns.
- Returns:
Collected dataframe with selected dimension columns attached.
- Return type:
pl.DataFrame
- normalize_columns(db_path, table, *, on, key_column='dimension_key', drop_key=True, returns='df')[source]
Build dimension keys and attach them back onto this lazy frame.
- Parameters:
db_path (PathLike) – DuckDB database file path.
table (str) – Dimension table name, optionally schema-qualified.
on (ColumnNames) – Natural key column or columns used to build the dimension.
key_column (str) – Surrogate key column to create and attach.
drop_key (bool) – Whether to drop natural key columns after attaching the surrogate key.
returns (ReturnMode) – `"df"` for normalized input rows, `"map"` for only the key mapping, or `None` to only persist dimension rows.
- Returns:
Normalized dataframe, mapping dataframe, or `None` according to `returns`.
- Return type:
pl.DataFrame | None
- replace_rows_by_file(db_path, table, *, file_hash, file_hash_column='file_key', return_df=True)[source]
Replace one file’s DuckDB rows and append this lazy frame.
- Parameters:
db_path (PathLike) – DuckDB database file path.
table (str) – Destination table name, optionally schema-qualified.
file_hash (str) – Stable source-file identifier used to delete the previous batch.
file_hash_column (str) – Column name used to store `file_hash` in the destination table.
return_df (bool) – Whether to return the collected frame with the file hash column attached.
- Returns:
Inserted rows with `file_hash_column` when `return_df` is true; otherwise `None`.
- Return type:
pl.DataFrame | None
- replace_rows_by_values(db_path, table, *, column, return_df=True)[source]
Replace DuckDB rows matching this lazy frame’s values for one column.
- Parameters:
db_path (PathLike) – DuckDB database file path.
table (str) – Destination table name, optionally schema-qualified.
column (str) – Column whose incoming values define the rows to replace.
return_df (bool) – Whether to return the inserted dataframe.
- Returns:
Inserted dataframe when `return_df` is true; otherwise `None`.
- Return type:
pl.DataFrame | None
- replace_table(db_path, table, *, return_df=True)[source]
Replace one DuckDB table wholesale from this lazy frame.
- Parameters:
db_path (PathLike) – DuckDB database file path.
table (str) – Destination table name, optionally schema-qualified.
return_df (bool) – Whether to return the inserted dataframe.
- Returns:
Inserted dataframe when `return_df` is true; otherwise `None`.
- Return type:
pl.DataFrame | None
Public one-shot DuckDB helpers for flow authoring.
- attach_dimension(db_path, table, *, df, on, key_column='dimension_key', drop_key=False)[source]
Attach an existing surrogate key mapping table to an input dataframe.
- Parameters:
db_path (str | Path)
table (str)
df (DataFrame | LazyFrame)
on (str | list[str] | tuple[str, ...])
key_column (str)
drop_key (bool)
- build_dimension(db_path, table, *, df, key_column='dimension_key', return_df=True)[source]
Build or extend one dimension table from unique incoming row combinations.
- Parameters:
db_path (str | Path)
table (str)
df (DataFrame | LazyFrame)
key_column (str)
return_df (bool)
- compact_database(db_path, *, tables=None, drop_all_null_columns=True, vacuum=True)[source]
Compact one DuckDB database by dropping all-null columns and optionally vacuuming.
- Parameters:
db_path (str | Path)
tables (str | list[str] | tuple[str, ...] | None)
drop_all_null_columns (bool)
vacuum (bool)
- Return type:
DataFrame
- denormalize_columns(db_path, table, *, df, key_column='dimension_key', select='*', drop_key=False)[source]
Attach natural columns from an existing dimension table onto a keyed dataframe.
- Parameters:
db_path (str | Path)
table (str)
df (DataFrame | LazyFrame)
key_column (str)
select (str | list[str] | tuple[str, ...])
drop_key (bool)
- normalize_columns(db_path, table, *, df, on, key_column='dimension_key', drop_key=True, returns='df')[source]
Build missing surrogate keys and attach them back onto the input dataframe.
- Parameters:
db_path (str | Path)
table (str)
df (DataFrame | LazyFrame)
on (str | list[str] | tuple[str, ...])
key_column (str)
drop_key (bool)
returns (str | None)
- read_rows_by_values(db_path, table, *, column, is_in, select)[source]
Return selected columns for rows whose one column matches provided values.
- Parameters:
db_path (str | Path) – DuckDB database file path.
table (str) – Source table name, optionally schema-qualified.
column (str) – Column matched against `is_in`.
is_in (list[object] | tuple[object, ...]) – Values to include.
select (str | list[str] | tuple[str, ...]) – Columns to return.
- Returns:
Selected matching rows in input order by distinct lookup values.
- Return type:
pl.DataFrame
- Raises:
ValueError – If the table, column, or selected columns are invalid.
- read_sql(db_path, *, sql)[source]
Run one SQL query against DuckDB and return the result as a Polars dataframe.
- Parameters:
db_path (str | Path) – DuckDB database file path.
sql (str) – Query text to execute.
- Returns:
Query result as a Polars dataframe.
- Return type:
pl.DataFrame
- read_table(db_path, table, *, select='*', where=None, limit=None)[source]
Read rows from one DuckDB table into a Polars dataframe.
- Parameters:
db_path (str | Path) – DuckDB database file path.
table (str) – Source table name, optionally schema-qualified.
select (str | list[str] | tuple[str, ...]) – Columns to return, or `"*"` for all columns.
where (str | None) – Optional raw SQL predicate appended after `WHERE`.
limit (int | None) – Optional row limit.
- Returns:
Selected table rows.
- Return type:
pl.DataFrame
- replace_rows_by_file(db_path, table, *, df, file_hash, file_hash_column='file_key', return_df=True)[source]
Atomically replace one file’s fact rows and append the current batch.
- Parameters:
db_path (str | Path)
table (str)
df (DataFrame | LazyFrame)
file_hash (str)
file_hash_column (str)
return_df (bool)
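A minimal sketch combining the one-shot helpers; the import path, database path, and table name are assumptions:
```python
import polars as pl

# Import path assumed; these are the one-shot helpers documented above.
from data_engine.helpers import read_table, replace_rows_by_file

df = pl.DataFrame({"claim_id": [1, 2], "amount": [100.0, 250.0]})

# Replace the previous batch for this source file, then append the new rows.
replace_rows_by_file(
    "workspaces/example/databases/analytics.duckdb",
    "claims",
    df=df,
    file_hash="2f7c9a",  # Illustrative stable source-file identifier.
)

rows = read_table(
    "workspaces/example/databases/analytics.duckdb",
    "claims",
    select=["claim_id", "amount"],
    limit=10,
)
```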
Host Surfaces
APScheduler-backed host for scheduled flow execution.
- class ScheduledFlowJob(job_id, flow_name, trigger_kind)[source]
Bases: object
Description of one scheduler job owned by the host.
- Parameters:
job_id (str)
flow_name (str)
trigger_kind (str)
- job_id: str
- flow_name: str
- trigger_kind: str
- class SchedulerHost(*, runtime_engine=None, scheduler=None, job_id_prefix='data-engine:schedule:')[source]
Bases: object
Own APScheduler timing while delegating flow meaning to the runtime engine.
- Parameters:
runtime_engine (RuntimeEngine | None)
scheduler (SchedulerPort | None)
job_id_prefix (str)
- rebuild_jobs(flows)[source]
Replace scheduler jobs from discovered scheduled flows.
- Parameters:
flows (tuple['Flow', ...])
- Return type:
tuple[ScheduledFlowJob, …]
- shutdown(*, wait=True)[source]
Stop the underlying scheduler.
- Parameters:
wait (bool)
- Return type:
None
- run_until_stopped(flows, stop_event)[source]
Run scheduled flow jobs until `stop_event` is set.
- Parameters:
flows (tuple['Flow', ...])
stop_event (Event)
- Return type:
tuple[ScheduledFlowJob, …]
- class SchedulerPort(*args, **kwargs)[source]
Bases: Protocol
Small scheduler surface used by the scheduler host.
- add_job(func, *, trigger, id, replace_existing=False, max_instances=1)[source]
Add or replace one scheduled job.
- Parameters:
id (str)
replace_existing (bool)
max_instances (int)
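A minimal sketch of hosting scheduled flows until an external stop signal; the `SchedulerHost` import path is an assumption:
```python
import threading

from data_engine import discover_flows
from data_engine.hosts.scheduler import SchedulerHost  # Import path assumed.

flows = discover_flows()  # Scheduled flows authored in the workspace.
stop_event = threading.Event()

host = SchedulerHost()
# Blocks until stop_event is set, e.g. by a signal handler in another thread.
jobs = host.run_until_stopped(flows, stop_event)
```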
Application Services
Daemon IPC and lifecycle services.
- class DaemonService(*, spawn_process_func=<function spawn_daemon_process>, request_func=<function daemon_request>, is_live_func=<function is_daemon_live>, force_shutdown_func=<function force_shutdown_daemon_process>, client_error_type=<class 'data_engine.hosts.daemon.client.DaemonClientError'>)[source]
Bases: object
Thin injectable wrapper around daemon lifecycle and IPC helpers.
- Parameters:
spawn_process_func (Callable[..., object])
request_func (Callable[..., dict[str, Any]])
is_live_func (Callable[[WorkspacePaths], bool])
force_shutdown_func (Callable[..., None])
client_error_type (type[Exception])
- spawn(paths, *, lifecycle_policy=DaemonLifecyclePolicy.PERSISTENT)[source]
Start the local workspace daemon process for the given paths.
- Parameters:
paths (WorkspacePaths)
lifecycle_policy (DaemonLifecyclePolicy)
- Return type:
object
- request(paths, payload, *, timeout=0.0)[source]
Send one request to the local workspace daemon.
- Parameters:
paths (WorkspacePaths)
payload (dict[str, Any])
timeout (float)
- Return type:
dict[str, Any]
- is_live(paths)[source]
Return whether the local workspace daemon is reachable.
- Parameters:
paths (WorkspacePaths)
- Return type:
bool
- force_shutdown(paths, *, timeout=0.5)[source]
Force-stop the local workspace daemon for the given paths.
- Parameters:
paths (WorkspacePaths)
timeout (float)
- Return type:
None
- property client_error_type: type[Exception]
Return the daemon client exception type.
Workspace daemon state and control services.
- class DaemonStateService(*, shared_state_adapter=None)[source]
Bases: object
Own workspace daemon-manager construction and normalized snapshot access.
- Parameters:
shared_state_adapter (DaemonSharedStateAdapter | None)
- create_manager(paths)[source]
Create one daemon-state manager for a workspace.
- Parameters:
paths (WorkspacePaths)
- Return type:
WorkspaceDaemonManager
- sync(manager)[source]
Fetch one normalized daemon snapshot.
- Parameters:
manager (WorkspaceDaemonManager)
- Return type:
WorkspaceDaemonSnapshot
Flow catalog loading services.
- class FlowCatalogService(*, discover_definitions_func=<function discover_flow_module_definitions>)[source]
Bases: object
Own flow catalog loading through an explicit discovery dependency.
- Parameters:
discover_definitions_func (Callable[..., tuple[FlowModuleDefinition, ...]])
- flow_catalog_entry_from_flow(flow, *, description)[source]
- Parameters:
flow (Flow)
description (str | None)
- Return type:
FlowCatalogEntry
Executable flow loading services.
- class FlowExecutionService(*, load_flow_func=<function _default_load_flow>, discover_flows_func=<function _default_discover_flows>)[source]
Bases: object
Own executable flow loading through an explicit loader dependency.
- Parameters:
load_flow_func (Callable[..., 'CoreFlow'])
discover_flows_func (Callable[..., tuple['CoreFlow', ...]])
- load_flow(name, *, workspace_root=None)[source]
Return one executable flow definition by name.
- Parameters:
name (str)
workspace_root (Path | None)
- Return type:
CoreFlow
Runtime control-ledger services.
- LedgerService
alias of RuntimeControlLedgerService
- class RuntimeControlLedgerService(open_ledger_func=None, *, runtime_layout_policy=None)[source]
Bases: object
Own workspace-local runtime control-ledger access and client-session bookkeeping.
- Parameters:
open_ledger_func (Callable[[Path], RuntimeControlStore] | None)
runtime_layout_policy (RuntimeLayoutPolicy | None)
- open_for_workspace(workspace_root)[source]
Open the configured runtime control ledger for one workspace root.
- Parameters:
workspace_root (Path)
- Return type:
RuntimeControlStore
- close(ledger)[source]
Close one runtime control-ledger connection.
- Parameters:
ledger (RuntimeControlStore)
- Return type:
None
- register_client_session(ledger, *, client_id, workspace_id, client_kind, pid)[source]
Register or refresh one active local client session.
- Parameters:
ledger (RuntimeControlStore)
client_id (str)
workspace_id (str)
client_kind (str)
pid (int)
- Return type:
None
- remove_client_session(ledger, client_id)[source]
Remove one active local client session row.
- Parameters:
ledger (RuntimeControlStore)
client_id (str)
- Return type:
None
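A minimal sketch of the open/register/close lifecycle; the import path, workspace path, and client identifiers are illustrative:
```python
from pathlib import Path

from data_engine.services import RuntimeControlLedgerService  # Import path assumed.

service = RuntimeControlLedgerService()
ledger = service.open_for_workspace(Path("workspaces/example"))
try:
    service.register_client_session(
        ledger,
        client_id="tui-1234",
        workspace_id="example",
        client_kind="tui",
        pid=1234,
    )
finally:
    service.remove_client_session(ledger, "tui-1234")
    service.close(ledger)
```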
Operator log history services.
- class LogService[source]
Bases: object
Own operator log-store construction and log history queries.
- create_store(runtime_cache_ledger=None)[source]
Create one log store hydrated from the given runtime cache store.
- Parameters:
runtime_cache_ledger (RuntimeCacheStore | None)
- Return type:
FlowLogStore
- reload(store, runtime_cache_ledger)[source]
Reload one log store from an explicit runtime cache store.
- Parameters:
store (FlowLogStore)
runtime_cache_ledger (RuntimeCacheStore | None)
- Return type:
None
- append_entry(store, entry)[source]
Append one log entry to the current store.
- Parameters:
store (FlowLogStore)
entry (FlowLogEntry)
- Return type:
None
- clear_flow(store, flow_name)[source]
Clear one flow’s visible log history from the current store.
- Parameters:
store (FlowLogStore)
flow_name (str | None)
- Return type:
None
- all_entries(store)[source]
Return every entry currently held in the store.
- Parameters:
store (FlowLogStore)
- Return type:
tuple[FlowLogEntry, …]
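Examples
A sketch of store construction and history queries, assuming the services import path; with no runtime cache store given, the new log store presumably starts empty.
from data_engine.services import LogService  # import path assumed
service = LogService()
store = service.create_store()  # no cache ledger: assumed to start empty
for entry in service.all_entries(store):
    print(entry)
service.clear_flow(store, "normalize-claims")  # hypothetical flow name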
Workspace runtime binding services for operator surfaces.
- class WorkspaceRuntimeBinding(workspace_paths, runtime_cache_ledger, runtime_control_ledger, log_store, daemon_manager)[source]
Bases: object
Concrete runtime resources bound to one selected workspace.
- Parameters:
workspace_paths (WorkspacePaths)
runtime_cache_ledger (RuntimeCacheStore)
runtime_control_ledger (RuntimeControlStore)
log_store (FlowLogStore)
daemon_manager (WorkspaceDaemonManager)
- workspace_paths: WorkspacePaths
- runtime_cache_ledger: RuntimeCacheStore
- runtime_control_ledger: RuntimeControlStore
- log_store: FlowLogStore
- daemon_manager: WorkspaceDaemonManager
- class WorkspaceRuntimeBindingService(*, ledger_service, log_service, daemon_state_service, runtime_history_service)[source]
Bases: object
Own concrete runtime binding lifecycle for GUI/TUI surfaces.
- Parameters:
ledger_service (RuntimeControlLedgerService)
log_service (LogService)
daemon_state_service (DaemonStateService)
runtime_history_service (RuntimeHistoryService)
- open_binding(workspace_paths)[source]
Open one concrete runtime binding for a workspace selection.
- Parameters:
workspace_paths (WorkspacePaths)
- Return type:
WorkspaceRuntimeBinding
- close_binding(binding)[source]
Close one concrete runtime binding.
- Parameters:
binding (WorkspaceRuntimeBinding)
- Return type:
None
- register_client_session(binding, *, client_id, client_kind, pid=None)[source]
Register or refresh one local client session for the binding workspace.
- Parameters:
binding (WorkspaceRuntimeBinding)
client_id (str)
client_kind (str)
pid (int | None)
- Return type:
None
- remove_client_session(binding, client_id)[source]
Remove one active local client session row.
- Parameters:
binding (WorkspaceRuntimeBinding)
client_id (str)
- Return type:
None
- purge_process_client_sessions(binding, *, client_kind, pid=None)[source]
Remove all client sessions for this workspace/client-kind/process tuple.
- Parameters:
binding (WorkspaceRuntimeBinding)
client_kind (str)
pid (int | None)
- Return type:
None
- count_live_client_sessions(binding, *, exclude_client_id=None)[source]
Return the number of live local client sessions for the binding workspace.
- Parameters:
binding (WorkspaceRuntimeBinding)
exclude_client_id (str | None)
- Return type:
int
- sync_runtime_state(binding, *, runtime_application, flow_cards, daemon_startup_in_progress=False)[source]
Return daemon/runtime sync state for one bound workspace.
- Parameters:
binding (WorkspaceRuntimeBinding)
runtime_application (RuntimeApplication)
flow_cards (dict[str, FlowCatalogLike])
daemon_startup_in_progress (bool)
- Return type:
object
- reload_logs(binding)[source]
Reload the binding log store from its runtime cache store.
- Parameters:
binding (WorkspaceRuntimeBinding)
- Return type:
None
- invalidate_flow_history(binding, *, flow_name)[source]
Drop one flow’s cached logs and derived step-output state after destructive resets.
- Parameters:
binding (WorkspaceRuntimeBinding)
flow_name (str)
- Return type:
None
- rebuild_step_outputs(binding, flow_cards)[source]
Rebuild latest successful per-step output paths for visible flows.
- Parameters:
binding (WorkspaceRuntimeBinding)
flow_cards (dict[str, FlowCatalogLike])
- Return type:
StepOutputIndex
- error_text_for_entry(binding, run_group, entry)[source]
Return one user-facing error title and persisted error text.
- Parameters:
binding (WorkspaceRuntimeBinding)
run_group (FlowRunState)
entry (FlowLogEntry)
- Return type:
tuple[str, str | None]
- recent_run_count(binding, *, days)[source]
Return the number of persisted runs started in the recent window.
- Parameters:
binding (WorkspaceRuntimeBinding)
days (int)
- Return type:
int
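Examples
A sketch of one binding lifecycle wired from the services documented above; the services import path, workspace id, and client id are illustrative assumptions.
import os
from data_engine.services import (  # import path assumed
    DaemonStateService,
    LogService,
    RuntimeControlLedgerService,
    RuntimeHistoryService,
    WorkspaceRuntimeBindingService,
    WorkspaceService,
)
service = WorkspaceRuntimeBindingService(
    ledger_service=RuntimeControlLedgerService(),
    log_service=LogService(),
    daemon_state_service=DaemonStateService(),
    runtime_history_service=RuntimeHistoryService(),
)
paths = WorkspaceService().resolve_paths(workspace_id="claims")  # hypothetical id
binding = service.open_binding(paths)
service.register_client_session(binding, client_id="tui-1", client_kind="tui", pid=os.getpid())
try:
    service.reload_logs(binding)
finally:
    service.remove_client_session(binding, "tui-1")
    service.close_binding(binding)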
Runtime execution services for flow runs and grouped engine runs.
- class RuntimeExecutionService(*, flow_runtime_type=<class 'data_engine.runtime.execution.single.FlowRuntime'>, grouped_runtime_type=<class 'data_engine.runtime.execution.grouped.GroupedFlowRuntime'>, runtime_engine_type=<class 'data_engine.runtime.engine.RuntimeEngine'>, scheduler_host_factory=<class 'data_engine.hosts.scheduler.SchedulerHost'>, run_stop_controller=None)[source]
Bases: object
Own executable runtime construction for manual and grouped runs.
- Parameters:
flow_runtime_type (type[FlowRuntime])
grouped_runtime_type (type[GroupedFlowRuntime])
runtime_engine_type (type[RuntimeEngine])
scheduler_host_factory (Callable[..., SchedulerHost])
run_stop_controller (RuntimeStopController | None)
- run_once(flow, *, runtime_ledger=None, runtime_stop_event=None, flow_stop_event=None)[source]
Run one flow as a one-shot execution.
- Parameters:
flow (CoreFlow)
runtime_ledger (RuntimeCacheStore | None)
runtime_stop_event (Event | None)
flow_stop_event (Event | None)
- Return type:
object
- run_source(flow, source_path, *, runtime_ledger=None, runtime_stop_event=None, flow_stop_event=None)[source]
Run one flow for a specific source path.
- Parameters:
flow (CoreFlow)
source_path (str)
runtime_ledger (RuntimeCacheStore | None)
runtime_stop_event (Event | None)
flow_stop_event (Event | None)
- Return type:
object
- run_batch(flow, *, runtime_ledger=None, runtime_stop_event=None, flow_stop_event=None)[source]
Run one flow once in batch mode.
- Parameters:
flow (CoreFlow)
runtime_ledger (RuntimeCacheStore | None)
runtime_stop_event (Event | None)
flow_stop_event (Event | None)
- Return type:
object
- preview(flow, *, use=None, runtime_ledger=None)[source]
Preview one flow through the one-shot runtime path.
- Parameters:
flow (CoreFlow)
use (str | None)
runtime_ledger (RuntimeCacheStore | None)
- Return type:
object
- run_manual(flow, *, runtime_ledger, runtime_stop_event, flow_stop_event=None)[source]
Run one flow as a manual one-shot execution.
- Parameters:
flow (CoreFlow)
runtime_ledger (RuntimeCacheStore)
runtime_stop_event (Event)
flow_stop_event (Event | None)
- Return type:
object
- run_continuous(flow, *, runtime_ledger=None, flow_stop_event=None)[source]
Run one flow continuously.
- Parameters:
flow (CoreFlow)
runtime_ledger (RuntimeCacheStore | None)
flow_stop_event (Event | None)
- Return type:
object
- run_grouped(flows, *, runtime_ledger, runtime_stop_event, flow_stop_event)[source]
Run grouped automated flows continuously.
- Parameters:
flows (tuple['CoreFlow', ...])
runtime_ledger (RuntimeCacheStore)
runtime_stop_event (Event)
flow_stop_event (Event)
- Return type:
object
- run_automated(flows, *, runtime_ledger=None, runtime_stop_event, flow_stop_event)[source]
Run automated poll and schedule flows through separate host timing surfaces.
- Parameters:
flows (tuple['CoreFlow', ...])
runtime_ledger (RuntimeCacheStore | None)
runtime_stop_event (Event)
flow_stop_event (Event)
- Return type:
object
- run_grouped_continuous(flows, *, runtime_ledger=None, runtime_stop_event=None, flow_stop_event=None)[source]
Run grouped automated flows continuously with optional runtime controls.
- Parameters:
flows (tuple['CoreFlow', ...])
runtime_ledger (RuntimeCacheStore | None)
runtime_stop_event (Event | None)
flow_stop_event (Event | None)
- Return type:
object
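Examples
A sketch of one-shot and source-specific runs with the default runtime types; load_flow is the documented package entrypoint, while the services import path, flow name, and source path are assumptions.
from data_engine import load_flow
from data_engine.services import RuntimeExecutionService  # import path assumed
flow = load_flow("normalize-claims")  # hypothetical flow name
service = RuntimeExecutionService()
result = service.run_once(flow)  # one-shot run with default stop controls
source_result = service.run_source(flow, "incoming/claims.xlsx")  # hypothetical source path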
Runtime history query services.
- class RuntimeHistoryService[source]
Bases: object
Own persisted run/step history queries used by operator surfaces.
- rebuild_step_outputs(ledger, flow_cards)[source]
Rebuild latest successful per-step output paths for visible flows.
- Parameters:
ledger (RuntimeCacheStore)
flow_cards (dict[str, FlowCatalogLike])
- Return type:
StepOutputIndex
- refresh_step_outputs(ledger, flow_cards, *, current_index, last_seen_step_run_id)[source]
Incrementally merge newly finished successful step outputs into the current index.
- Parameters:
ledger (RuntimeCacheStore)
flow_cards (dict[str, FlowCatalogLike])
current_index (StepOutputIndex)
last_seen_step_run_id (int | None)
- Return type:
StepOutputRefresh
- class StepOutputRefresh(last_step_run_id, index)[source]
Bases: object
One step-output refresh result with cache watermark.
- Parameters:
last_step_run_id (int | None)
index (StepOutputIndex)
- last_step_run_id: int | None
- index: StepOutputIndex
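Examples
A sketch of the rebuild-then-refresh pattern and its cache watermark; the services import path is assumed, and ledger stands for an already open RuntimeCacheStore (acquisition not shown here).
from data_engine.services import RuntimeHistoryService  # import path assumed
history = RuntimeHistoryService()
flow_cards = {}  # dict[str, FlowCatalogLike]; empty for illustration
index = history.rebuild_step_outputs(ledger, flow_cards)  # full rebuild
refresh = history.refresh_step_outputs(
    ledger,
    flow_cards,
    current_index=index,
    last_seen_step_run_id=None,  # no watermark yet: merge from the start
)
index, watermark = refresh.index, refresh.last_step_run_id  # reuse watermark next call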
Machine-local settings services.
- class SettingsService(store)[source]
Bases: object
Own machine-local settings persistence for operator surfaces.
- Parameters:
store (LocalSettingsStore)
- classmethod default_store(*, app_root=None)[source]
Open the default local settings store for one app root.
- Parameters:
app_root (Path | None)
- Return type:
LocalSettingsStore
- classmethod open_default(*, app_root=None, store_factory=None)[source]
Open the default local settings store for the current app root.
- Parameters:
app_root (Path | None)
store_factory (Callable[[Path | None], LocalSettingsStore] | None)
- Return type:
SettingsService
- workspace_collection_root()[source]
Return the saved local workspace collection root override, when present.
- Return type:
Path | None
- set_workspace_collection_root(value)[source]
Persist the local workspace collection root override.
- Parameters:
value (Path | str | None)
- Return type:
None
- default_workspace_id()[source]
Return the saved default workspace id, when present.
- Return type:
str | None
- set_default_workspace_id(value)[source]
Persist the default workspace id.
- Parameters:
value (str | None)
- Return type:
None
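Examples
A sketch of reading and writing machine-local settings; the services import path and the "claims" workspace id are illustrative assumptions.
from data_engine.services import SettingsService  # import path assumed
settings = SettingsService(SettingsService.default_store())
settings.set_default_workspace_id("claims")  # hypothetical workspace id
assert settings.default_workspace_id() == "claims"
settings.set_workspace_collection_root(None)  # clear the local override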
Bases: object
Own lease-based shared snapshot hydration for operator surfaces.
Replace one local runtime ledger from the shared workspace snapshots.
- Parameters:
paths (WorkspacePaths)
ledger (RuntimeSnapshotStore)
- Return type:
None
Return current workspace lease metadata, if present.
- Parameters:
paths (WorkspacePaths)
- Return type:
dict[str, Any] | None
Return whether current workspace lease metadata is stale.
- Parameters:
paths (WorkspacePaths)
stale_after_seconds (float)
- Return type:
bool
Delete one flow’s shared snapshot history and freshness state.
- Parameters:
paths (WorkspacePaths)
flow_name (str)
- Return type:
None
Delete all shared coordination and snapshot state for one workspace.
- Parameters:
paths (WorkspacePaths)
- Return type:
None
Shared theme resolution services.
- class ThemeService(*, themes={'dark': ThemePalette(name='dark', window_bg='#0d1117', app_bg='#0d1117', panel_bg='#161b22', panel_border='#30363d', text='#c9d1d9', muted_text='#8b949e', section_text='#7d8590', accent_text='#2ea043', warning_text='#d29922', error_text='#f85149', button_bg='#21262d', button_hover='#30363d', button_checked_bg='#1f6feb', button_checked_border='#388bfd', button_disabled_bg='#161b22', button_disabled_border='#30363d', button_disabled_text='#6e7681', input_bg='#0d1117', input_border='#30363d', hover_bg='#1b2230', hover_border='#3b4556', selection_bg='#1f6feb', selection_text='#f0f6fc', selection_border='#388bfd', tab_bg='#161b22', tab_hover_bg='#1b2230', tab_selected_bg='#21262d', progress_bg='#0d1117', progress_chunk='#2ea043', summary_bg='#21262d', summary_border='#30363d', request_control_bg='#F04A00', request_control_border='#c23c00', request_control_hover='#d84300', engine_start_bg='#1f883d', engine_start_border='#1a7f37', engine_start_hover='#1a7f37', engine_stop_bg='#cf222e', engine_stop_border='#a40e26', engine_stop_hover='#a40e26'), 'light': ThemePalette(name='light', window_bg='#ffffff', app_bg='#f6f8fa', panel_bg='#ffffff', panel_border='#d0d7de', text='#1f2328', muted_text='#656d76', section_text='#57606a', accent_text='#1a7f37', warning_text='#9a6700', error_text='#cf222e', button_bg='#f6f8fa', button_hover='#eef2f6', button_checked_bg='#ddf4ff', button_checked_border='#54aeff', button_disabled_bg='#f6f8fa', button_disabled_border='#d8dee4', button_disabled_text='#8c959f', input_bg='#ffffff', input_border='#d0d7de', hover_bg='#f6f8fa', hover_border='#c7d2dd', selection_bg='#0969da', selection_text='#ffffff', selection_border='#54aeff', tab_bg='#f6f8fa', tab_hover_bg='#eef2f6', tab_selected_bg='#ffffff', progress_bg='#eef2f6', progress_chunk='#1a7f37', summary_bg='#f6f8fa', summary_border='#d0d7de', request_control_bg='#F04A00', request_control_border='#c23c00', request_control_hover='#d84300', engine_start_bg='#1f883d', engine_start_border='#1a7f37', engine_start_hover='#1a7f37', engine_stop_bg='#cf222e', engine_stop_border='#a40e26', engine_stop_hover='#a40e26')}, default_theme_name='system', resolve_theme_name_func=<function resolve_theme_name>, system_theme_name_func=<function system_theme_name>, toggle_theme_name_func=<function toggle_theme_name>, theme_button_text_func=<function theme_button_text>)[source]
Bases: object
Thin injectable wrapper around shared theme state decisions.
- Parameters:
themes (Mapping[str, ThemePalette])
default_theme_name (str)
resolve_theme_name_func (Callable[[str], str])
system_theme_name_func (Callable[[], str])
toggle_theme_name_func (Callable[[str], str])
theme_button_text_func (Callable[[str], str])
- resolve_name(theme_name='system')[source]
Resolve one explicit or system-bound theme name.
- Parameters:
theme_name (str)
- Return type:
str
- toggle_name(theme_name)[source]
Return the opposite theme name.
- Parameters:
theme_name (str)
- Return type:
str
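Examples
A sketch of the default dark/light theme decisions; only the services import path is assumed.
from data_engine.services import ThemeService  # import path assumed
themes = ThemeService()
resolved = themes.resolve_name("system")  # follows the OS theme: "dark" or "light"
assert themes.toggle_name("dark") == "light"  # only two palettes ship by default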
Workspace provisioning helpers shared by CLI and GUI surfaces.
- class WorkspaceProvisioningResult(workspace_root, created_paths, preserved_paths)[source]
Bases: object
Describe which workspace assets were created during provisioning.
- Parameters:
workspace_root (Path)
created_paths (tuple[Path, ...])
preserved_paths (tuple[Path, ...])
- workspace_root: Path
- created_paths: tuple[Path, ...]
- preserved_paths: tuple[Path, ...]
- property created_anything: bool
Return whether provisioning created any new files or directories.
- class WorkspaceProvisioningService[source]
Bases: object
Own safe workspace-folder provisioning for operator surfaces.
- collection_vscode_settings(collection_root, *, app_root, interpreter_path=None)[source]
Return VS Code settings for one workspace collection root.
- Parameters:
collection_root (Path)
app_root (Path)
interpreter_path (Path | None)
- Return type:
dict[str, object]
- checkout_source_dir(app_root)[source]
Return the repo-local source directory when app_root points at a checkout.
- Parameters:
app_root (Path)
- Return type:
Path | None
- checkout_tests_dir(app_root)[source]
Return the repo-local tests directory when app_root points at a checkout.
- Parameters:
app_root (Path)
- Return type:
Path | None
- write_collection_vscode_settings(collection_root, *, app_root, interpreter_path=None, overwrite=False)[source]
Write collection-root VS Code settings unless an existing file should be preserved.
- Parameters:
collection_root (Path)
app_root (Path)
interpreter_path (Path | None)
overwrite (bool)
- Return type:
Path | None
- workspace_vscode_settings(workspace_root, *, app_root, interpreter_path=None)[source]
Return VS Code settings for one workspace root.
- Parameters:
workspace_root (Path)
app_root (Path)
interpreter_path (Path | None)
- Return type:
dict[str, object]
- write_workspace_vscode_settings(workspace_root, *, app_root, interpreter_path=None, overwrite=False)[source]
Write workspace-local VS Code settings unless an existing file should be preserved.
- Parameters:
workspace_root (Path)
app_root (Path)
interpreter_path (Path | None)
overwrite (bool)
- Return type:
Path | None
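Examples
A sketch of writing workspace-local VS Code settings without clobbering an existing file; the services import path and both roots are illustrative assumptions.
from pathlib import Path
from data_engine.services import WorkspaceProvisioningService  # import path assumed
service = WorkspaceProvisioningService()
written = service.write_workspace_vscode_settings(
    Path("/srv/claims/workspace"),  # hypothetical workspace root
    app_root=Path("/opt/data_engine"),  # hypothetical app root
    overwrite=False,  # a None return presumably means an existing file was preserved
)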
Workspace path and discovery services.
- class WorkspaceService(*, app_state_policy=None, discovery_policy=None, runtime_layout_policy=None, discover_workspaces_func=None, resolve_workspace_paths_func=None)[source]
Bases: object
Own workspace discovery and path resolution through explicit collaborators.
- Parameters:
app_state_policy (AppStatePolicy | None)
discovery_policy (WorkspaceDiscoveryPolicy | None)
runtime_layout_policy (RuntimeLayoutPolicy | None)
discover_workspaces_func (Callable[..., tuple[DiscoveredWorkspace, ...]] | None)
resolve_workspace_paths_func (Callable[..., WorkspacePaths] | None)
- discover(*, app_root=None, workspace_collection_root=None)[source]
Return discoverable workspaces for the current app and collection roots.
- Parameters:
app_root (Path | None)
workspace_collection_root (Path | None)
- Return type:
tuple[DiscoveredWorkspace, …]
- resolve_paths(*, workspace_id=None, workspace_root=None, data_root=None, workspace_collection_root=None)[source]
Resolve one workspace path set with the current override-aware rules.
- Parameters:
workspace_id (str | None)
workspace_root (Path | None)
data_root (Path | None)
workspace_collection_root (Path | None)
- Return type:
WorkspacePaths
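Examples
A sketch of discovery followed by override-aware path resolution; the services import path and the workspace id are illustrative assumptions.
from data_engine.services import WorkspaceService  # import path assumed
service = WorkspaceService()
for workspace in service.discover():
    print(workspace)
paths = service.resolve_paths(workspace_id="claims")  # hypothetical workspace id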