# Source code for data_engine.core.primitives

"""Core flow specs, contexts, and small containers."""

from __future__ import annotations

import copy
from dataclasses import dataclass, field
from datetime import datetime, timezone
from pathlib import Path
import tomllib
from typing import Callable, Generic, Iterator, TypeVar

from data_engine.core.helpers import _normalize_extensions, _resolve_flow_path
from data_engine.core.model import FlowValidationError
from data_engine.platform.workspace_models import WORKSPACE_CONFIG_DIR_NAME, WORKSPACE_DATABASES_DIR_NAME

# Type variable for the element type held by ``Batch``.
T = TypeVar("T")


[docs] @dataclass(frozen=True) class WatchSpec: """Normalized runtime watch configuration.""" mode: str run_as: str max_parallel: int = 1 source: Path | None = None interval: str | None = None interval_seconds: float | None = None time: str | tuple[str, ...] | None = None times: tuple[str, ...] = () time_slots: tuple[tuple[int, int], ...] = () extensions: tuple[str, ...] | None = None settle: int = 1
@dataclass(frozen=True)
class MirrorSpec:
    """Immutable mirror binding declared once at the flow level.

    Holds only the mirror root; it is stored exactly as given (no resolving).
    """

    # Root directory under which mirrored outputs are placed.
    root: Path
[docs] @dataclass(frozen=True) class StepSpec: """One generic callable step in a flow.""" fn: Callable[..., object] use: str | None save_as: str | None label: str function_name: str
@dataclass(frozen=True)
class SourceMetadata:
    """Immutable snapshot of filesystem facts about the active source file."""

    # Path of the source file and its final component.
    path: Path
    name: str
    # Size in bytes and last-modification timestamp (UTC).
    size_bytes: int
    modified_at_utc: datetime
