API Reference

The package entrypoints most users will import are:

  • data_engine.Flow

  • data_engine.FlowContext

  • data_engine.discover_flows

  • data_engine.load_flow

  • data_engine.run

Flow Authoring

Flow DSL and public authoring entrypoints.

class Flow(group, name=None, label=None, trigger=None, mirror_spec=None, steps=(), _workspace_root=None)[source]

Bases: Flow

Public authoring flow with execution conveniences layered over core definitions.

Parameters:
  • group (str)

  • name (str | None)

  • label (str | None)

  • trigger (WatchSpec | None)

  • mirror_spec (MirrorSpec | None)

  • steps (tuple[StepSpec, ...])

  • _workspace_root (Path | None)

run_once(*, authoring_services=None, runtime_execution_service=None)[source]

Run this flow once and return completed runtime contexts.

Parameters:
  • authoring_services (AuthoringServices | None) – Optional service bundle used by tests or embedded hosts.

  • runtime_execution_service (RuntimeExecutionService | None) – Optional runtime execution service override.

Returns:

One context per executed source.

Return type:

list[FlowContext]

preview(*, use=None, authoring_services=None, runtime_execution_service=None)[source]

Run this flow in preview mode and return one preview value.

Parameters:
  • use (str | None) – Optional named object slot to preview instead of the final current value.

  • authoring_services (AuthoringServices | None) – Optional service bundle used by tests or embedded hosts.

  • runtime_execution_service (RuntimeExecutionService | None) – Optional runtime execution service override.

Returns:

Preview value returned by the runtime execution service.

Return type:

object

Raises:

FlowValidationError – If preview is requested from inside a compiled flow module.

run(*, authoring_services=None, runtime_execution_service=None)[source]

Run this flow continuously according to its trigger.

Parameters:
  • authoring_services (AuthoringServices | None) – Optional service bundle used by tests or embedded hosts.

  • runtime_execution_service (RuntimeExecutionService | None) – Optional runtime execution service override.

Returns:

Completed contexts collected before the runtime exits.

Return type:

list[FlowContext]

discover_flows(*, data_root=None, authoring_services=None, flow_execution_service=None)[source]

Discover and build all code-defined flows from compiled flow modules.

Parameters:
  • data_root (Path | None)

  • authoring_services (AuthoringServices | None)

  • flow_execution_service (FlowExecutionService | None)

Return type:

tuple[Flow, ...]

load_flow(name, *, data_root=None, authoring_services=None, flow_execution_service=None)[source]

Load one code-defined flow by flow-module name.

Parameters:
  • name (str)

  • data_root (Path | None)

  • authoring_services (AuthoringServices | None)

  • flow_execution_service (FlowExecutionService | None)

Return type:

Flow

run(*flows, authoring_services=None, runtime_execution_service=None)[source]

Run multiple flows with sequential execution per group and parallel groups.

Parameters:
  • flows (Flow) – Flows to execute.

  • authoring_services (AuthoringServices | None) – Optional service bundle used by tests or embedded hosts.

  • runtime_execution_service (RuntimeExecutionService | None) – Optional runtime execution service override.

Return type:

list[FlowContext]
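The grouping contract, sequential execution inside each group with groups running in parallel, can be sketched in plain Python. This is a stand-in for illustration, not the shipped runtime: flows are reduced to (group, name) tuples and execute is any callable.

```python
from concurrent.futures import ThreadPoolExecutor
from itertools import groupby

def run_grouped_sketch(flows, execute):
    # Bucket flows by group name; the stable sort keeps authoring order
    # within each group.
    ordered = sorted(flows, key=lambda flow: flow[0])
    groups = {group: list(members)
              for group, members in groupby(ordered, key=lambda flow: flow[0])}

    def run_group(members):
        # Sequential execution inside one group.
        return [execute(flow) for flow in members]

    results = []
    # Each group gets its own worker thread; map preserves group order.
    with ThreadPoolExecutor(max_workers=max(len(groups), 1)) as pool:
        for contexts in pool.map(run_group, groups.values()):
            results.extend(contexts)
    return results

log = []
flows = [("Docs", "c"), ("Claims", "a"), ("Claims", "b")]
contexts = run_grouped_sketch(flows, execute=lambda flow: log.append(flow) or flow)
```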

Core Flow Model

Pure flow definitions and fluent builder methods.

class Flow(group, name=None, label=None, trigger=None, mirror_spec=None, steps=(), _workspace_root=None)[source]

Bases: object

Immutable fluent builder for generic runtime flows.

Flow is the authoring object most flow modules return. Builder methods never mutate the existing instance; each method returns a new flow so chained definitions stay predictable and easy to review.

Examples

from data_engine import Flow

flow = (
    Flow(group="Claims")
    .watch(mode="poll", source="incoming", interval="30s", extensions=[".xlsx"])
    .mirror(root="processed")
)

assert flow.mode == "poll"
Variables:
  • group (str) – Display group used by the GUI, TUI, and runtime summaries.

  • name (str | None) – Stable flow identifier. When omitted in a flow module, the module loader can derive it from the module name.

  • label (str | None) – Optional human-readable display label.

  • trigger (WatchSpec | None) – Runtime trigger configuration.

  • mirror_spec (MirrorSpec | None) – Mirrored output path configuration.

  • steps (tuple[StepSpec, ...]) – Ordered callable steps to run.

Parameters:
  • group (str)

  • name (str | None)

  • label (str | None)

  • trigger (WatchSpec | None)

  • mirror_spec (MirrorSpec | None)

  • steps (tuple[StepSpec, ...])

  • _workspace_root (Path | None)

Notes

Builder methods return a new Flow instance, so calls can be chained.

group: str
name: str | None = None
label: str | None = None
trigger: WatchSpec | None = None
mirror_spec: MirrorSpec | None = None
steps: tuple[StepSpec, ...] = ()
watch(*, mode, run_as='individual', max_parallel=1, source=None, interval=None, time=None, extensions=None, settle=1)[source]

Configure how this flow is triggered.

Use "manual" for explicit operator-driven runs, "poll" when source file changes should trigger work, and "schedule" when time should trigger work. run_as="individual" runs once per concrete source file; run_as="batch" runs once for the watched source root.

Examples

from data_engine import Flow

manual_flow = Flow(group="Claims").watch(mode="manual")
poll_flow = Flow(group="Claims").watch(mode="poll", source="incoming", interval="5s")
schedule_flow = Flow(group="Claims").watch(mode="schedule", interval="15m")

assert manual_flow.mode == "manual"
assert poll_flow.mode == "poll"
assert schedule_flow.mode == "schedule"
Parameters:
  • mode (str) – Trigger mode: "manual", "poll", or "schedule".

  • run_as (str) – "individual" to run once per source file, or "batch" to run once for the full source set.

  • max_parallel (int) – Maximum number of concurrent source-file runs for one flow when run_as="individual". Defaults to 1.

  • source (str | Path | None) – File or directory watched by poll/schedule triggers.

  • interval (str | None) – Duration string such as "10s" or "5m" for poll intervals or recurring schedules.

  • time (str | tuple[str, ...] | list[str] | set[str] | None) – One or more HH:MM daily schedule times.

  • extensions (tuple[str, ...] | list[str] | set[str] | None) – Optional file extensions used when discovering source files.

  • settle (int) – Polling settle window in seconds before a changed file is queued.

Returns:

A new flow with the trigger configuration attached.

Return type:

Flow

Raises:

FlowValidationError – If the trigger mode, source, schedule, or polling options are inconsistent.

mirror(*, root)[source]

Bind a mirrored output namespace rooted at one directory.

mirror only configures paths. It does not write files by itself; the runtime later exposes write-ready paths through context.mirror.

Examples

from data_engine import Flow

flow = Flow(group="Claims").mirror(root="processed")

assert str(flow.mirror_spec.root).endswith("processed")
Parameters:

root (str | Path) – Directory used by context.mirror helpers when writing outputs.

Returns:

A new flow with mirror output helpers enabled.

Return type:

Flow

step(fn, *, use=None, save_as=None, label=None)[source]

Append one callable step to the flow.

step is the default workhorse for flow authoring. The callable receives one FlowContext and its return value becomes the next context.current. Use save_as to keep an intermediate result under context.objects and use to load a saved object before a later step runs.

Examples

from data_engine import Flow

def read_claims(context):
    return context.current

flow = Flow(group="Claims").step(read_claims, save_as="raw_df")

assert flow.steps[0].save_as == "raw_df"
Parameters:
  • fn (Callable[[FlowContext], object]) – Callable that accepts a single FlowContext and returns the next value for context.current.

  • use (str | None) – Optional named object slot to load into context.current before the step runs.

  • save_as (str | None) – Optional named object slot to store the step result in.

  • label (str | None) – Optional display label for logs and UI step summaries.

Returns:

A new flow with the step appended.

Return type:

Flow

Raises:

FlowValidationError – If fn is not callable or does not accept exactly one argument.

map(fn, *, use=None, save_as=None, label=None)[source]

Append a step that maps a callable over the current iterable value.

map is best when the same callable should run once per collected item. The callable may accept either (item) or (context, item); the mapped values are returned as a Batch.

Examples

from data_engine import Flow

def summarize(file_ref):
    return file_ref.name

flow = Flow(group="Claims").collect(extensions=[".xlsx"]).map(fn=summarize)

assert len(flow.steps) == 2
Parameters:
  • fn (Callable[..., object]) – Callable accepting either (item) or (context, item).

  • use (str | None) – Optional named object slot to map instead of the current value.

  • save_as (str | None) – Optional named object slot to store the mapped Batch result in.

  • label (str | None) – Optional display label for logs and UI step summaries.

Returns:

A new flow with the mapping step appended.

Return type:

Flow

Raises:

FlowValidationError – If fn is not callable, has an unsupported signature, or the mapped current value is not iterable.

collect(extensions, *, root=None, recursive=False, use=None, save_as=None, label=None)[source]

Append a step that collects source files into a Batch.

If root is omitted, collection uses the active source root from the runtime context. This keeps scheduled or poll-driven batch flows concise because the watched source directory can also be the collection root.

Examples

from data_engine import Flow

flow = Flow(group="Docs").collect(extensions=[".pdf"], recursive=True)

assert flow.steps[0].label == "Collect Files"
Parameters:
  • extensions (tuple[str, ...] | list[str] | set[str]) – File extensions to include, such as (".xlsx", ".csv").

  • root (str | Path | None) – Optional search root. Defaults to the active source root.

  • recursive (bool) – Whether to search child directories.

  • use (str | None) – Optional named object slot to load before collecting.

  • save_as (str | None) – Optional named object slot to store the collected batch in.

  • label (str | None) – Optional display label for logs and UI step summaries.

Returns:

A new flow with the collection step appended.

Return type:

Flow

step_each(fn, *, use=None, save_as=None, label=None)[source]

Alias for map that reads naturally in step chains.

Parameters:
  • fn (Callable[..., object]) – Callable accepting either (item) or (context, item).

  • use (str | None) – Optional named object slot to map instead of the current value.

  • save_as (str | None) – Optional named object slot to store the mapped Batch result in.

  • label (str | None) – Optional display label for logs and UI step summaries.

Returns:

A new flow with the per-item step appended.

Return type:

Flow

property mode: str

Return the trigger mode, or "manual" for unconfigured flows.

Core Primitives

Core flow specs, contexts, and small containers.

class Batch(items)[source]

Bases: Generic[T]

Small iterable runtime container used instead of exposing raw lists by default.

Parameters:

items (tuple[T, ...])

items: tuple[T, ...]
names()[source]

Return each item name when all items expose a string name.

Return type:

tuple[str, ...]

paths()[source]

Return each item path when all items expose a Path-valued path.

Return type:

tuple[Path, ...]
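The Batch contract can be illustrated with a small stand-in (not the shipped class); names() and paths() simply project the corresponding attribute from every item:

```python
from pathlib import Path
from types import SimpleNamespace

class MiniBatch:
    # Stand-in mirroring the documented Batch surface: an iterable
    # tuple container with names()/paths() projections.
    def __init__(self, items):
        self.items = tuple(items)

    def __iter__(self):
        return iter(self.items)

    def names(self):
        # Valid when every item exposes a string name.
        return tuple(item.name for item in self.items)

    def paths(self):
        # Valid when every item exposes a Path-valued path.
        return tuple(item.path for item in self.items)

refs = [SimpleNamespace(name=p.name, path=p) for p in (Path("a.csv"), Path("b.csv"))]
batch = MiniBatch(refs)
```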

class FileRef(path)[source]

Bases: object

Thin runtime wrapper for one filesystem path in a batch-oriented flow.

Parameters:

path (Path)

path: Path
property name: str

Return the file name including extension.

property stem: str

Return the file name without extension.

property suffix: str

Return the file extension.

property parent: Path

Return the parent directory.

exists()[source]

Return whether the referenced path currently exists.

Return type:

bool
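Each FileRef property maps directly onto the matching pathlib.Path attribute, so the wrapper can be understood through pathlib alone:

```python
from pathlib import Path

# Each documented FileRef property corresponds to the pathlib
# attribute named in the comment.
path = Path("incoming/claims/report.xlsx")

name = path.name        # FileRef.name   -> "report.xlsx"
stem = path.stem        # FileRef.stem   -> "report"
suffix = path.suffix    # FileRef.suffix -> ".xlsx"
parent = path.parent    # FileRef.parent -> incoming/claims
exists = path.exists()  # FileRef.exists() -> bool
```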

class FlowContext(flow_name, group, source=None, mirror=None, current=None, objects=<factory>, metadata=<factory>, config=<factory>)[source]

Bases: object

Mutable runtime state shared across steps during one flow execution.

Steps receive a FlowContext object. current is the active value, objects stores named intermediate values created with save_as, metadata holds runtime annotations, and source/mirror expose source and output path helpers when the flow configuration provides them.

Variables:
  • flow_name (str) – Stable flow name for the current execution.

  • group (str) – Flow group used by operator surfaces.

  • source (SourceContext | None) – Source path helper for source-backed executions.

  • mirror (MirrorContext | None) – Write-ready mirrored output helper when the flow configured a mirror.

  • current (object | None) – Active value passed between steps.

  • objects (dict[str, object]) – Named intermediate values saved by save_as.

  • metadata (dict[str, object]) – Runtime metadata attached to the execution.

  • config (WorkspaceConfigContext) – Lazy workspace config reader.

Parameters:
  • flow_name (str)

  • group (str)

  • source (SourceContext | None)

  • mirror (MirrorContext | None)

  • current (object | None)

  • objects (dict[str, object])

  • metadata (dict[str, object])

  • config (WorkspaceConfigContext)

Examples

from data_engine.core.primitives import FlowContext

context = FlowContext(flow_name="claims", group="Claims", current=1)
context.objects["raw"] = context.current

assert context.current == 1
assert context.objects["raw"] == 1
flow_name: str
group: str
source: SourceContext | None = None
mirror: MirrorContext | None = None
current: object | None = None
objects: dict[str, object]
metadata: dict[str, object]
config: WorkspaceConfigContext
source_metadata()[source]

Return filesystem metadata for the current source file when available.

Return type:

SourceMetadata | None

database(name)[source]

Return a write-ready path beneath the workspace databases directory.

Use this for workspace-owned DuckDB files and other durable database artifacts. The returned path is rooted under <workspace>/databases/ and parent directories are created for you.

Parameters:

name (str | Path) – Relative database file name, such as "analytics.duckdb" or "claims/analytics.duckdb".

Returns:

Absolute write-ready database path.

Return type:

Path

Raises:

FlowValidationError – If the flow is not running from an authored workspace, or if name is absolute or empty.
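The documented path contract can be sketched with pathlib. This stand-in raises plain ValueError (the real method raises FlowValidationError, which is itself a ValueError subclass):

```python
import tempfile
from pathlib import Path

def database_path_sketch(workspace_root, name):
    # Mirror the documented contract: reject absolute or empty names,
    # root the result under <workspace>/databases/, create parents.
    candidate = Path(name)
    if not str(name) or candidate.is_absolute():
        raise ValueError(f"invalid database name: {name!r}")
    target = Path(workspace_root) / "databases" / candidate
    target.parent.mkdir(parents=True, exist_ok=True)
    return target

with tempfile.TemporaryDirectory() as workspace:
    db = database_path_sketch(workspace, "claims/analytics.duckdb")
    parent_created = db.parent.is_dir()
```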

class MirrorContext(root, source_path=None, relative_path=None)[source]

Bases: object

Write-ready mirrored output namespace for one runtime source.

context.mirror is available when a flow was configured with Flow.mirror(root=...). The helpers return paths and create parent directories as needed, but they do not write file contents.

Parameters:
  • root (Path)

  • source_path (Path | None)

  • relative_path (Path | None)

root: Path
source_path: Path | None = None
relative_path: Path | None = None
property dir: Path

Return a write-ready namespace directory for derived files.

property folder: Path

Return the mirrored parent folder for the current source file.

with_suffix(suffix)[source]

Return the canonical mirrored source path with a replaced suffix.

Parameters:

suffix (str)

Return type:

Path

with_extension(suffix)[source]

Return the canonical mirrored source path with a replaced extension.

Parameters:

suffix (str)

Return type:

Path

file(name)[source]

Return a write-ready file path in the mirrored source folder.

Parameters:

name (str | Path)

Return type:

Path

namespaced_file(name)[source]

Return a write-ready derived file path inside the mirrored source namespace.

Parameters:

name (str | Path)

Return type:

Path

root_file(name)[source]

Return a write-ready file path directly beneath the mirror root.

Parameters:

name (str | Path)

Return type:

Path
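The path arithmetic behind these helpers can be sketched with pathlib. One assumption here: the canonical mirrored source path is root / relative_path, which matches the descriptions above but is not stated outright. The real helpers also create parent directories; this sketch only computes paths:

```python
from pathlib import Path

# Hypothetical inputs for one mirrored source.
root = Path("processed")
relative_path = Path("claims/2024/report.xlsx")

mirrored = root / relative_path                 # assumed canonical mirrored source path
as_parquet = mirrored.with_suffix(".parquet")   # like with_suffix(".parquet")
sibling = mirrored.parent / "summary.json"      # like file("summary.json")
top_level = root / "manifest.json"              # like root_file("manifest.json")
```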

class MirrorSpec(root)[source]

Bases: object

Static flow-level mirror binding.

Parameters:

root (Path)

root: Path
class SourceContext(root, path=None, relative_path=None)[source]

Bases: object

Resolved source namespace for one runtime source.

context.source points at the watched source root and, for individual file runs, the concrete source file. Its helpers are read-oriented path conveniences; unlike MirrorContext they do not create directories.

Parameters:
  • root (Path)

  • path (Path | None)

  • relative_path (Path | None)

root: Path
path: Path | None = None
relative_path: Path | None = None
property dir: Path

Return the namespace directory for files derived from the active source.

property folder: Path

Return the parent folder for the active source file.

with_suffix(suffix)[source]

Return the source path with a replaced suffix.

Parameters:

suffix (str)

Return type:

Path

with_extension(suffix)[source]

Return the source path with a replaced extension.

Parameters:

suffix (str)

Return type:

Path

file(name)[source]

Return a derived file path in the active source folder.

Parameters:

name (str | Path)

Return type:

Path

namespaced_file(name)[source]

Return a derived file path inside the active source namespace.

Parameters:

name (str | Path)

Return type:

Path

root_file(name)[source]

Return a file path directly beneath the source root.

Parameters:

name (str | Path)

Return type:

Path

class SourceMetadata(path, name, size_bytes, modified_at_utc)[source]

Bases: object

Resolved filesystem metadata for the current source file.

Parameters:
  • path (Path)

  • name (str)

  • size_bytes (int)

  • modified_at_utc (datetime)

path: Path
name: str
size_bytes: int
modified_at_utc: datetime
class StepSpec(fn, use, save_as, label, function_name)[source]

Bases: object

One generic callable step in a flow.

Parameters:
  • fn (Callable[..., object])

  • use (str | None)

  • save_as (str | None)

  • label (str)

  • function_name (str)

fn: Callable[..., object]
use: str | None
save_as: str | None
label: str
function_name: str
class WatchSpec(mode, run_as, max_parallel=1, source=None, interval=None, interval_seconds=None, time=None, times=(), time_slots=(), extensions=None, settle=1)[source]

Bases: object

Normalized runtime watch configuration.

Parameters:
  • mode (str)

  • run_as (str)

  • max_parallel (int)

  • source (Path | None)

  • interval (str | None)

  • interval_seconds (float | None)

  • time (str | tuple[str, ...] | None)

  • times (tuple[str, ...])

  • time_slots (tuple[tuple[int, int], ...])

  • extensions (tuple[str, ...] | None)

  • settle (int)

mode: str
run_as: str
max_parallel: int = 1
source: Path | None = None
interval: str | None = None
interval_seconds: float | None = None
time: str | tuple[str, ...] | None = None
times: tuple[str, ...] = ()
time_slots: tuple[tuple[int, int], ...] = ()
extensions: tuple[str, ...] | None = None
settle: int = 1
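How the authoring inputs might normalize into WatchSpec fields can be sketched as follows. Both functions are guesses at the normalization, and the (hour, minute) shape for time_slots is an assumption inferred from its tuple[tuple[int, int], ...] annotation:

```python
def parse_interval_sketch(interval):
    # Guess at the "10s" / "5m" normalization behind interval_seconds.
    units = {"s": 1, "m": 60, "h": 3600}
    return float(interval[:-1]) * units[interval[-1]]

def parse_time_slots_sketch(times):
    # Guess at the HH:MM -> (hour, minute) normalization behind time_slots.
    return tuple((int(hh), int(mm)) for hh, mm in (value.split(":") for value in times))
```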
class WorkspaceConfigContext(workspace_root=None, _cache=<factory>, _names=None)[source]

Bases: object

Lazy read-only access to workspace-local TOML config files.

context.config reads files from <workspace>/config/*.toml on demand. It returns dictionaries so flows can keep environment-specific settings out of Python modules without introducing a larger configuration framework.

Variables:

workspace_root (Path | None) – Authored workspace root. When omitted, config lookup is unavailable and returns no names.

Parameters:
  • workspace_root (Path | None)

  • _cache (dict[str, dict[str, object]])

  • _names (tuple[str, ...] | None)

Examples

from data_engine.core.primitives import WorkspaceConfigContext

config = WorkspaceConfigContext()

assert config.names() == ()
workspace_root: Path | None = None
property config_dir: Path | None

Return the conventional config directory for the authored workspace.

names()[source]

Return available config file stems beneath config/.

Return type:

tuple[str, …]

get(name)[source]

Return one parsed config mapping when available.

Parameters:

name (str)

Return type:

dict[str, object] | None

require(name)[source]

Return one parsed config mapping or fail loudly when missing.

Parameters:

name (str)

Return type:

dict[str, object]

all()[source]

Return all parsed config mappings keyed by file stem.

Return type:

dict[str, dict[str, object]]

collect_files(extensions, *, root=None, recursive=False)[source]

Return a step callable that collects matching files into a Batch of FileRef items.

Parameters:
  • extensions (tuple[str, ...] | list[str] | set[str])

  • root (str | Path | None)

  • recursive (bool)

Return type:

Callable[[FlowContext], Batch[FileRef]]
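The step-callable shape can be sketched in plain Python. This stand-in returns a tuple of pathlib.Path objects instead of a Batch of FileRef items:

```python
import tempfile
from pathlib import Path

def collect_files_sketch(extensions, *, root, recursive=False):
    # Build a step-shaped callable: it takes a context and returns the
    # matching files (the real helper returns Batch[FileRef]).
    wanted = {ext.lower() for ext in extensions}

    def step(context):
        pattern = "**/*" if recursive else "*"
        return tuple(
            path for path in sorted(Path(root).glob(pattern))
            if path.is_file() and path.suffix.lower() in wanted
        )

    return step

with tempfile.TemporaryDirectory() as root:
    (Path(root) / "a.xlsx").write_text("")
    (Path(root) / "b.txt").write_text("")
    step = collect_files_sketch([".xlsx"], root=root)
    names = [path.name for path in step(None)]
```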

Core Runtime Models

Core errors for Data Engine flow definitions and execution.

exception FlowExecutionError(*, flow_name, phase, detail, step_label=None, function_name=None, source_path=None)[source]

Bases: FlowValidationError

Raised when a flow module fails during import, build, or runtime execution.

Parameters:
  • flow_name (str)

  • phase (str)

  • detail (str)

  • step_label (str | None)

  • function_name (str | None)

  • source_path (Path | str | None)

Return type:

None

exception FlowStoppedError[source]

Bases: RuntimeError

Raised when a running flow is stopped by an external control.

exception FlowValidationError[source]

Bases: ValueError

Raised when a flow configuration or runtime input cannot be validated.
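Because FlowExecutionError subclasses FlowValidationError (itself a ValueError), one broad except clause covers both, while FlowStoppedError stays outside that branch. Stand-in classes mirroring the documented hierarchy (the real FlowExecutionError takes keyword arguments):

```python
# Stand-in classes mirroring the documented hierarchy.
class FlowValidationError(ValueError): ...
class FlowExecutionError(FlowValidationError): ...
class FlowStoppedError(RuntimeError): ...

caught = []
for exc in (FlowValidationError("bad config"), FlowExecutionError("step failed")):
    try:
        raise exc
    except FlowValidationError as err:
        # Catching the base class also catches execution failures.
        caught.append(type(err).__name__)
```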

Runtime Engine

Command-shaped runtime engine for executing core flows.

class RuntimeEngine(*, runtime_ledger=None, runtime_stop_event=None, flow_stop_event=None, status_callback=None, flow_runtime_type=<class 'data_engine.runtime.execution.single.FlowRuntime'>, grouped_runtime_type=<class 'data_engine.runtime.execution.grouped.GroupedFlowRuntime'>, run_stop_controller=None)[source]

Bases: object

Execute flows through explicit runtime commands.

The engine does not know about GUI, TUI, CLI, local settings, or daemon wiring. Hosts pass state/event collaborators in explicitly; the current implementation adapts the existing sequential and grouped runtimes while giving higher layers a command-shaped seam to target.

Parameters:
  • runtime_ledger (RuntimeCacheLedger | None)

  • runtime_stop_event (Event | None)

  • flow_stop_event (Event | None)

  • status_callback (Callable[[str], None] | None)

  • flow_runtime_type (type[FlowRuntime])

  • grouped_runtime_type (type[GroupedFlowRuntime])

  • run_stop_controller (RuntimeStopController | None)

run_once(flow)[source]

Run one flow once using its configured startup sources.

Parameters:

flow (CoreFlow)

Return type:

list[FlowContext]

run_source(flow, source_path)[source]

Run one flow for a specific source path.

Parameters:
  • flow (CoreFlow)

  • source_path (str | Path)

Return type:

FlowContext

run_batch(flow)[source]

Run one flow once in batch mode using the configured source root.

Parameters:

flow (CoreFlow)

Return type:

FlowContext

preview(flow, *, use=None)[source]

Preview one flow through the one-shot runtime path.

Parameters:
  • flow (CoreFlow)

  • use (str | None)

Return type:

object

run_continuous(flow)[source]

Run one flow continuously according to its trigger.

Parameters:

flow (CoreFlow)

Return type:

list[FlowContext]

run_grouped(flows, *, continuous=True)[source]

Run flows grouped by flow group with sequential execution inside each group.

Parameters:
  • flows (tuple['CoreFlow', ...])

  • continuous (bool)

Return type:

list[FlowContext]

stop(run_id)[source]

Request that the active runtime stop a run by id.

Parameters:

run_id (str)

Return type:

None

Runtime execution internals for authored flows.

class FlowRuntime(flows, *, continuous, runtime_stop_event=None, flow_stop_event=None, status_callback=None, runtime_ledger=None, runtime_ledger_service=None, runtime_ledger_factory=None, run_stop_controller=None)[source]

Bases: object

Sequential runtime that executes one or more configured flows.

Parameters:
  • flows (tuple['CoreFlow', ...])

  • continuous (bool)

  • runtime_stop_event (threading.Event | None)

  • flow_stop_event (threading.Event | None)

  • status_callback (Callable[[str], None] | None)

  • runtime_ledger (RuntimeCacheLedger | None)

  • runtime_ledger_service (RuntimeCacheLedgerService | None)

  • runtime_ledger_factory (Callable[[], RuntimeCacheLedger] | None)

  • run_stop_controller (RuntimeStopController | None)

run()[source]

Run the configured flows and return completed contexts.

Return type:

list[FlowContext]

preview(*, use=None)[source]

Run exactly one flow for notebook-style inspection and return one object.

Parameters:

use (str | None)

Return type:

object

run_source(flow, source_path)[source]

Run one flow for a specific source path.

Parameters:
  • flow (CoreFlow)

  • source_path (str | Path)

Return type:

FlowContext

run_batch(flow)[source]

Run one flow once in batch mode using the configured source root.

Parameters:

flow (CoreFlow)

Return type:

FlowContext

max_parallel_for_flow(flow)[source]

Return the allowed per-flow source concurrency for one flow.

Parameters:

flow (CoreFlow)

Return type:

int

dispatch_queued_jobs(queue, queued_keys, pending_futures, executor, *, results)[source]

Submit queued source jobs up to each flow’s bounded concurrency and drain completions.

Parameters:
  • queued_keys (set[tuple[str, str | None]])

  • pending_futures (dict[Future[FlowContext], tuple[QueuedRunJob, int]])

  • executor (ThreadPoolExecutor)

  • results (list[FlowContext])

Return type:

None

wait_for_dispatched_jobs(pending_futures, *, results)[source]

Wait for all pending queued jobs to complete.

Parameters:
  • pending_futures (dict[Future[FlowContext], tuple[QueuedRunJob, int]])

  • results (list[FlowContext])

Return type:

None

register_run(run_id)[source]

Mark one run id as active.

Parameters:

run_id (str)

Return type:

None

unregister_run(run_id)[source]

Clear active and requested state for one completed run id.

Parameters:

run_id (str)

Return type:

None

check_run(run_id)[source]

Raise when runtime-wide or run-id stop has been requested.

Parameters:

run_id (str | None)

Return type:

None

class GroupedFlowRuntime(flows, *, continuous, runtime_stop_event=None, flow_stop_event=None, status_callback=None, runtime_ledger=None, runtime_ledger_service=None, runtime_ledger_factory=None, run_stop_controller=None)[source]

Bases: object

Grouped orchestrator: sequential within a group, parallel across groups.

Parameters:
  • flows (tuple['Flow', ...])

  • continuous (bool)

  • runtime_stop_event (threading.Event | None)

  • flow_stop_event (threading.Event | None)

  • status_callback (Callable[[str], None] | None)

  • runtime_ledger (RuntimeCacheLedger | None)

  • runtime_ledger_service (RuntimeCacheLedgerService | None)

  • runtime_ledger_factory (Callable[[], RuntimeCacheLedger] | None)

  • run_stop_controller (RuntimeStopController | None)

run()[source]

Run the grouped flows and return completed contexts.

Return type:

list[FlowContext]

Run-id-aware runtime stop control.

class RuntimeStopController[source]

Bases: object

Track stop requests for specific active runtime run ids.

request_stop(run_id)[source]

Request that one active or future run id stop.

Parameters:

run_id (str)

Return type:

None

register_run(run_id)[source]

Mark one run id as active.

Parameters:

run_id (str)

Return type:

None

unregister_run(run_id)[source]

Clear active and requested state for one completed run id.

Parameters:

run_id (str)

Return type:

None

check_run(run_id)[source]

Raise when stop has been requested for run_id.

Parameters:

run_id (str | None)

Return type:

None

active_run_ids()[source]

Return active run ids in stable order.

Return type:

tuple[str, ...]
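The controller's behavior can be sketched with a minimal stand-in. A stop request may arrive before the run id registers; the real class raises FlowStoppedError and is presumably thread-safe, while this sketch raises RuntimeError (the documented base of FlowStoppedError) and skips locking:

```python
class StopControllerSketch:
    def __init__(self):
        self._active = []        # registration order kept stable
        self._requested = set()

    def request_stop(self, run_id):
        # A request may target a run id that has not registered yet.
        self._requested.add(run_id)

    def register_run(self, run_id):
        if run_id not in self._active:
            self._active.append(run_id)

    def unregister_run(self, run_id):
        # Clear both active and requested state for a completed run.
        if run_id in self._active:
            self._active.remove(run_id)
        self._requested.discard(run_id)

    def check_run(self, run_id):
        if run_id is not None and run_id in self._requested:
            raise RuntimeError(f"stop requested for run {run_id!r}")

    def active_run_ids(self):
        return tuple(self._active)

controller = StopControllerSketch()
controller.request_stop("run-1")   # request arrives before registration
controller.register_run("run-1")
```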

File Watching

Filesystem discovery and polling services used by the flow runtime.

class IFileWatcher(*args, **kwargs)[source]

Bases: Protocol

Common interface for runtime file watchers.

start()[source]

Begin watching for filesystem changes.

Return type:

None

stop()[source]

Stop watching for filesystem changes.

Return type:

None

drain_events()[source]

Return any queued filesystem events observed since the last drain.

Return type:

list[Path]

class PollingWatcher(input_root, *, recursive=True, extensions=None, settle=1)[source]

Bases: object

Filesystem polling watcher for one file or directory root.

Parameters:
  • input_root (Path)

  • recursive (bool)

  • extensions (tuple[str, ...] | None)

  • settle (int)

start()[source]

Capture an initial filesystem snapshot and begin watching.

Return type:

None

stop()[source]

Stop watching for new filesystem events.

Return type:

None

drain_events()[source]

Return newly stable files observed since the last poll.

Return type:

list[Path]

iter_candidate_paths(input_root, *, extensions=None, recursive=True, allow_missing=False)[source]

Yield candidate files from one root using optional extension filters.

Parameters:
  • input_root (Path)

  • extensions (tuple[str, ...] | None)

  • recursive (bool)

  • allow_missing (bool)

Return type:

Iterable[Path]

is_temporary_file_path(path)[source]

Return whether a path looks like a transient temp or download file.

Parameters:

path (Path)

Return type:

bool
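The exact markers are not documented, so the following heuristic is an assumption in the spirit of the helper (Office lock files, partial and in-progress downloads):

```python
from pathlib import Path

# Assumed markers: Office lock-file prefixes and partial-download suffixes.
TEMP_PREFIXES = ("~$", ".~")
TEMP_SUFFIXES = (".tmp", ".part", ".crdownload", ".download")

def looks_temporary(path):
    name = Path(path).name.lower()
    return name.startswith(TEMP_PREFIXES) or name.endswith(TEMP_SUFFIXES)
```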

Authoring Helpers

Small schema helpers for Polars-oriented flow authoring.

class ColumnCasts[source]

Bases: dict[str, object]

Dict-like dtype mapping with a Polars apply helper.

TableSchema.dtypes returns this type. Values are passed to polars.Expr.cast so callers can use normal Polars dtype objects such as pl.String, pl.Int64, and pl.Datetime. apply casts remaining frame columns to pl.String.

apply(df)[source]

Cast columns on a Polars frame.

Parameters:

df (pl.DataFrame | pl.LazyFrame) – Eager or lazy Polars frame to transform.

Returns:

Frame with configured dtype casts applied and unspecified columns cast to pl.String.

Return type:

pl.DataFrame | pl.LazyFrame

class ColumnSelection(columns)[source]

Bases: tuple[str, ...]

Tuple-like column projection with Polars convenience methods.

TableSchema.columns returns this type so schema definitions can be used directly in dataframe chains while still behaving like a normal tuple.

Examples

import polars as pl

from data_engine.helpers import TableSchema

schema = TableSchema(columns=("Claim Id",), dtypes={"Claim Id": pl.Int64})
df = pl.DataFrame({"Claim Id": [1], "SSN": ["123"]})

assert schema.columns.apply(df).columns == ["Claim Id"]
Parameters:

columns (Iterable[str])

Return type:

ColumnSelection

apply(df)[source]

Select these columns from a Polars frame.

Parameters:

df (pl.DataFrame | pl.LazyFrame) – Eager or lazy Polars frame to transform.

Returns:

Frame containing only these columns.

Return type:

pl.DataFrame | pl.LazyFrame

normalize_column_names(df)[source]

Normalize this selection’s column names on a Polars frame.

Parameters:

df (pl.DataFrame | pl.LazyFrame) – Eager or lazy Polars frame to rename.

Returns:

Frame with matching selected column names normalized.

Return type:

pl.DataFrame | pl.LazyFrame

class DropColumns(columns)[source]

Bases: tuple[str, ...]

Tuple-like drop list with a Polars apply helper.

TableSchema.drop returns this type. Empty drop lists are no-ops, which keeps chained cleanup code simple.

Parameters:

columns (Iterable[str])

Return type:

DropColumns

apply(df)[source]

Drop these columns from a Polars frame.

Parameters:

df (pl.DataFrame | pl.LazyFrame) – Eager or lazy Polars frame to transform.

Returns:

Frame without these columns.

Return type:

pl.DataFrame | pl.LazyFrame

class RenameColumns[source]

Bases: dict[str, str]

Dict-like rename mapping with a Polars apply helper.

TableSchema.rename returns this type. Empty mappings are no-ops, so the same cleanup chain can be used whether a schema currently renames columns or not.

apply(df)[source]

Rename columns on a Polars frame.

Parameters:

df (pl.DataFrame | pl.LazyFrame) – Eager or lazy Polars frame to transform.

Returns:

Frame with configured columns renamed.

Return type:

pl.DataFrame | pl.LazyFrame

class TableSchema(columns=(), dtypes=(), rename=(), drop=())[source]

Bases: object

Column cleanup helper for compact Polars dataframe chains.

TableSchema is intentionally small: it stores an explicit column projection, a source-column dtype map, a rename map, and a drop list. Each attribute exposes an apply method so flow code can decide the cleanup order explicitly instead of relying on a magical all-in-one schema operation.

Variables:
  • columns (Iterable[str] | ColumnSelection) – Explicit projection columns. Use schema.columns.apply(df) wherever that projection belongs in your chain.

  • dtypes (ColumnDtypes | ColumnCasts) – Source column names mapped to Polars dtype objects. Use schema.dtypes.apply(df) to cast them. Remaining frame columns are cast to pl.String.

  • rename (ColumnRenames) – Source-to-target column names. Use schema.rename.apply(df) to rename them.

  • drop (Iterable[str]) – Source columns to remove. Use schema.drop.apply(df) to drop them.

Parameters:
  • columns (Iterable[str] | ColumnSelection)

  • dtypes (Mapping[str, object] | Iterable[tuple[str, object]] | ColumnCasts)

  • rename (Mapping[str, str] | Iterable[tuple[str, str]])

  • drop (Iterable[str])

Notes

columns is an explicit projection applied at the point you call schema.columns.apply(df). For example, you might cast all incoming columns, drop private fields before persistence, write to DuckDB, and then select the columns to return.

Examples

import polars as pl

from data_engine.helpers import TableSchema

schema = TableSchema(
    columns=("step", "time", "workflow"),
    dtypes={"step_to": pl.String, "time": pl.Time},
    rename={"step_to": "step", "workflow_to": "workflow"},
    drop=("workflow_from", "ssn"),
)

df = pl.DataFrame(
    {
        "step_to": ["review"],
        "time": ["09:30:00"],
        "workflow_to": ["claims"],
        "workflow_from": ["intake"],
        "ssn": ["000-00-0000"],
    }
).with_columns(pl.col("time").str.to_time())

df = schema.dtypes.apply(df)
df = schema.drop.apply(df)
df = schema.rename.apply(df)
df = schema.columns.apply(df)

assert df.columns == ["step", "time", "workflow"]
assert df.schema["workflow"] == pl.String

Normalize all incoming names first when source files use inconsistent spacing or capitalization:

df = pl.DataFrame({"Workflow\tTo": ["claims"]})
df = schema.normalize_column_names(df)

assert df.columns == ["workflow_to"]

columns: Iterable[str] | ColumnSelection = ()
dtypes: Mapping[str, object] | Iterable[tuple[str, object]] | ColumnCasts = ()
rename: Mapping[str, str] | Iterable[tuple[str, str]] = ()
drop: Iterable[str] = ()

normalize_column_names(df)[source]

Normalize all column names on a Polars frame.

Parameters:

df (pl.DataFrame | pl.LazyFrame) – Eager or lazy Polars frame to rename.

Returns:

Frame with normalized column names.

Return type:

pl.DataFrame | pl.LazyFrame

normalize_column_name(name)[source]

Return a normalized column name.

Parameters:

name (object) – Source column name to normalize.

Returns:

Lowercase column name with separator-adjacent spaces removed, remaining whitespace collapsed, and spaces replaced with underscores.

Return type:

str
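
The rule above can be approximated in plain Python. This is an illustrative sketch, not the library's implementation; in particular, the exact set of characters treated as separators for the "separator-adjacent spaces removed" step is an assumption here.

```python
import re

def normalize_column_name_sketch(name: object) -> str:
    """Approximate the documented rule: lowercase, remove spaces around
    separators, collapse remaining whitespace, replace spaces with
    underscores. The separator set below is an assumption."""
    text = str(name).strip().lower()
    # Assumed separators; the library may recognize a different set.
    text = re.sub(r"\s*([/_\-.])\s*", r"\1", text)
    # Collapse any remaining whitespace runs into single underscores.
    text = re.sub(r"\s+", "_", text)
    return text

print(normalize_column_name_sketch("Workflow\tTo"))  # workflow_to
```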

normalize_column_names(df, columns=None)[source]

Normalize column names on a Polars frame.

Parameters:
  • df (pl.DataFrame | pl.LazyFrame) – Eager or lazy Polars frame to rename.

  • columns (Iterable[object] | None) – Optional subset of column names to normalize. When omitted, all frame columns are normalized.

Returns:

Frame with normalized column names.

Return type:

pl.DataFrame | pl.LazyFrame

normalized_column_renames(columns)[source]

Return a Polars rename mapping for normalized column names.

Parameters:

columns (Iterable[object]) – Source column names to normalize.

Returns:

Mapping from original column names to normalized names, excluding names that are already normalized.

Return type:

dict[str, str]

Polars namespace helpers for Data Engine flow authoring.

networkdays(start, end, *, holidays=None, count_first_day=False, mask=None)[source]

Return Excel-style business-day counts as a Polars expression.

This helper matches Excel NETWORKDAYS semantics by counting both endpoints when they are business days. Weekends default to Saturday/Sunday, and optional holidays are excluded from the count.

The one intentional extension is count_first_day. When enabled, the start date is still counted even if it falls on a masked weekday or one of the supplied holidays.

Parameters:
  • start (pl.Expr | str | date | datetime) – Start date expression, column name, or scalar date/datetime.

  • end (pl.Expr | str | date | datetime) – End date expression, column name, or scalar date/datetime.

  • holidays (list[date | datetime | str] | tuple[...] | set[...] | None) – Optional holiday dates removed from the business-day count. String values must use ISO date text such as "2026-04-15".

  • count_first_day (bool) – Whether to force the first day into the count when it would normally be excluded by the weekday mask or holiday list.

  • mask (Iterable[bool] | None) – Monday-first seven-item business-day mask. Every item must be a real bool. None uses the Excel default: Monday-Friday counted, Saturday-Sunday excluded.

Returns:

Expression that evaluates to the signed business-day count. Datetime inputs are normalized to their calendar date before counting.

Return type:

pl.Expr

Examples

Add a row-level business-day count:

from datetime import date
import polars as pl

import data_engine.helpers

df = pl.DataFrame(
    {
        "received_date": [date(2026, 4, 13), date(2026, 4, 14)],
        "due_date": [date(2026, 4, 17), date(2026, 4, 21)],
    }
).with_columns(
    business_days=data_engine.helpers.networkdays(
        "received_date",
        "due_date",
        holidays=[date(2026, 4, 15)],
    )
)

Use scalar datetimes and count the first day:

from datetime import datetime

df = df.with_columns(
    sla_days=data_engine.helpers.networkdays(
        datetime(2026, 4, 13, 8, 30),
        pl.col("resolved_at"),
        count_first_day=True,
    )
)

Chain the expression into a grouped cumulative total:

df = (
    df.sort(["claim_id", "sequence_number"])
    .with_columns(
        cumulative_business_days=
        pl.when(pl.col("use_days"))
        .then(
            data_engine.helpers.networkdays(
                "start_date",
                "end_date",
                holidays=[date(2026, 4, 15)],
            )
        )
        .otherwise(pl.lit(0))
        .cum_sum()
        .over("claim_id")
    )
)

Notes

networkdays(...) returns a normal pl.Expr. You can chain it into cum_sum(), window expressions, filters, and any other Polars expression pipeline.
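
For readers who want the counting semantics spelled out, here is a plain-Python reference sketch of the default behavior (Monday-Friday mask, no holidays counted, no count_first_day). It is a scalar illustration of the Excel NETWORKDAYS rule the helper follows, not the library's vectorized Polars implementation.

```python
from datetime import date, timedelta

def networkdays_sketch(start: date, end: date, holidays=()) -> int:
    """Excel NETWORKDAYS semantics: count both endpoints when they are
    business days, Mon-Fri only, holidays excluded. Swapped endpoints
    yield a negative count, mirroring Excel's signed result."""
    sign = 1
    if end < start:
        start, end, sign = end, start, -1
    holiday_set = set(holidays)
    days = 0
    current = start
    while current <= end:
        if current.weekday() < 5 and current not in holiday_set:
            days += 1
        current += timedelta(days=1)
    return sign * days

# Mon 2026-04-13 .. Fri 2026-04-17 with Wed 2026-04-15 as a holiday -> 4
print(networkdays_sketch(date(2026, 4, 13), date(2026, 4, 17), [date(2026, 4, 15)]))
```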

workday(start, days, *, holidays=None, count_first_day=False, mask=None)[source]

Return Excel-style workday offsets as a Polars expression.

This helper mirrors Excel WORKDAY by returning the business date that falls the requested number of working days before or after start.

The one intentional extension is count_first_day. When enabled, the start date itself can be day 1, even if it falls on a masked weekday or one of the supplied holidays.

Parameters:
  • start (pl.Expr | str | date | datetime) – Start date expression, column name, or scalar date/datetime.

  • days (pl.Expr | str | int) – Signed business-day offset expression, column name, or scalar integer.

  • holidays (list[date | datetime | str] | tuple[...] | set[...] | None) – Optional holiday dates skipped while calculating the result. String values must use ISO date text such as "2026-04-15".

  • count_first_day (bool) – Whether the start date itself can count as day 1 when moving forward or backward through business days.

  • mask (Iterable[bool] | None) – Monday-first seven-item business-day mask. Every item must be a real bool. None uses the Excel default: Monday-Friday counted, Saturday-Sunday excluded.

Returns:

Expression that evaluates to a Date result. Datetime inputs are normalized to their calendar date before offsetting.

Return type:

pl.Expr

Examples

Add one target business date column:

from datetime import date
import polars as pl

import data_engine.helpers

df = pl.DataFrame(
    {
        "received_date": [date(2026, 4, 13), date(2026, 4, 14)],
        "sla_days": [3, 5],
    }
).with_columns(
    due_date=data_engine.helpers.workday(
        "received_date",
        "sla_days",
        holidays=[date(2026, 4, 15)],
    )
)

Count the start date as day 1:

df = df.with_columns(
    due_date=data_engine.helpers.workday(
        "received_date",
        "sla_days",
        holidays=[date(2026, 4, 15)],
        count_first_day=True,
    )
)

Use a custom weekday mask where Saturday is also a business day:

df = df.with_columns(
    due_date=data_engine.helpers.workday(
        "received_date",
        "sla_days",
        mask=(True, True, True, True, True, True, False),
    )
)
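
The default offset semantics (Monday-Friday mask, no count_first_day) can likewise be sketched as a scalar reference in plain Python; the real helper returns a Polars expression rather than stepping day by day.

```python
from datetime import date, timedelta

def workday_sketch(start: date, days: int, holidays=()) -> date:
    """Excel WORKDAY semantics: step one calendar day at a time in the
    signed direction, counting only Mon-Fri non-holiday days, until the
    offset is exhausted. The start date itself is never counted here
    (the count_first_day extension is not modeled in this sketch)."""
    holiday_set = set(holidays)
    step = timedelta(days=1 if days >= 0 else -1)
    remaining = abs(days)
    current = start
    while remaining > 0:
        current += step
        if current.weekday() < 5 and current not in holiday_set:
            remaining -= 1
    return current

# Mon 2026-04-13 plus 3 business days, skipping Wed 2026-04-15 -> 2026-04-17
print(workday_sketch(date(2026, 4, 13), 3, [date(2026, 4, 15)]))
```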

write_parquet_atomic(df, path, **write_options)[source]

Write a Polars dataframe to parquet with same-directory atomic replacement.

The dataframe is first written to a unique temporary file beside the target, then moved into place with os.replace. This keeps readers from seeing a partially written parquet file while preserving normal Polars write options.

Parameters:
  • df (pl.DataFrame) – Eager Polars dataframe to write.

  • path (PathLike) – Target parquet path.

  • **write_options (object) – Keyword options forwarded to pl.DataFrame.write_parquet.

Returns:

Absolute target path that was replaced.

Return type:

Path

Examples

import polars as pl

from data_engine.helpers import write_parquet_atomic

target = write_parquet_atomic(
    pl.DataFrame({"claim_id": [1, 2]}),
    "workspaces/example/output/claims.parquet",
)

df = pl.DataFrame({"claim_id": [3]})
df.de.write_parquet_atomic(target)
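
The same-directory temp-file-then-os.replace pattern this helper relies on can be shown with a generic stdlib sketch. write_atomic_sketch and its bytes payload are illustrative stand-ins, not part of the package; the key point is that os.replace is atomic when source and target share a filesystem.

```python
import os
import tempfile

def write_atomic_sketch(data: bytes, path: str) -> str:
    """Write to a unique temporary file beside the target, then move it
    into place with os.replace so readers never see a partial file."""
    directory = os.path.dirname(os.path.abspath(path)) or "."
    fd, tmp_path = tempfile.mkstemp(dir=directory, suffix=".tmp")
    try:
        with os.fdopen(fd, "wb") as handle:
            handle.write(data)
        os.replace(tmp_path, path)  # atomic on the same filesystem
    except BaseException:
        if os.path.exists(tmp_path):
            os.unlink(tmp_path)  # clean up the orphaned temp file
        raise
    return os.path.abspath(path)
```

Writing to a temp file in a different directory would defeat the purpose: os.replace across filesystems is not atomic, which is why the helper stages the file beside its target.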

write_excel_atomic(df, path, worksheet=None, **write_options)[source]

Write a Polars dataframe to Excel with same-directory atomic replacement.

The dataframe is first written to a unique temporary workbook beside the target, then moved into place with os.replace. All keyword options are forwarded to pl.DataFrame.write_excel.

Parameters:
  • df (pl.DataFrame) – Eager Polars dataframe to write.

  • path (PathLike) – Target Excel workbook path.

  • worksheet (str | None) – Optional worksheet name forwarded to pl.DataFrame.write_excel.

  • **write_options (object) – Keyword options forwarded to pl.DataFrame.write_excel.

Returns:

Absolute target path that was replaced.

Return type:

Path

Examples

import polars as pl

from data_engine.helpers import write_excel_atomic

target = write_excel_atomic(
    pl.DataFrame({"claim_id": [1, 2]}),
    "workspaces/example/output/claims.xlsx",
    worksheet="Claims",
    table_name="claims",
    autofit=True,
)

df = pl.DataFrame({"claim_id": [3]})
df.de.write_excel_atomic(target, worksheet="Claims")

sink_parquet_atomic(lf, path, **sink_options)[source]

Sink a Polars lazy frame to parquet with same-directory atomic replacement.

The lazy query is executed into a unique temporary file beside the target, then moved into place with os.replace. Use the default eager sink mode so the helper can complete the replacement in the same call.

Parameters:
  • lf (pl.LazyFrame) – Lazy Polars frame to execute and write.

  • path (PathLike) – Target parquet path.

  • **sink_options (object) – Keyword options forwarded to pl.LazyFrame.sink_parquet.

Returns:

Absolute target path that was replaced.

Return type:

Path

Raises:

ValueError – If lazy=True is passed.

Examples

import polars as pl

import data_engine.helpers

lf = pl.DataFrame({"claim_id": [1, 2]}).lazy()
lf.de.sink_parquet_atomic("workspaces/example/output/claims.parquet")

class DataEngineDataFrameNamespace(df)[source]

Bases: object

Data Engine helpers available from pl.DataFrame.de.

Parameters:

df (pl.DataFrame)

normalize_column_names(columns=None)[source]

Normalize column names on this dataframe.

Parameters:

columns (Iterable[object] | None) – Optional subset of column names to normalize. When omitted, all dataframe columns are normalized.

Returns:

Dataframe with normalized column names.

Return type:

pl.DataFrame

networkdays(start, end, *, holidays=None, count_first_day=False, mask=None)[source]

Return an Excel-style business-day count expression for this dataframe.

This is a convenience wrapper around data_engine.helpers.networkdays(). The returned value is still a normal pl.Expr, so it can be chained into cumulative windows and other Polars expressions.

Example

df = df.with_columns(
    business_days=df.de.networkdays(
        "start_date",
        "end_date",
        holidays=[date(2026, 4, 15)],
    )
)

df = df.sort(["claim_id", "sequence_number"]).with_columns(
    cumulative_business_days=
    pl.when(pl.col("use_days"))
    .then(df.de.networkdays("start_date", "end_date"))
    .otherwise(pl.lit(0))
    .cum_sum()
    .over("claim_id")
)

Parameters:
  • start (Expr | str | date | datetime)

  • end (Expr | str | date | datetime)

  • holidays (list[date | datetime | str] | tuple[date | datetime | str, ...] | set[date | datetime | str] | None)

  • count_first_day (bool)

  • mask (Iterable[bool] | None)

Return type:

Expr

workday(start, days, *, holidays=None, count_first_day=False, mask=None)[source]

Return an Excel-style workday offset expression for this dataframe.

This is a convenience wrapper around data_engine.helpers.workday().

Example

df = df.with_columns(
    due_date=df.de.workday(
        "received_date",
        "sla_days",
        holidays=[date(2026, 4, 15)],
    )
)

Parameters:
  • start (Expr | str | date | datetime)

  • days (Expr | str | int)

  • holidays (list[date | datetime | str] | tuple[date | datetime | str, ...] | set[date | datetime | str] | None)

  • count_first_day (bool)

  • mask (Iterable[bool] | None)

Return type:

Expr

write_parquet_atomic(path, **write_options)[source]

Write this dataframe to parquet with atomic target replacement.

Parameters:
  • path (PathLike) – Target parquet path.

  • **write_options (object) – Keyword options forwarded to pl.DataFrame.write_parquet.

Returns:

Absolute target path that was replaced.

Return type:

Path

write_excel_atomic(path, worksheet=None, **write_options)[source]

Write this dataframe to Excel with atomic target replacement.

Parameters:
  • path (PathLike) – Target Excel workbook path.

  • worksheet (str | None) – Optional worksheet name forwarded to pl.DataFrame.write_excel.

  • **write_options (object) – Keyword options forwarded to pl.DataFrame.write_excel.

Returns:

Absolute target path that was replaced.

Return type:

Path

build_dimension(db_path, table, *, key_column='dimension_key', return_df=True)[source]

Build or extend one DuckDB dimension table from this dataframe.

Parameters:
  • db_path (PathLike) – DuckDB database file path.

  • table (str) – Dimension table name, optionally schema-qualified.

  • key_column (str) – Surrogate key column to create in the dimension table.

  • return_df (bool) – Whether to return the mapping dataframe for this dataframe’s natural keys.

Returns:

Mapping dataframe when return_df is true; otherwise None.

Return type:

pl.DataFrame | None

attach_dimension(db_path, table, *, on, key_column='dimension_key', drop_key=False)[source]

Attach an existing DuckDB dimension key to this dataframe.

Parameters:
  • db_path (PathLike) – DuckDB database file path.

  • table (str) – Dimension table name, optionally schema-qualified.

  • on (ColumnNames) – Natural key column or columns used to join to the dimension table.

  • key_column (str) – Surrogate key column to attach.

  • drop_key (bool) – Whether to drop the natural key columns after attaching the surrogate key.

Returns:

Dataframe with the surrogate key column attached.

Return type:

pl.DataFrame

denormalize_columns(db_path, table, *, key_column='dimension_key', select='*', drop_key=False)[source]

Attach natural columns from an existing DuckDB dimension table.

Parameters:
  • db_path (PathLike) – DuckDB database file path.

  • table (str) – Dimension table name, optionally schema-qualified.

  • key_column (str) – Surrogate key column used to join to the dimension table.

  • select (ColumnNames) – Natural columns to attach, or "*" for all non-key columns.

  • drop_key (bool) – Whether to drop key_column after attaching the natural columns.

Returns:

Dataframe with selected dimension columns attached.

Return type:

pl.DataFrame

normalize_columns(db_path, table, *, on, key_column='dimension_key', drop_key=True, returns='df')[source]

Build dimension keys and attach them back onto this dataframe.

Parameters:
  • db_path (PathLike) – DuckDB database file path.

  • table (str) – Dimension table name, optionally schema-qualified.

  • on (ColumnNames) – Natural key column or columns used to build the dimension.

  • key_column (str) – Surrogate key column to create and attach.

  • drop_key (bool) – Whether to drop natural key columns after attaching the surrogate key.

  • returns (ReturnMode) – "df" for normalized input rows, "map" for only the key mapping, or None to only persist dimension rows.

Returns:

Normalized dataframe, mapping dataframe, or None according to returns.

Return type:

pl.DataFrame | None
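
To make the build-then-attach flow concrete without a DuckDB dependency, here is a stdlib sqlite3 sketch of the same surrogate-key pattern with drop_key=True behavior. The table layout, single-column natural key, and rows-as-dicts shape are assumptions for illustration only.

```python
import sqlite3

def normalize_columns_sketch(con, table, rows, on, key_column="dimension_key"):
    """Insert unseen natural keys into the dimension table, then return
    each input row with the natural key swapped for its surrogate key."""
    con.execute(
        f"CREATE TABLE IF NOT EXISTS {table} "
        f"({key_column} INTEGER PRIMARY KEY AUTOINCREMENT, {on} TEXT UNIQUE)"
    )
    normalized = []
    for row in rows:
        # UNIQUE + INSERT OR IGNORE keeps existing keys stable.
        con.execute(f"INSERT OR IGNORE INTO {table} ({on}) VALUES (?)", (row[on],))
        key = con.execute(
            f"SELECT {key_column} FROM {table} WHERE {on} = ?", (row[on],)
        ).fetchone()[0]
        out = {k: v for k, v in row.items() if k != on}  # drop_key=True
        out[key_column] = key
        normalized.append(out)
    return normalized
```

Repeated natural-key values map to the same surrogate key, which is what lets separate flow runs share one dimension table.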

replace_rows_by_file(db_path, table, *, file_hash, file_hash_column='file_key', return_df=True)[source]

Replace one file’s DuckDB rows and append this dataframe.

Parameters:
  • db_path (PathLike) – DuckDB database file path.

  • table (str) – Destination table name, optionally schema-qualified.

  • file_hash (str) – Stable source-file identifier used to delete the previous batch.

  • file_hash_column (str) – Column name used to store file_hash in the destination table.

  • return_df (bool) – Whether to return this dataframe with the file hash column attached.

Returns:

Inserted rows with file_hash_column when return_df is true; otherwise None.

Return type:

pl.DataFrame | None
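
The replace-one-batch behavior can also be sketched with stdlib sqlite3. The fixed claim_id column and the transaction handling are illustrative assumptions, not the helper's actual schema or concurrency model.

```python
import sqlite3

def replace_rows_by_file_sketch(con, table, rows, file_hash, file_hash_column="file_key"):
    """Delete the previous batch for this file hash, then append the new
    rows tagged with the same hash, inside one transaction so readers
    observe either the old batch or the new one, never a mix."""
    with con:  # sqlite3 connection context manager = one transaction
        con.execute(f"DELETE FROM {table} WHERE {file_hash_column} = ?", (file_hash,))
        con.executemany(
            f"INSERT INTO {table} (claim_id, {file_hash_column}) VALUES (?, ?)",
            [(row, file_hash) for row in rows],
        )
```

Because the delete is keyed on the file hash, re-processing the same source file is idempotent: each run leaves exactly one batch for that file.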

replace_rows_by_values(db_path, table, *, column, return_df=True)[source]

Replace DuckDB rows matching this dataframe’s values for one column.

Parameters:
  • db_path (PathLike) – DuckDB database file path.

  • table (str) – Destination table name, optionally schema-qualified.

  • column (str) – Column whose incoming values define the rows to replace.

  • return_df (bool) – Whether to return the inserted dataframe.

Returns:

Inserted dataframe when return_df is true; otherwise None.

Return type:

pl.DataFrame | None

replace_table(db_path, table, *, return_df=True)[source]

Replace one DuckDB table wholesale from this dataframe.

Parameters:
  • db_path (PathLike) – DuckDB database file path.

  • table (str) – Destination table name, optionally schema-qualified.

  • return_df (bool) – Whether to return the inserted dataframe.

Returns:

Inserted dataframe when return_df is true; otherwise None.

Return type:

pl.DataFrame | None

class DataEngineLazyFrameNamespace(lf)[source]

Bases: object

Data Engine helpers available from pl.LazyFrame.de.

Parameters:

lf (pl.LazyFrame)

normalize_column_names(columns=None)[source]

Normalize column names on this lazy frame.

Parameters:

columns (Iterable[object] | None) – Optional subset of column names to normalize. When omitted, all lazy-frame columns are normalized.

Returns:

Lazy frame with normalized column names.

Return type:

pl.LazyFrame

networkdays(start, end, *, holidays=None, count_first_day=False, mask=None)[source]

Return an Excel-style business-day count expression for this lazy frame.

This is a convenience wrapper around data_engine.helpers.networkdays(). The returned value stays lazy and can be chained into window expressions before collect().

Parameters:
  • start (Expr | str | date | datetime)

  • end (Expr | str | date | datetime)

  • holidays (list[date | datetime | str] | tuple[date | datetime | str, ...] | set[date | datetime | str] | None)

  • count_first_day (bool)

  • mask (Iterable[bool] | None)

Return type:

Expr

workday(start, days, *, holidays=None, count_first_day=False, mask=None)[source]

Return an Excel-style workday offset expression for this lazy frame.

This is a convenience wrapper around data_engine.helpers.workday().

Parameters:
  • start (Expr | str | date | datetime)

  • days (Expr | str | int)

  • holidays (list[date | datetime | str] | tuple[date | datetime | str, ...] | set[date | datetime | str] | None)

  • count_first_day (bool)

  • mask (Iterable[bool] | None)

Return type:

Expr

sink_parquet_atomic(path, **sink_options)[source]

Execute this lazy frame to parquet with atomic target replacement.

Parameters:
  • path (PathLike) – Target parquet path.

  • **sink_options (object) – Keyword options forwarded to pl.LazyFrame.sink_parquet.

Returns:

Absolute target path that was replaced.

Return type:

Path

build_dimension(db_path, table, *, key_column='dimension_key', return_df=True)[source]

Build or extend one DuckDB dimension table from this lazy frame.

Parameters:
  • db_path (PathLike) – DuckDB database file path.

  • table (str) – Dimension table name, optionally schema-qualified.

  • key_column (str) – Surrogate key column to create in the dimension table.

  • return_df (bool) – Whether to return the mapping dataframe for this lazy frame’s natural keys.

Returns:

Mapping dataframe when return_df is true; otherwise None.

Return type:

pl.DataFrame | None

attach_dimension(db_path, table, *, on, key_column='dimension_key', drop_key=False)[source]

Attach an existing DuckDB dimension key to this lazy frame.

Parameters:
  • db_path (PathLike) – DuckDB database file path.

  • table (str) – Dimension table name, optionally schema-qualified.

  • on (ColumnNames) – Natural key column or columns used to join to the dimension table.

  • key_column (str) – Surrogate key column to attach.

  • drop_key (bool) – Whether to drop the natural key columns after attaching the surrogate key.

Returns:

Collected dataframe with the surrogate key column attached.

Return type:

pl.DataFrame

denormalize_columns(db_path, table, *, key_column='dimension_key', select='*', drop_key=False)[source]

Attach natural columns from an existing DuckDB dimension table.

Parameters:
  • db_path (PathLike) – DuckDB database file path.

  • table (str) – Dimension table name, optionally schema-qualified.

  • key_column (str) – Surrogate key column used to join to the dimension table.

  • select (ColumnNames) – Natural columns to attach, or "*" for all non-key columns.

  • drop_key (bool) – Whether to drop key_column after attaching the natural columns.

Returns:

Collected dataframe with selected dimension columns attached.

Return type:

pl.DataFrame

normalize_columns(db_path, table, *, on, key_column='dimension_key', drop_key=True, returns='df')[source]

Build dimension keys and attach them back onto this lazy frame.

Parameters:
  • db_path (PathLike) – DuckDB database file path.

  • table (str) – Dimension table name, optionally schema-qualified.

  • on (ColumnNames) – Natural key column or columns used to build the dimension.

  • key_column (str) – Surrogate key column to create and attach.

  • drop_key (bool) – Whether to drop natural key columns after attaching the surrogate key.

  • returns (ReturnMode) – "df" for normalized input rows, "map" for only the key mapping, or None to only persist dimension rows.

Returns:

Normalized dataframe, mapping dataframe, or None according to returns.

Return type:

pl.DataFrame | None

replace_rows_by_file(db_path, table, *, file_hash, file_hash_column='file_key', return_df=True)[source]

Replace one file’s DuckDB rows and append this lazy frame.

Parameters:
  • db_path (PathLike) – DuckDB database file path.

  • table (str) – Destination table name, optionally schema-qualified.

  • file_hash (str) – Stable source-file identifier used to delete the previous batch.

  • file_hash_column (str) – Column name used to store file_hash in the destination table.

  • return_df (bool) – Whether to return the collected frame with the file hash column attached.

Returns:

Inserted rows with file_hash_column when return_df is true; otherwise None.

Return type:

pl.DataFrame | None

replace_rows_by_values(db_path, table, *, column, return_df=True)[source]

Replace DuckDB rows matching this lazy frame’s values for one column.

Parameters:
  • db_path (PathLike) – DuckDB database file path.

  • table (str) – Destination table name, optionally schema-qualified.

  • column (str) – Column whose incoming values define the rows to replace.

  • return_df (bool) – Whether to return the inserted dataframe.

Returns:

Inserted dataframe when return_df is true; otherwise None.

Return type:

pl.DataFrame | None

replace_table(db_path, table, *, return_df=True)[source]

Replace one DuckDB table wholesale from this lazy frame.

Parameters:
  • db_path (PathLike) – DuckDB database file path.

  • table (str) – Destination table name, optionally schema-qualified.

  • return_df (bool) – Whether to return the inserted dataframe.

Returns:

Inserted dataframe when return_df is true; otherwise None.

Return type:

pl.DataFrame | None

Public one-shot DuckDB helpers for flow authoring.

attach_dimension(db_path, table, *, df, on, key_column='dimension_key', drop_key=False)[source]

Attach an existing surrogate key mapping table to an input dataframe.

Parameters:
  • db_path (str | Path)

  • table (str)

  • df (DataFrame | LazyFrame)

  • on (str | list[str] | tuple[str, ...])

  • key_column (str)

  • drop_key (bool)

build_dimension(db_path, table, *, df, key_column='dimension_key', return_df=True)[source]

Build or extend one dimension table from unique incoming row combinations.

Parameters:
  • db_path (str | Path)

  • table (str)

  • df (DataFrame | LazyFrame)

  • key_column (str)

  • return_df (bool)

compact_database(db_path, *, tables=None, drop_all_null_columns=True, vacuum=True)[source]

Compact one DuckDB database by dropping all-null columns and optionally vacuuming.

Parameters:
  • db_path (str | Path)

  • tables (str | list[str] | tuple[str, ...] | None)

  • drop_all_null_columns (bool)

  • vacuum (bool)

Return type:

DataFrame
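
The all-null-column detection step can be illustrated with stdlib sqlite3, exploiting the fact that COUNT(column) counts only non-null values. This sketch only finds the candidate columns; it does not drop them or vacuum, and the function name is hypothetical.

```python
import sqlite3

def all_null_columns(con, table):
    """Return the columns of one table whose every value is NULL - the
    candidates a compaction pass like compact_database would drop."""
    cols = [row[1] for row in con.execute(f"PRAGMA table_info({table})")]
    return [
        col for col in cols
        # COUNT(col) skips NULLs, so 0 means the column is entirely null.
        if con.execute(f"SELECT COUNT({col}) FROM {table}").fetchone()[0] == 0
    ]
```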

denormalize_columns(db_path, table, *, df, key_column='dimension_key', select='*', drop_key=False)[source]

Attach natural columns from an existing dimension table onto a keyed dataframe.

Parameters:
  • db_path (str | Path)

  • table (str)

  • df (DataFrame | LazyFrame)

  • key_column (str)

  • select (str | list[str] | tuple[str, ...])

  • drop_key (bool)

normalize_columns(db_path, table, *, df, on, key_column='dimension_key', drop_key=True, returns='df')[source]

Build missing surrogate keys and attach them back onto the input dataframe.

Parameters:
  • db_path (str | Path)

  • table (str)

  • df (DataFrame | LazyFrame)

  • on (str | list[str] | tuple[str, ...])

  • key_column (str)

  • drop_key (bool)

  • returns (str | None)

read_rows_by_values(db_path, table, *, column, is_in, select)[source]

Return selected columns for rows whose one column matches provided values.

Parameters:
  • db_path (str | Path) – DuckDB database file path.

  • table (str) – Source table name, optionally schema-qualified.

  • column (str) – Column matched against is_in.

  • is_in (list[object] | tuple[object, ...]) – Values to include.

  • select (str | list[str] | tuple[str, ...]) – Columns to return.

Returns:

Selected columns for matching rows, ordered by the input order of the distinct lookup values.

Return type:

pl.DataFrame

Raises:

ValueError – If the table, column, or selected columns are invalid.

read_sql(db_path, *, sql)[source]

Run one SQL query against DuckDB and return the result as a Polars dataframe.

Parameters:
  • db_path (str | Path) – DuckDB database file path.

  • sql (str) – Query text to execute.

Returns:

Query result as a Polars dataframe.

Return type:

pl.DataFrame

read_table(db_path, table, *, select='*', where=None, limit=None)[source]

Read rows from one DuckDB table into a Polars dataframe.

Parameters:
  • db_path (str | Path) – DuckDB database file path.

  • table (str) – Source table name, optionally schema-qualified.

  • select (str | list[str] | tuple[str, ...]) – Columns to return, or "*" for all columns.

  • where (str | None) – Optional raw SQL predicate appended after WHERE.

  • limit (int | None) – Optional row limit.

Returns:

Selected table rows.

Return type:

pl.DataFrame
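
A sketch of how the select, where, and limit parameters could compose into SQL. The real helper's identifier quoting and validation are not shown, and build_read_table_sql is a hypothetical name for illustration.

```python
def build_read_table_sql(table, select="*", where=None, limit=None):
    """Compose a SELECT statement from read_table-style parameters:
    a column list or "*", an optional raw WHERE predicate, and an
    optional row limit."""
    columns = select if isinstance(select, str) else ", ".join(select)
    sql = f"SELECT {columns} FROM {table}"
    if where is not None:
        sql += f" WHERE {where}"  # raw predicate, appended verbatim
    if limit is not None:
        sql += f" LIMIT {limit}"
    return sql
```

Note that where is documented as a raw SQL predicate, so callers are responsible for keeping untrusted input out of it.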

replace_rows_by_file(db_path, table, *, df, file_hash, file_hash_column='file_key', return_df=True)[source]

Atomically replace one file’s fact rows and append the current batch.

Parameters:
  • db_path (str | Path)

  • table (str)

  • df (DataFrame | LazyFrame)

  • file_hash (str)

  • file_hash_column (str)

  • return_df (bool)

replace_rows_by_values(db_path, table, *, df, column, return_df=True)[source]

Atomically replace one value-slice of rows and append the current batch.

Parameters:
  • db_path (str | Path)

  • table (str)

  • df (DataFrame | LazyFrame)

  • column (str)

  • return_df (bool)

replace_table(db_path, table, *, df, return_df=True)[source]

Replace one DuckDB table wholesale from the provided dataframe.

Parameters:
  • db_path (str | Path)

  • table (str)

  • df (DataFrame | LazyFrame)

  • return_df (bool)

Host Surfaces

APScheduler-backed host for scheduled flow execution.

class ScheduledFlowJob(job_id, flow_name, trigger_kind)[source]

Bases: object

Description of one scheduler job owned by the host.

Parameters:
  • job_id (str)

  • flow_name (str)

  • trigger_kind (str)

job_id: str
flow_name: str
trigger_kind: str

class SchedulerHost(*, runtime_engine=None, scheduler=None, job_id_prefix='data-engine:schedule:')[source]

Bases: object

Own APScheduler timing while delegating flow meaning to the runtime engine.

Parameters:
  • runtime_engine (object | None)

  • scheduler (SchedulerPort | None)

  • job_id_prefix (str)

rebuild_jobs(flows)[source]

Replace scheduler jobs from discovered scheduled flows.

Parameters:

flows (tuple['Flow', ...])

Return type:

tuple[ScheduledFlowJob, …]

start()[source]

Start the underlying scheduler.

Return type:

None

shutdown(*, wait=True)[source]

Stop the underlying scheduler.

Parameters:

wait (bool)

Return type:

None

run_until_stopped(flows, stop_event)[source]

Run scheduled flow jobs until stop_event is set.

Parameters:
  • flows (tuple['Flow', ...])

  • stop_event (Event)

Return type:

tuple[ScheduledFlowJob, …]

class SchedulerPort(*args, **kwargs)[source]

Bases: Protocol

Small scheduler surface used by the scheduler host.

add_job(func, *, trigger, id, replace_existing=False, max_instances=1)[source]

Add or replace one scheduled job.

Parameters:
  • id (str)

  • replace_existing (bool)

  • max_instances (int)

remove_job(job_id)[source]

Remove one scheduled job by id.

Parameters:

job_id (str)

Return type:

None

start()[source]

Start the scheduler.

Return type:

None

shutdown(wait=True)[source]

Stop the scheduler.

Parameters:

wait (bool)

Return type:

None
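
Because SchedulerPort is a small Protocol, an in-memory implementation is easy to write as a test double for host code. InMemoryScheduler below is a hypothetical example, not part of the package.

```python
class InMemoryScheduler:
    """Minimal in-memory object satisfying the SchedulerPort surface:
    add_job / remove_job / start / shutdown, with jobs kept in a dict."""

    def __init__(self):
        self.jobs = {}
        self.running = False

    def add_job(self, func, *, trigger, id, replace_existing=False, max_instances=1):
        if id in self.jobs and not replace_existing:
            raise ValueError(f"job {id!r} already exists")
        self.jobs[id] = (func, trigger)

    def remove_job(self, job_id):
        del self.jobs[job_id]

    def start(self):
        self.running = True

    def shutdown(self, wait=True):
        self.running = False
```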

Application Services

Daemon IPC and lifecycle services.

class DaemonService(*, spawn_process_func=<function spawn_daemon_process>, request_func=<function daemon_request>, is_live_func=<function is_daemon_live>, force_shutdown_func=<function force_shutdown_daemon_process>, client_error_type=<class 'data_engine.hosts.daemon.client.DaemonClientError'>)[source]

Bases: object

Thin injectable wrapper around daemon lifecycle and IPC helpers.

Parameters:
  • spawn_process_func (Callable[..., object])

  • request_func (Callable[..., dict[str, Any]])

  • is_live_func (Callable[[WorkspacePaths], bool])

  • force_shutdown_func (Callable[..., None])

  • client_error_type (type[Exception])

spawn(paths, *, lifecycle_policy=DaemonLifecyclePolicy.PERSISTENT)[source]

Start the local workspace daemon process for the given paths.

Parameters:
  • paths (WorkspacePaths)

  • lifecycle_policy (DaemonLifecyclePolicy)

Return type:

object

request(paths, payload, *, timeout=0.0)[source]

Send one request to the local workspace daemon.

Parameters:
  • paths (WorkspacePaths)

  • payload (dict[str, Any])

  • timeout (float)

Return type:

dict[str, Any]

is_live(paths)[source]

Return whether the local workspace daemon is reachable.

Parameters:

paths (WorkspacePaths)

Return type:

bool

force_shutdown(paths, *, timeout=0.5)[source]

Force-stop the local workspace daemon for the given paths.

Parameters:
  • paths (WorkspacePaths)

  • timeout (float)

Return type:

None

property client_error_type: type[Exception]

Return the daemon client exception type.
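DaemonService takes its process and IPC helpers as constructor arguments, so tests can substitute plain callables instead of spawning a real daemon. The stand-in below is a simplified, hypothetical illustration of that injection seam (it mirrors only `request` and `is_live`):

```python
from typing import Any, Callable


class StubDaemonService:
    """Simplified stand-in for DaemonService's injection seams (illustration only)."""

    def __init__(self, *, request_func: Callable[..., dict[str, Any]],
                 is_live_func: Callable[[Any], bool]) -> None:
        self._request = request_func
        self._is_live = is_live_func

    def request(self, paths, payload: dict[str, Any], *, timeout: float = 0.0) -> dict[str, Any]:
        # Delegate to the injected IPC helper, exactly like the thin wrapper would.
        return self._request(paths, payload, timeout=timeout)

    def is_live(self, paths) -> bool:
        return self._is_live(paths)


# In a test, IPC can be faked without any daemon process:
svc = StubDaemonService(
    request_func=lambda paths, payload, timeout=0.0: {"ok": True, "echo": payload},
    is_live_func=lambda paths: True,
)
reply = svc.request(None, {"op": "status"})
```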

Workspace daemon state and control services.

class DaemonStateService(*, shared_state_adapter=None)[source]

Bases: object

Own workspace daemon-manager construction and normalized snapshot access.

Parameters:

shared_state_adapter (DaemonSharedStateAdapter | None)

create_manager(paths)[source]

Create one daemon-state manager for a workspace.

Parameters:

paths (WorkspacePaths)

Return type:

WorkspaceDaemonManager

sync(manager)[source]

Fetch one normalized daemon snapshot.

Parameters:

manager (WorkspaceDaemonManager)

Return type:

WorkspaceDaemonSnapshot

control_state(manager, snapshot, *, daemon_startup_in_progress=False)[source]

Build structured workspace control state from one daemon snapshot.

Parameters:
  • manager (WorkspaceDaemonManager)

  • snapshot (WorkspaceDaemonSnapshot)

  • daemon_startup_in_progress (bool)

Return type:

WorkspaceControlState

request_control(manager)[source]

Request workspace control through one daemon-state manager.

Parameters:

manager (WorkspaceDaemonManager)

Return type:

str

Flow catalog loading services.

class FlowCatalogService(*, discover_definitions_func=<function discover_flow_module_definitions>)[source]

Bases: object

Own flow catalog loading through an explicit discovery dependency.

Parameters:

discover_definitions_func (Callable[..., tuple[FlowModuleDefinition, ...]])

load_entries(*, workspace_root=None)[source]

Return discovered flow catalog entries for the requested workspace root.

Parameters:

workspace_root (Path | None)

Return type:

tuple[FlowCatalogEntry, …]

flow_catalog_entry_from_flow(flow, *, description)[source]

Build one flow catalog entry from an authored flow and an optional description.

Parameters:
  • flow (Flow)

  • description (str | None)

Return type:

FlowCatalogEntry

Executable flow loading services.

class FlowExecutionService(*, load_flow_func=<function _default_load_flow>, discover_flows_func=<function _default_discover_flows>)[source]

Bases: object

Own executable flow loading through an explicit loader dependency.

Parameters:
  • load_flow_func (Callable[..., 'CoreFlow'])

  • discover_flows_func (Callable[..., tuple['CoreFlow', ...]])

load_flow(name, *, workspace_root=None)[source]

Return one executable flow definition by name.

Parameters:
  • name (str)

  • workspace_root (Path | None)

Return type:

CoreFlow

load_flows(names, *, workspace_root=None)[source]

Return executable flow definitions for the requested names.

Parameters:
  • names (tuple[str, ...])

  • workspace_root (Path | None)

Return type:

tuple['CoreFlow', ...]

discover_flows(*, workspace_root=None)[source]

Return all executable flow definitions for the requested workspace root.

Parameters:

workspace_root (Path | None)

Return type:

tuple['CoreFlow', ...]
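The loader seam follows the same injection pattern: `load_flow_func` is a constructor argument, and multi-flow loading can be expressed in terms of the single-flow loader. The sketch below is a hypothetical stand-in that assumes `load_flows` delegates name-by-name, which the documented signatures suggest but do not guarantee:

```python
from typing import Callable


class StubFlowLoader:
    """Stand-in for FlowExecutionService's loader seam (illustration only)."""

    def __init__(self, *, load_flow_func: Callable[..., str]) -> None:
        self._load = load_flow_func

    def load_flow(self, name, *, workspace_root=None):
        return self._load(name, workspace_root=workspace_root)

    def load_flows(self, names, *, workspace_root=None):
        # Assumed behavior: one single-flow load per requested name, order preserved.
        return tuple(self.load_flow(n, workspace_root=workspace_root) for n in names)


loader = StubFlowLoader(load_flow_func=lambda name, workspace_root=None: f"flow:{name}")
flows = loader.load_flows(("ingest", "publish"))
```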

Runtime control-ledger services.

LedgerService

alias of RuntimeControlLedgerService

class RuntimeControlLedgerService(open_ledger_func=None, *, runtime_layout_policy=None)[source]

Bases: object

Own workspace-local runtime control-ledger access and client-session bookkeeping.

Parameters:
  • open_ledger_func (Callable[[Path], RuntimeControlStore] | None)

  • runtime_layout_policy (RuntimeLayoutPolicy | None)

open_for_workspace(workspace_root)[source]

Open the configured runtime control ledger for one workspace root.

Parameters:

workspace_root (Path)

Return type:

RuntimeControlStore

close(ledger)[source]

Close one runtime control-ledger connection.

Parameters:

ledger (RuntimeControlStore)

Return type:

None

register_client_session(ledger, *, client_id, workspace_id, client_kind, pid)[source]

Register or refresh one active local client session.

Parameters:
  • ledger (RuntimeControlStore)

  • client_id (str)

  • workspace_id (str)

  • client_kind (str)

  • pid (int)

Return type:

None

remove_client_session(ledger, client_id)[source]

Remove one active local client session row.

Parameters:
  • ledger (RuntimeControlStore)

  • client_id (str)

Return type:

None

purge_process_client_sessions(ledger, *, workspace_id, client_kind, pid)[source]

Remove all client sessions for one workspace/client-kind/process tuple.

Parameters:
  • ledger (RuntimeControlStore)

  • workspace_id (str)

  • client_kind (str)

  • pid (int)

Return type:

None

count_live_client_sessions(ledger, workspace_id, *, exclude_client_id=None)[source]

Return the number of currently live client sessions for one workspace.

Parameters:
  • ledger (RuntimeControlStore)

  • workspace_id (str)

  • exclude_client_id (str | None)

Return type:

int
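The session-bookkeeping methods above suggest a simple keyed-row model: register-or-refresh by `client_id`, purge by the workspace/client-kind/process tuple, and count live rows with an optional exclusion. A dict-backed sketch of those semantics (`ToyLedger` is illustrative, not the real RuntimeControlStore):

```python
from dataclasses import dataclass


@dataclass
class Session:
    client_id: str
    workspace_id: str
    client_kind: str
    pid: int


class ToyLedger:
    """In-memory sketch of the client-session bookkeeping semantics."""

    def __init__(self) -> None:
        self._sessions: dict[str, Session] = {}

    def register_client_session(self, *, client_id, workspace_id, client_kind, pid):
        # Register-or-refresh: re-registering the same client_id overwrites the row.
        self._sessions[client_id] = Session(client_id, workspace_id, client_kind, pid)

    def remove_client_session(self, client_id):
        self._sessions.pop(client_id, None)

    def purge_process_client_sessions(self, *, workspace_id, client_kind, pid):
        # Drop every session matching the workspace/client-kind/process tuple.
        self._sessions = {
            cid: s for cid, s in self._sessions.items()
            if (s.workspace_id, s.client_kind, s.pid) != (workspace_id, client_kind, pid)
        }

    def count_live_client_sessions(self, workspace_id, *, exclude_client_id=None):
        return sum(
            1 for s in self._sessions.values()
            if s.workspace_id == workspace_id and s.client_id != exclude_client_id
        )


ledger = ToyLedger()
ledger.register_client_session(client_id="gui-1", workspace_id="ws", client_kind="gui", pid=100)
ledger.register_client_session(client_id="tui-1", workspace_id="ws", client_kind="tui", pid=200)
ledger.purge_process_client_sessions(workspace_id="ws", client_kind="gui", pid=100)
```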

Operator log history services.

class LogService[source]

Bases: object

Own operator log-store construction and log history queries.

create_store(runtime_cache_ledger=None)[source]

Create one log store hydrated from the given runtime cache store.

Parameters:

runtime_cache_ledger (RuntimeCacheStore | None)

Return type:

FlowLogStore

reload(store, runtime_cache_ledger)[source]

Reload one log store from an explicit runtime cache store.

Parameters:
  • store (FlowLogStore)

  • runtime_cache_ledger (RuntimeCacheStore | None)

Return type:

None

append_entry(store, entry)[source]

Append one log entry to the current store.

Parameters:
  • store (FlowLogStore)

  • entry (FlowLogEntry)

Return type:

None

clear_flow(store, flow_name)[source]

Clear one flow’s visible log history from the current store.

Parameters:
  • store (FlowLogStore)

  • flow_name (str | None)

Return type:

None

all_entries(store)[source]

Return every entry currently held in the store.

Parameters:

store (FlowLogStore)

Return type:

tuple[FlowLogEntry, …]

entries_for_flow(store, flow_name)[source]

Return flow-scoped entries for one selected flow.

Parameters:
  • store (FlowLogStore)

  • flow_name (str | None)

Return type:

tuple[FlowLogEntry, …]

runs_for_flow(store, flow_name)[source]

Return grouped run history for one selected flow.

Parameters:
  • store (FlowLogStore)

  • flow_name (str | None)

Return type:

tuple[FlowRunState, …]
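The three query methods differ mainly in shape: all entries, a flow-scoped filter, and that same filter grouped into runs. A sketch of the filtering and grouping semantics, using a hypothetical entry shape rather than the real FlowLogEntry:

```python
from dataclasses import dataclass


@dataclass
class ToyLogEntry:
    flow_name: str
    run_id: str
    message: str


def entries_for_flow(entries, flow_name):
    """Flow-scoped filter over a flat entry list (sketch)."""
    return tuple(e for e in entries if e.flow_name == flow_name)


def runs_for_flow(entries, flow_name):
    """Group one flow's entries by run_id, preserving first-seen run order (sketch)."""
    runs: dict[str, list[ToyLogEntry]] = {}
    for e in entries_for_flow(entries, flow_name):
        runs.setdefault(e.run_id, []).append(e)
    return tuple((run_id, tuple(es)) for run_id, es in runs.items())


log = [
    ToyLogEntry("ingest", "r1", "started"),
    ToyLogEntry("ingest", "r1", "finished"),
    ToyLogEntry("ingest", "r2", "started"),
    ToyLogEntry("publish", "r9", "started"),
]
runs = runs_for_flow(log, "ingest")
```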

Workspace runtime binding services for operator surfaces.

class WorkspaceRuntimeBinding(workspace_paths, runtime_cache_ledger, runtime_control_ledger, log_store, daemon_manager)[source]

Bases: object

Concrete runtime resources bound to one selected workspace.

Parameters:
  • workspace_paths (WorkspacePaths)

  • runtime_cache_ledger (RuntimeCacheStore)

  • runtime_control_ledger (RuntimeControlStore)

  • log_store (FlowLogStore)

  • daemon_manager (WorkspaceDaemonManager)

workspace_paths: WorkspacePaths
runtime_cache_ledger: RuntimeCacheStore
runtime_control_ledger: RuntimeControlStore
log_store: FlowLogStore
daemon_manager: WorkspaceDaemonManager
class WorkspaceRuntimeBindingService(*, ledger_service, log_service, daemon_state_service, runtime_history_service)[source]

Bases: object

Own concrete runtime binding lifecycle for GUI/TUI surfaces.

Parameters:
  • ledger_service (RuntimeControlLedgerService)

  • log_service (LogService)

  • daemon_state_service (DaemonStateService)

  • runtime_history_service (RuntimeHistoryService)

open_binding(workspace_paths)[source]

Open one concrete runtime binding for a workspace selection.

Parameters:

workspace_paths (WorkspacePaths)

Return type:

WorkspaceRuntimeBinding

close_binding(binding)[source]

Close one concrete runtime binding.

Parameters:

binding (WorkspaceRuntimeBinding)

Return type:

None

register_client_session(binding, *, client_id, client_kind, pid=None)[source]

Register or refresh one local client session for the binding workspace.

Parameters:
  • binding (WorkspaceRuntimeBinding)

  • client_id (str)

  • client_kind (str)

  • pid (int | None)

Return type:

None

remove_client_session(binding, client_id)[source]

Remove one active local client session row.

Parameters:
  • binding (WorkspaceRuntimeBinding)

  • client_id (str)

Return type:

None

purge_process_client_sessions(binding, *, client_kind, pid=None)[source]

Remove all client sessions for this workspace/client-kind/process tuple.

Parameters:
  • binding (WorkspaceRuntimeBinding)

  • client_kind (str)

  • pid (int | None)

Return type:

None

count_live_client_sessions(binding, *, exclude_client_id=None)[source]

Return the number of live local client sessions for the binding workspace.

Parameters:
  • binding (WorkspaceRuntimeBinding)

  • exclude_client_id (str | None)

Return type:

int

sync_runtime_state(binding, *, runtime_application, flow_cards, daemon_startup_in_progress=False)[source]

Return daemon/runtime sync state for one bound workspace.

Parameters:
Return type:

object

reload_logs(binding)[source]

Reload the binding log store from its runtime cache store.

Parameters:

binding (WorkspaceRuntimeBinding)

Return type:

None

invalidate_flow_history(binding, *, flow_name)[source]

Drop one flow’s cached logs and derived step-output state after destructive resets.

Parameters:
  • binding (WorkspaceRuntimeBinding)

  • flow_name (str)

Return type:

None

rebuild_step_outputs(binding, flow_cards)[source]

Rebuild latest successful per-step output paths for visible flows.

Parameters:
  • binding (WorkspaceRuntimeBinding)

  • flow_cards (dict[str, FlowCatalogLike])

Return type:

StepOutputIndex

error_text_for_entry(binding, run_group, entry)[source]

Return one user-facing error title and persisted error text.

Parameters:
  • binding (WorkspaceRuntimeBinding)

  • run_group (FlowRunState)

  • entry (FlowLogEntry)

Return type:

tuple[str, str | None]

recent_run_count(binding, *, days)[source]

Return the number of persisted runs started in the recent window.

Parameters:
  • binding (WorkspaceRuntimeBinding)

  • days (int)

Return type:

int
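A host typically brackets a binding's lifetime: open it, register the client session, then guarantee session removal and close on exit. One way to express that ordering as a context manager (a usage sketch; `RecordingService` is a test double, and the real service may impose additional requirements):

```python
from contextlib import contextmanager


@contextmanager
def bound_workspace(service, workspace_paths, *, client_id, client_kind):
    # Open the binding, register this client, and guarantee teardown on exit.
    binding = service.open_binding(workspace_paths)
    try:
        service.register_client_session(binding, client_id=client_id, client_kind=client_kind)
        yield binding
    finally:
        service.remove_client_session(binding, client_id)
        service.close_binding(binding)


class RecordingService:
    """Test double that records the call order."""

    def __init__(self):
        self.calls = []

    def open_binding(self, paths):
        self.calls.append("open")
        return "binding"

    def register_client_session(self, binding, *, client_id, client_kind, pid=None):
        self.calls.append("register")

    def remove_client_session(self, binding, client_id):
        self.calls.append("remove")

    def close_binding(self, binding):
        self.calls.append("close")


svc = RecordingService()
with bound_workspace(svc, object(), client_id="tui-1", client_kind="tui"):
    pass
```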

Runtime execution services for flow runs and grouped engine runs.

class RuntimeExecutionService(*, flow_runtime_type=<class 'data_engine.runtime.execution.single.FlowRuntime'>, grouped_runtime_type=<class 'data_engine.runtime.execution.grouped.GroupedFlowRuntime'>, runtime_engine_type=<class 'data_engine.runtime.engine.RuntimeEngine'>, scheduler_host_factory=<class 'data_engine.hosts.scheduler.SchedulerHost'>, run_stop_controller=None)[source]

Bases: object

Own executable runtime construction for manual and grouped runs.

Parameters:
run_once(flow, *, runtime_ledger=None, runtime_stop_event=None, flow_stop_event=None)[source]

Run one flow as a one-shot execution.

Parameters:
  • flow (CoreFlow)

  • runtime_ledger (RuntimeCacheStore | None)

  • runtime_stop_event (Event | None)

  • flow_stop_event (Event | None)

Return type:

object

run_source(flow, source_path, *, runtime_ledger=None, runtime_stop_event=None, flow_stop_event=None)[source]

Run one flow for a specific source path.

Parameters:
  • flow (CoreFlow)

  • source_path (str)

  • runtime_ledger (RuntimeCacheStore | None)

  • runtime_stop_event (Event | None)

  • flow_stop_event (Event | None)

Return type:

object

run_batch(flow, *, runtime_ledger=None, runtime_stop_event=None, flow_stop_event=None)[source]

Run one flow once in batch mode.

Parameters:
  • flow (CoreFlow)

  • runtime_ledger (RuntimeCacheStore | None)

  • runtime_stop_event (Event | None)

  • flow_stop_event (Event | None)

Return type:

object

preview(flow, *, use=None, runtime_ledger=None)[source]

Preview one flow through the one-shot runtime path.

Parameters:
  • flow (CoreFlow)

  • use (str | None)

  • runtime_ledger (RuntimeCacheStore | None)

Return type:

object

run_manual(flow, *, runtime_ledger, runtime_stop_event, flow_stop_event=None)[source]

Run one flow as a manual one-shot execution.

Parameters:
  • flow (CoreFlow)

  • runtime_ledger (RuntimeCacheStore)

  • runtime_stop_event (Event)

  • flow_stop_event (Event | None)

Return type:

object

run_continuous(flow, *, runtime_ledger=None, flow_stop_event=None)[source]

Run one flow continuously.

Parameters:
  • flow (CoreFlow)

  • runtime_ledger (RuntimeCacheStore | None)

  • flow_stop_event (Event | None)

Return type:

object

run_grouped(flows, *, runtime_ledger, runtime_stop_event, flow_stop_event)[source]

Run grouped automated flows continuously.

Parameters:
  • flows (tuple['CoreFlow', ...])

  • runtime_ledger (RuntimeCacheStore)

  • runtime_stop_event (Event)

  • flow_stop_event (Event)

Return type:

object

run_automated(flows, *, runtime_ledger=None, runtime_stop_event, flow_stop_event)[source]

Run automated poll and schedule flows through separate host timing surfaces.

Parameters:
  • flows (tuple['CoreFlow', ...])

  • runtime_ledger (RuntimeCacheStore | None)

  • runtime_stop_event (Event)

  • flow_stop_event (Event)

Return type:

object

run_grouped_continuous(flows, *, runtime_ledger=None, runtime_stop_event=None, flow_stop_event=None)[source]

Run grouped automated flows continuously with optional runtime controls.

Parameters:
  • flows (tuple['CoreFlow', ...])

  • runtime_ledger (RuntimeCacheStore | None)

  • runtime_stop_event (Event | None)

  • flow_stop_event (Event | None)

Return type:

object

stop(run_id, *, flow_stop_event=None)[source]

Request that an active runtime stop a run by id.

Parameters:
  • run_id (str)

  • flow_stop_event (Event | None)

Return type:

None
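Several run methods above accept two events: a runtime_stop_event (halt the whole runtime) and a flow_stop_event (halt just one flow's work). A toy loop showing how such cooperative checks might look; this is illustrative only, not the real runtime:

```python
import threading


def toy_run_loop(runtime_stop_event: threading.Event,
                 flow_stop_event: threading.Event,
                 work_items: list[str]) -> list[str]:
    """Process items until either stop surface is signaled (sketch)."""
    done = []
    for item in work_items:
        # Either event halts this flow; the runtime event would halt all flows.
        if runtime_stop_event.is_set() or flow_stop_event.is_set():
            break
        done.append(item)
        if item == "b":
            flow_stop_event.set()  # simulate a per-flow stop request mid-run
    return done


runtime_stop = threading.Event()
flow_stop = threading.Event()
processed = toy_run_loop(runtime_stop, flow_stop, ["a", "b", "c"])
```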

Runtime history query services.

class RuntimeHistoryService[source]

Bases: object

Own persisted run/step history queries used by operator surfaces.

rebuild_step_outputs(ledger, flow_cards)[source]

Rebuild latest successful per-step output paths for visible flows.

Parameters:
  • ledger (RuntimeCacheStore)

  • flow_cards (dict[str, FlowCatalogLike])

Return type:

StepOutputRefresh

refresh_step_outputs(ledger, flow_cards, *, current_index, last_seen_step_run_id)[source]

Incrementally merge newly finished successful step outputs into the current index.

Parameters:
  • ledger (RuntimeCacheStore)

  • flow_cards (dict[str, FlowCatalogLike])

  • current_index (StepOutputIndex)

  • last_seen_step_run_id (int | None)

Return type:

StepOutputRefresh

error_text_for_entry(ledger, run_group, entry)[source]

Return one user-facing error title and persisted error text for a failed entry.

Parameters:
  • ledger (RuntimeCacheStore)

  • run_group (FlowRunState)

  • entry (FlowLogEntry)

Return type:

tuple[str, str | None]

class StepOutputRefresh(last_step_run_id, index)[source]

Bases: object

One step-output refresh result with cache watermark.

Parameters:
  • last_step_run_id (int | None)

  • index (StepOutputIndex)

last_step_run_id: int | None
index: StepOutputIndex
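StepOutputRefresh pairs an index with a `last_step_run_id` watermark so that `refresh_step_outputs` can skip rows already merged on an earlier pass. A sketch of that incremental-merge pattern, using a hypothetical row shape and a flow-keyed index:

```python
from dataclasses import dataclass


@dataclass
class ToyStepRow:
    step_run_id: int
    flow: str
    output_path: str


def refresh_step_outputs(rows, *, current_index, last_seen_step_run_id):
    """Merge only rows newer than the watermark; return (new_watermark, new_index)."""
    index = dict(current_index)
    watermark = last_seen_step_run_id if last_seen_step_run_id is not None else 0
    for row in sorted(rows, key=lambda r: r.step_run_id):
        if row.step_run_id <= watermark:
            continue  # already merged during an earlier refresh
        index[row.flow] = row.output_path  # latest successful output wins
        watermark = row.step_run_id
    return watermark, index


rows = [ToyStepRow(1, "f", "p1"), ToyStepRow(2, "f", "p2"), ToyStepRow(3, "g", "q")]
watermark, index = refresh_step_outputs(rows, current_index={}, last_seen_step_run_id=None)
```

On the next poll, passing the returned watermark back in makes the refresh a no-op until newer step runs appear.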

Machine-local settings services.

class SettingsService(store)[source]

Bases: object

Own machine-local settings persistence for operator surfaces.

Parameters:

store (LocalSettingsStore)

classmethod default_store(*, app_root=None)[source]

Open the default local settings store for one app root.

Parameters:

app_root (Path | None)

Return type:

LocalSettingsStore

classmethod open_default(*, app_root=None, store_factory=None)[source]

Open the default local settings store for the current app root.

Parameters:
  • app_root (Path | None)

  • store_factory (Callable[[Path | None], LocalSettingsStore] | None)

Return type:

SettingsService

workspace_collection_root()[source]

Return the saved local workspace collection root override, when present.

Return type:

Path | None

set_workspace_collection_root(value)[source]

Persist the local workspace collection root override.

Parameters:

value (Path | str | None)

Return type:

None

default_workspace_id()[source]

Return the saved default workspace id, when present.

Return type:

str | None

set_default_workspace_id(value)[source]

Persist the default workspace id.

Parameters:

value (str | None)

Return type:

None

runtime_root()[source]

Return the saved runtime/artifact root override, when present.

Return type:

Path | None

set_runtime_root(value)[source]

Persist the runtime/artifact root override.

Parameters:

value (Path | str | None)

Return type:

None
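The setters above accept `Path | str | None` while the getters return `Path | None`, which implies normalization on write and a clear-on-None convention. A dict-backed sketch of that round trip (`ToySettingsStore` stands in for the real LocalSettingsStore):

```python
from pathlib import Path


class ToySettingsStore:
    """Dict-backed stand-in for LocalSettingsStore (illustration only)."""

    def __init__(self):
        self._data = {}

    def get(self, key):
        return self._data.get(key)

    def set(self, key, value):
        if value is None:
            self._data.pop(key, None)  # None clears the override
        else:
            self._data[key] = value


def set_workspace_collection_root(store, value):
    # Normalize Path | str to a stored string; None removes the override.
    store.set("workspace_collection_root", None if value is None else str(Path(value)))


def workspace_collection_root(store):
    raw = store.get("workspace_collection_root")
    return Path(raw) if raw is not None else None


store = ToySettingsStore()
set_workspace_collection_root(store, "~/workspaces")
root = workspace_collection_root(store)
```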

Shared workspace lease metadata and runtime snapshot services.

class SharedStateService[source]

Bases: object

Own lease-based shared snapshot hydration for operator surfaces.

hydrate_local_runtime(paths, ledger)[source]

Replace one local runtime ledger from the shared workspace snapshots.

Parameters:
  • paths (WorkspacePaths)

  • ledger (RuntimeSnapshotStore)

Return type:

None

read_lease_metadata(paths)[source]

Return current workspace lease metadata, if present.

Parameters:

paths (WorkspacePaths)

Return type:

dict[str, Any] | None

lease_is_stale(paths, *, stale_after_seconds)[source]

Return whether current workspace lease metadata is stale.

Parameters:
  • paths (WorkspacePaths)

  • stale_after_seconds (float)

Return type:

bool

reset_flow_state(paths, *, flow_name)[source]

Delete one flow’s shared snapshot history and freshness state.

Parameters:
  • paths (WorkspacePaths)

  • flow_name (str)

Return type:

None

reset_workspace_state(paths)[source]

Delete all shared coordination and snapshot state for one workspace.

Parameters:

paths (WorkspacePaths)

Return type:

None
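lease_is_stale presumably compares a heartbeat timestamp in the lease metadata against `stale_after_seconds`. The sketch below assumes an `updated_at` epoch field, which is a guess about the metadata schema, not the documented shape:

```python
import time


def lease_is_stale(metadata, *, stale_after_seconds, now=None):
    """Sketch: stale when the heartbeat is older than the allowed window."""
    if metadata is None:
        return True  # no lease metadata at all counts as stale here
    now = time.time() if now is None else now
    return (now - metadata["updated_at"]) > stale_after_seconds


fresh = {"updated_at": 1_000.0}
```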

Shared theme resolution services.

class ThemeService(*, themes={'dark': ThemePalette(name='dark', window_bg='#0d1117', app_bg='#0d1117', panel_bg='#161b22', panel_border='#30363d', text='#c9d1d9', muted_text='#8b949e', section_text='#7d8590', accent_text='#2ea043', warning_text='#d29922', error_text='#f85149', button_bg='#21262d', button_hover='#30363d', button_checked_bg='#1f6feb', button_checked_border='#388bfd', button_disabled_bg='#161b22', button_disabled_border='#30363d', button_disabled_text='#6e7681', input_bg='#0d1117', input_border='#30363d', hover_bg='#1b2230', hover_border='#3b4556', selection_bg='#1f6feb', selection_text='#f0f6fc', selection_border='#388bfd', tab_bg='#161b22', tab_hover_bg='#1b2230', tab_selected_bg='#21262d', progress_bg='#0d1117', progress_chunk='#2ea043', summary_bg='#21262d', summary_border='#30363d', request_control_bg='#F04A00', request_control_border='#c23c00', request_control_hover='#d84300', engine_start_bg='#1f883d', engine_start_border='#1a7f37', engine_start_hover='#1a7f37', engine_stop_bg='#cf222e', engine_stop_border='#a40e26', engine_stop_hover='#a40e26'), 'light': ThemePalette(name='light', window_bg='#ffffff', app_bg='#f6f8fa', panel_bg='#ffffff', panel_border='#d0d7de', text='#1f2328', muted_text='#656d76', section_text='#57606a', accent_text='#1a7f37', warning_text='#9a6700', error_text='#cf222e', button_bg='#f6f8fa', button_hover='#eef2f6', button_checked_bg='#ddf4ff', button_checked_border='#54aeff', button_disabled_bg='#f6f8fa', button_disabled_border='#d8dee4', button_disabled_text='#8c959f', input_bg='#ffffff', input_border='#d0d7de', hover_bg='#f6f8fa', hover_border='#c7d2dd', selection_bg='#0969da', selection_text='#ffffff', selection_border='#54aeff', tab_bg='#f6f8fa', tab_hover_bg='#eef2f6', tab_selected_bg='#ffffff', progress_bg='#eef2f6', progress_chunk='#1a7f37', summary_bg='#f6f8fa', summary_border='#d0d7de', request_control_bg='#F04A00', request_control_border='#c23c00', request_control_hover='#d84300', engine_start_bg='#1f883d', engine_start_border='#1a7f37', engine_start_hover='#1a7f37', engine_stop_bg='#cf222e', engine_stop_border='#a40e26', engine_stop_hover='#a40e26')}, default_theme_name='system', resolve_theme_name_func=<function resolve_theme_name>, system_theme_name_func=<function system_theme_name>, toggle_theme_name_func=<function toggle_theme_name>, theme_button_text_func=<function theme_button_text>)[source]

Bases: object

Thin injectable wrapper around shared theme state decisions.

Parameters:
  • themes (Mapping[str, ThemePalette])

  • default_theme_name (str)

  • resolve_theme_name_func (Callable[[str], str])

  • system_theme_name_func (Callable[[], str])

  • toggle_theme_name_func (Callable[[str], str])

  • theme_button_text_func (Callable[[str], str])

resolve_name(theme_name='system')[source]

Resolve one explicit or system-bound theme name.

Parameters:

theme_name (str)

Return type:

str

system_name()[source]

Return the host-system theme name.

Return type:

str

toggle_name(theme_name)[source]

Return the opposite theme name.

Parameters:

theme_name (str)

Return type:

str

button_text(theme_name)[source]

Return the user-facing theme toggle text.

Parameters:

theme_name (str)

Return type:

str

palette(theme_name='system')[source]

Return the resolved semantic palette.

Parameters:

theme_name (str)

Return type:

ThemePalette
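All theme decisions are injected functions, so the core logic reduces to: 'system' defers to a host probe, any explicit name passes through, and toggling flips dark/light. A sketch with hypothetical stand-ins for the injected functions:

```python
def system_theme_name():
    """Hypothetical host probe; real detection is injected via system_theme_name_func."""
    return "dark"


def toggle_theme_name(name):
    # The "opposite theme name": dark <-> light.
    return "light" if name == "dark" else "dark"


def resolve_theme_name(name, *, system=system_theme_name):
    # "system" defers to the host; explicit names pass through unchanged.
    return system() if name == "system" else name


resolved = resolve_theme_name("system")
```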

Workspace provisioning helpers shared by CLI and GUI surfaces.

class WorkspaceProvisioningResult(workspace_root, created_paths, preserved_paths)[source]

Bases: object

Describe which workspace assets were created during provisioning.

Parameters:
  • workspace_root (Path)

  • created_paths (tuple[Path, ...])

  • preserved_paths (tuple[Path, ...])

workspace_root: Path
created_paths: tuple[Path, ...]
preserved_paths: tuple[Path, ...]
property created_anything: bool

Return whether provisioning created any new files or directories.

class WorkspaceProvisioningService[source]

Bases: object

Own safe workspace-folder provisioning for operator surfaces.

provision_workspace(workspace_paths, *, interpreter_path=None)[source]

Provision missing authored-workspace folders without overwriting existing content.

Parameters:
  • workspace_paths (WorkspacePaths)

  • interpreter_path (Path | None)

Return type:

WorkspaceProvisioningResult

collection_vscode_settings(collection_root, *, app_root, interpreter_path=None)[source]

Return VS Code settings for one workspace collection root.

Parameters:
  • collection_root (Path)

  • app_root (Path)

  • interpreter_path (Path | None)

Return type:

dict[str, object]

checkout_source_dir(app_root)[source]

Return the repo-local source directory when app_root points at a checkout.

Parameters:

app_root (Path)

Return type:

Path | None

checkout_tests_dir(app_root)[source]

Return the repo-local tests directory when app_root points at a checkout.

Parameters:

app_root (Path)

Return type:

Path | None

write_collection_vscode_settings(collection_root, *, app_root, interpreter_path=None, overwrite=False)[source]

Write collection-root VS Code settings unless an existing file should be preserved.

Parameters:
  • collection_root (Path)

  • app_root (Path)

  • interpreter_path (Path | None)

  • overwrite (bool)

Return type:

Path | None

workspace_vscode_settings(workspace_root, *, app_root, interpreter_path=None)[source]

Return VS Code settings for one workspace root.

Parameters:
  • workspace_root (Path)

  • app_root (Path)

  • interpreter_path (Path | None)

Return type:

dict[str, object]

write_workspace_vscode_settings(workspace_root, *, app_root, interpreter_path=None, overwrite=False)[source]

Write workspace-local VS Code settings unless an existing file should be preserved.

Parameters:
  • workspace_root (Path)

  • app_root (Path)

  • interpreter_path (Path | None)

  • overwrite (bool)

Return type:

Path | None
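The provisioning contract above is create-if-missing, never overwrite, with created and preserved paths both reported (compare WorkspaceProvisioningResult). A sketch of that contract against a temporary directory; the folder names here are hypothetical, not the package's actual layout:

```python
import tempfile
from pathlib import Path


def provision(workspace_root: Path, folders: tuple[str, ...]):
    """Create missing folders without touching existing ones; report both sets."""
    created, preserved = [], []
    for name in folders:
        path = workspace_root / name
        if path.exists():
            preserved.append(path)  # existing content is never overwritten
        else:
            path.mkdir(parents=True)
            created.append(path)
    return tuple(created), tuple(preserved)


root = Path(tempfile.mkdtemp())
(root / "flows").mkdir()  # pre-existing asset
created, preserved = provision(root, ("flows", "data"))
```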

Workspace path and discovery services.

class WorkspaceService(*, app_state_policy=None, discovery_policy=None, runtime_layout_policy=None, discover_workspaces_func=None, resolve_workspace_paths_func=None)[source]

Bases: object

Own workspace discovery and path resolution through explicit collaborators.

Parameters:
  • app_state_policy (AppStatePolicy | None)

  • discovery_policy (WorkspaceDiscoveryPolicy | None)

  • runtime_layout_policy (RuntimeLayoutPolicy | None)

  • discover_workspaces_func (Callable[..., tuple[DiscoveredWorkspace, ...]] | None)

  • resolve_workspace_paths_func (Callable[..., WorkspacePaths] | None)

discover(*, app_root=None, workspace_collection_root=None)[source]

Return discoverable workspaces for the current app and collection roots.

Parameters:
  • app_root (Path | None)

  • workspace_collection_root (Path | None)

Return type:

tuple[DiscoveredWorkspace, …]

resolve_paths(*, workspace_id=None, workspace_root=None, data_root=None, workspace_collection_root=None)[source]

Resolve one workspace path set with the current override-aware rules.

Parameters:
  • workspace_id (str | None)

  • workspace_root (Path | None)

  • data_root (Path | None)

  • workspace_collection_root (Path | None)

Return type:

WorkspacePaths
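resolve_paths applies "override-aware rules"; one plausible precedence is that an explicit workspace_root wins outright, while a workspace_id is resolved under the collection root. The sketch below encodes that guess and should not be read as the service's actual rules:

```python
from pathlib import Path


def resolve_workspace_root(*, workspace_id=None, workspace_root=None,
                           workspace_collection_root=None):
    """Guessed precedence: explicit root > id under collection root (sketch only)."""
    if workspace_root is not None:
        return Path(workspace_root)
    if workspace_id is not None and workspace_collection_root is not None:
        return Path(workspace_collection_root) / workspace_id
    raise ValueError("need workspace_root, or workspace_id with a collection root")


explicit = resolve_workspace_root(workspace_root="/tmp/ws-a")
```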