[docs] @dataclass class WorkspaceConfigContext: """Lazy read-only access to workspace-local TOML config files. ``context.config`` reads files from ``<workspace>/config/*.toml`` on demand. It returns dictionaries so flows can keep environment-specific settings out of Python modules without introducing a larger configuration framework. Attributes ---------- workspace_root : Path | None Authored workspace root. When omitted, config lookup is unavailable and returns no names. Examples -------- .. code-block:: python from data_engine.core.primitives import WorkspaceConfigContext config = WorkspaceConfigContext() assert config.names() == () """ workspace_root: Path | None = None _cache: dict[str, dict[str, object]] = field(default_factory=dict) _names: tuple[str, ...] | None = None @property def config_dir(self) -> Path | None: """Return the conventional config directory for the authored workspace.""" if self.workspace_root is None: return None return self.workspace_root / WORKSPACE_CONFIG_DIR_NAME
[docs] def names(self) -> tuple[str, ...]: """Return available config file stems beneath config/.""" if self._names is not None: return self._names config_dir = self.config_dir if config_dir is None or not config_dir.is_dir(): self._names = () return self._names self._names = tuple( path.stem for path in sorted(config_dir.glob("*.toml")) if path.is_file() and not path.name.startswith(".") ) return self._names
[docs] def get(self, name: str) -> dict[str, object] | None: """Return one parsed config mapping when available.""" normalized_name = str(name).strip() if not normalized_name: raise FlowValidationError("config.get() name must be non-empty.") if normalized_name in self._cache: return dict(self._cache[normalized_name]) config_dir = self.config_dir if config_dir is None: return None config_path = config_dir / f"{normalized_name}.toml" if not config_path.is_file(): return None try: with config_path.open("rb") as handle: parsed = tomllib.load(handle) except tomllib.TOMLDecodeError as exc: raise FlowValidationError(f"Config file {config_path} is not valid TOML: {exc}") from exc self._cache[normalized_name] = parsed return dict(parsed)
[docs] def require(self, name: str) -> dict[str, object]: """Return one parsed config mapping or fail loudly when missing.""" parsed = self.get(name) if parsed is not None: return parsed config_dir = self.config_dir if config_dir is None: raise FlowValidationError("config.require() is only available for authored workspace flows.") raise FlowValidationError(f"Required config file was not found: {config_dir / f'{str(name).strip()}.toml'}")
[docs] def all(self) -> dict[str, dict[str, object]]: """Return all parsed config mappings keyed by file stem.""" return {name: self.require(name) for name in self.names()}
[docs] @dataclass(frozen=True) class MirrorContext: """Write-ready mirrored output namespace for one runtime source. ``context.mirror`` is available when a flow was configured with ``Flow.mirror(root=...)``. The helpers return paths and create parent directories as needed, but they do not write file contents. """ root: Path source_path: Path | None = None relative_path: Path | None = None def __post_init__(self) -> None: object.__setattr__(self, "root", Path(self.root).resolve()) if self.source_path is not None: object.__setattr__(self, "source_path", Path(self.source_path).resolve()) if self.relative_path is not None: object.__setattr__(self, "relative_path", Path(self.relative_path)) def _prepare(self, path: Path) -> Path: resolved = path.resolve() resolved.parent.mkdir(parents=True, exist_ok=True) return resolved @property def dir(self) -> Path: """Return a write-ready namespace directory for derived files.""" if self.source_path is None or self.relative_path is None: self.root.mkdir(parents=True, exist_ok=True) return self.root directory = self.root / self.relative_path.with_suffix("") directory.mkdir(parents=True, exist_ok=True) return directory.resolve() @property def folder(self) -> Path: """Return the mirrored parent folder for the current source file.""" if self.relative_path is None: self.root.mkdir(parents=True, exist_ok=True) return self.root directory = self.root / self.relative_path.parent directory.mkdir(parents=True, exist_ok=True) return directory.resolve()
[docs] def with_suffix(self, suffix: str) -> Path: """Return the canonical mirrored source path with a replaced suffix.""" if self.source_path is None or self.relative_path is None: raise FlowValidationError("mirror.with_suffix() requires a concrete source file.") normalized_suffix = _normalize_extensions((suffix,))[0] return self._prepare((self.root / self.relative_path).with_suffix(normalized_suffix))
[docs] def with_extension(self, suffix: str) -> Path: """Return the canonical mirrored source path with a replaced extension.""" return self.with_suffix(suffix)
[docs] def file(self, name: str | Path) -> Path: """Return a write-ready file path in the mirrored source folder.""" candidate = Path(name) if candidate.is_absolute(): raise FlowValidationError("mirror.file() name must be relative.") if not str(candidate).strip(): raise FlowValidationError("mirror.file() name must be non-empty.") return self._prepare(self.folder / candidate)
[docs] def namespaced_file(self, name: str | Path) -> Path: """Return a write-ready derived file path inside the mirrored source namespace.""" candidate = Path(name) if candidate.is_absolute(): raise FlowValidationError("mirror.namespaced_file() name must be relative.") if not str(candidate).strip(): raise FlowValidationError("mirror.namespaced_file() name must be non-empty.") return self._prepare(self.dir / candidate)
[docs] def root_file(self, name: str | Path) -> Path: """Return a write-ready file path directly beneath the mirror root.""" candidate = Path(name) if candidate.is_absolute(): raise FlowValidationError("mirror.root_file() name must be relative.") if not str(candidate).strip(): raise FlowValidationError("mirror.root_file() name must be non-empty.") return self._prepare(self.root / candidate)
[docs] @dataclass(frozen=True) class SourceContext: """Resolved source namespace for one runtime source. ``context.source`` points at the watched source root and, for individual file runs, the concrete source file. Its helpers are read-oriented path conveniences; unlike ``MirrorContext`` they do not create directories. """ root: Path path: Path | None = None relative_path: Path | None = None def __post_init__(self) -> None: object.__setattr__(self, "root", Path(self.root).resolve()) if self.path is not None: object.__setattr__(self, "path", Path(self.path).resolve()) if self.relative_path is not None: object.__setattr__(self, "relative_path", Path(self.relative_path)) @property def dir(self) -> Path: """Return the namespace directory for files derived from the active source.""" if self.path is None or self.relative_path is None: return self.root return (self.root / self.relative_path.with_suffix("")).resolve() @property def folder(self) -> Path: """Return the parent folder for the active source file.""" if self.relative_path is None: return self.root return (self.root / self.relative_path.parent).resolve()
[docs] def with_suffix(self, suffix: str) -> Path: """Return the source path with a replaced suffix.""" if self.path is None or self.relative_path is None: raise FlowValidationError("source.with_suffix() requires a concrete source file.") normalized_suffix = _normalize_extensions((suffix,))[0] return (self.root / self.relative_path).with_suffix(normalized_suffix).resolve()
[docs] def with_extension(self, suffix: str) -> Path: """Return the source path with a replaced extension.""" return self.with_suffix(suffix)
[docs] def file(self, name: str | Path) -> Path: """Return a derived file path in the active source folder.""" candidate = Path(name) if candidate.is_absolute(): raise FlowValidationError("source.file() name must be relative.") if not str(candidate).strip(): raise FlowValidationError("source.file() name must be non-empty.") return (self.folder / candidate).resolve()
[docs] def namespaced_file(self, name: str | Path) -> Path: """Return a derived file path inside the active source namespace.""" candidate = Path(name) if candidate.is_absolute(): raise FlowValidationError("source.namespaced_file() name must be relative.") if not str(candidate).strip(): raise FlowValidationError("source.namespaced_file() name must be non-empty.") if self.path is None or self.relative_path is None: raise FlowValidationError("source.namespaced_file() requires a concrete source file.") return (self.dir / candidate).resolve()
[docs] def root_file(self, name: str | Path) -> Path: """Return a file path directly beneath the source root.""" candidate = Path(name) if candidate.is_absolute(): raise FlowValidationError("source.root_file() name must be relative.") if not str(candidate).strip(): raise FlowValidationError("source.root_file() name must be non-empty.") return (self.root / candidate).resolve()
@dataclass
class FlowContext:
    """Mutable runtime state shared across steps during one flow execution.

    Steps receive a ``FlowContext`` object. ``current`` is the active value,
    ``objects`` stores named intermediate values created with ``save_as``,
    ``metadata`` holds runtime annotations, and ``source``/``mirror`` expose
    source and output path helpers when the flow configuration provides them.

    Attributes
    ----------
    flow_name : str
        Stable flow name for the current execution.
    group : str
        Flow group used by operator surfaces.
    source : SourceContext | None
        Source path helper for source-backed executions.
    mirror : MirrorContext | None
        Write-ready mirrored output helper when the flow configured a mirror.
    current : object | None
        Active value passed between steps.
    objects : dict[str, object]
        Named intermediate values saved by ``save_as``.
    metadata : dict[str, object]
        Runtime metadata attached to the execution.
    config : WorkspaceConfigContext
        Lazy workspace config reader.

    Examples
    --------
    .. code-block:: python

        from data_engine.core.primitives import FlowContext

        context = FlowContext(flow_name="claims", group="Claims", current=1)
        context.objects["raw"] = context.current
        assert context.current == 1
        assert context.objects["raw"] == 1
    """

    flow_name: str
    group: str
    source: SourceContext | None = None
    mirror: MirrorContext | None = None
    current: object | None = None
    objects: dict[str, object] = field(default_factory=dict)
    metadata: dict[str, object] = field(default_factory=dict)
    config: WorkspaceConfigContext = field(default_factory=WorkspaceConfigContext)

    def source_metadata(self) -> SourceMetadata | None:
        """Return filesystem metadata for the current source file when available.

        Returns ``None`` when there is no source or no concrete source path.
        Errors from ``Path.stat`` (e.g. a file that has since disappeared)
        propagate to the caller.
        """
        source_path = self.source.path if self.source is not None else None
        if source_path is None:
            return None
        stat = source_path.stat()
        return SourceMetadata(
            path=source_path,
            name=source_path.name,
            size_bytes=stat.st_size,
            modified_at_utc=datetime.fromtimestamp(stat.st_mtime, timezone.utc),
        )

    def database(self, name: str | Path) -> Path:
        """Return a write-ready path beneath the workspace databases directory.

        Use this for workspace-owned DuckDB files and other durable database
        artifacts. The returned path is rooted under ``<workspace>/databases/``
        and parent directories are created for you.

        Parameters
        ----------
        name : str | Path
            Relative database file name, such as ``"analytics.duckdb"`` or
            ``"claims/analytics.duckdb"``.

        Returns
        -------
        Path
            Absolute write-ready database path.

        Raises
        ------
        FlowValidationError
            If the flow is not running from an authored workspace, or if
            ``name`` is absolute or empty (including ``"."``).
        """
        if self.config.workspace_root is None:
            raise FlowValidationError("context.database() is only available for authored workspace flows.")
        candidate = Path(name)
        if candidate.is_absolute():
            raise FlowValidationError("context.database() name must be relative.")
        # Path("") normalizes to Path("."), whose string form is non-empty; check
        # the raw text and the parsed parts so an empty name cannot resolve to
        # the databases directory itself.
        if not str(name).strip() or not candidate.parts:
            raise FlowValidationError("context.database() name must be non-empty.")
        path = (self.config.workspace_root / WORKSPACE_DATABASES_DIR_NAME / candidate).resolve()
        path.parent.mkdir(parents=True, exist_ok=True)
        return path
@dataclass(frozen=True)
class FileRef:
    """Immutable handle to one filesystem path used by batch-oriented flows.

    Whatever is passed as ``path`` is coerced to ``Path`` and resolved to an
    absolute path at construction time; every accessor works on that form.
    """

    path: Path

    def __post_init__(self) -> None:
        absolute_path = Path(self.path).resolve()
        # The dataclass is frozen, so route the normalized value past its guard.
        object.__setattr__(self, "path", absolute_path)

    @property
    def name(self) -> str:
        """Final path component, extension included."""
        return self.path.name

    @property
    def stem(self) -> str:
        """Final path component with its last extension removed."""
        return self.path.stem

    @property
    def suffix(self) -> str:
        """Last extension of the final component (e.g. ``".csv"``)."""
        return self.path.suffix

    @property
    def parent(self) -> Path:
        """Directory that contains the path."""
        return self.path.parent

    def exists(self) -> bool:
        """Report whether the path is present on disk at call time."""
        return self.path.exists()

    def __str__(self) -> str:
        return str(self.path)

    def __fspath__(self) -> str:
        return str(self.path)
@dataclass(frozen=True)
class Batch(Generic[T]):
    """Small iterable runtime container used instead of exposing raw lists by default.

    ``items`` is always stored as a tuple. Any other iterable passed to the
    constructor (a list, a generator, ...) is converted on construction, so the
    frozen container is genuinely immutable and hashable when its items are.
    """

    items: tuple[T, ...]

    def __post_init__(self) -> None:
        # Frozen dataclass: coerce through object.__setattr__.
        if not isinstance(self.items, tuple):
            object.__setattr__(self, "items", tuple(self.items))

    def __iter__(self) -> Iterator[T]:
        return iter(self.items)

    def __len__(self) -> int:
        return len(self.items)

    def __getitem__(self, index: int) -> T:
        return self.items[index]

    def names(self) -> tuple[str, ...]:
        """Return each item name when all items expose a string name.

        A callable ``name`` attribute is invoked to obtain the value.

        Raises
        ------
        FlowValidationError
            If any item lacks a ``name`` or it does not yield a string.
        """
        names: list[str] = []
        for item in self.items:
            value = getattr(item, "name", None)
            if callable(value):
                value = value()
            if not isinstance(value, str):
                raise FlowValidationError("Batch item does not expose a usable name.")
            names.append(value)
        return tuple(names)

    def paths(self) -> tuple[Path, ...]:
        """Return each item path when all items expose a Path-valued path.

        Raises
        ------
        FlowValidationError
            If any item lacks a ``path`` attribute that is a ``Path``.
        """
        paths: list[Path] = []
        for item in self.items:
            value = getattr(item, "path", None)
            if not isinstance(value, Path):
                raise FlowValidationError("Batch item does not expose a usable path.")
            paths.append(value)
        return tuple(paths)
def collect_files(
    extensions: tuple[str, ...] | list[str] | set[str],
    *,
    root: str | Path | None = None,
    recursive: bool = False,
) -> Callable[[FlowContext], Batch[FileRef]]:
    """Return a step callable that collects matching files into a Batch of FileRef items.

    Parameters
    ----------
    extensions : tuple[str, ...] | list[str] | set[str]
        File extensions to keep; normalized with ``_normalize_extensions`` and
        compared against each file's lower-cased suffix.
    root : str | Path | None
        Directory to scan. When omitted, the returned step scans
        ``context.source.root``.
    recursive : bool
        Scan subdirectories too (``rglob``) instead of only the top level.

    Returns
    -------
    Callable[[FlowContext], Batch[FileRef]]
        A step returning the matching regular files sorted by path. A scan
        directory that does not exist yields an empty batch.

    Raises
    ------
    FlowValidationError
        Immediately, if extension normalization yields ``None``; and from the
        returned step, if neither ``root`` nor ``context.source`` is available.
    """
    normalized_extensions = _normalize_extensions(extensions)
    # Explicit check instead of ``assert``: assertions are stripped under
    # ``python -O`` and would let ``None`` reach the membership test below.
    # (Presumably _normalize_extensions returns None for empty input — confirm.)
    if normalized_extensions is None:
        raise FlowValidationError("collect_files() requires at least one file extension.")
    # Hoisted once: a set gives O(1) suffix lookups for every scanned file.
    allowed_suffixes = frozenset(normalized_extensions)
    resolved_root = _resolve_flow_path(root) if root is not None else None

    def _collect(context: FlowContext) -> Batch[FileRef]:
        base = resolved_root
        if base is None and context.source is not None:
            base = context.source.root
        if base is None:
            raise FlowValidationError("collect_files() requires an explicit root or a flow context with source.")
        if not base.exists():
            return Batch(())
        matcher = base.rglob if recursive else base.glob
        items = tuple(
            FileRef(path)
            for path in sorted(matcher("*"))
            if path.is_file() and path.suffix.lower() in allowed_suffixes
        )
        return Batch(items)

    return _collect
# Names exported by ``from data_engine.core.primitives import *``.
__all__ = [
    "Batch",
    "FileRef",
    "FlowContext",
    "MirrorContext",
    "MirrorSpec",
    "SourceContext",
    "SourceMetadata",
    "StepSpec",
    "WatchSpec",
    "WorkspaceConfigContext",
    "collect_files",
]