Source code for data_engine.runtime.file_watch

"""Filesystem discovery and polling services used by the flow runtime."""

from __future__ import annotations

from pathlib import Path
from typing import Iterable, Protocol, runtime_checkable

from data_engine.core.model import FlowValidationError
from data_engine.platform.paths import normalized_path_text, path_sort_key


def _normalized_name(value: str) -> str:
    """Return ``value`` in a form suitable for case-insensitive name checks.

    The text is first passed through ``normalized_path_text`` and the result
    is then case-folded.
    """
    canonical = normalized_path_text(value)
    return canonical.casefold()


def _queue_key(path: Path) -> str:
    """Compute the ordering key used when sorting discovered paths.

    This is a thin wrapper that delegates to ``path_sort_key``.
    """
    sort_key = path_sort_key(path)
    return sort_key


def _normalize_extensions(extensions: tuple[str, ...] | None) -> tuple[str, ...] | None:
    """Normalize extension filters to lowercase dotted forms."""
    if extensions is None:
        return None
    normalized: list[str] = []
    for ext in extensions:
        value = str(ext).strip().lower()
        if not value:
            raise FlowValidationError("Empty extension is not allowed.")
        if not value.startswith("."):
            value = f".{value}"
        normalized.append(value)
    return tuple(normalized)


def is_temporary_file_path(path: Path) -> bool:
    """Report whether the final path component matches a temp-file pattern.

    Only ``path.name`` is inspected, after normalization and case-folding.
    A name is considered temporary when any of the following holds:

    * it starts with ``.~lock.`` and ends with ``#``;
    * it starts with ``.``, ``~$`` or ``._``;
    * it ends with ``~``;
    * it ends with one of ``.tmp``, ``.temp``, ``.part``, ``.partial``,
      ``.crdownload``, ``.download`` or ``.swp``.
    """
    lowered = _normalized_name(path.name)

    lock_marker = lowered.startswith(".~lock.") and lowered.endswith("#")
    transient_prefix = lowered.startswith((".", "~$", "._"))
    backup_suffix = lowered.endswith("~")
    partial_suffix = lowered.endswith(
        (".tmp", ".temp", ".part", ".partial", ".crdownload", ".download", ".swp")
    )
    return lock_marker or transient_prefix or backup_suffix or partial_suffix
def iter_candidate_paths(
    input_root: Path,
    *,
    extensions: tuple[str, ...] | None = None,
    recursive: bool = True,
    allow_missing: bool = False,
) -> Iterable[Path]:
    """Generate the files under ``input_root`` that pass the configured filters.

    Because this is a generator, nothing (including the missing-root check)
    runs until the result is first iterated.

    Args:
        input_root: A single file or a directory to scan.
        extensions: Optional suffix filter; entries are normalized to
            lowercase dotted form and compared against ``path.suffix.lower()``.
        recursive: When ``input_root`` is a directory, descend into
            subdirectories (``rglob``) instead of listing one level (``glob``).
        allow_missing: When true, a non-existent ``input_root`` yields nothing
            instead of raising.

    Yields:
        Regular files that match the extension filter and are not considered
        temporary by ``is_temporary_file_path``. Directory results are
        yielded in ``_queue_key`` order.

    Raises:
        FlowValidationError: If ``input_root`` does not exist and
            ``allow_missing`` is false, or if an extension entry is empty.
    """
    if not input_root.exists():
        if not allow_missing:
            raise FlowValidationError(f"Input root not found: {input_root}")
        return

    wanted_suffixes = _normalize_extensions(extensions)

    pool: Iterable[Path]
    if input_root.is_file():
        pool = (input_root,)
    else:
        walk = input_root.rglob if recursive else input_root.glob
        # Sort eagerly so the yield order does not depend on directory order.
        pool = sorted(walk("*"), key=_queue_key)

    for entry in pool:
        if not entry.is_file():
            continue
        suffix_rejected = (
            wanted_suffixes is not None
            and entry.suffix.lower() not in wanted_suffixes
        )
        if suffix_rejected or is_temporary_file_path(entry):
            continue
        yield entry
@runtime_checkable
class IFileWatcher(Protocol):
    """Structural interface shared by the runtime's file watchers.

    The protocol is runtime-checkable, so ``isinstance`` can be used to test
    whether an object provides the three methods below.
    """

    def start(self) -> None:
        """Start observing the filesystem for changes."""
        ...

    def stop(self) -> None:
        """Cease observing the filesystem for changes."""
        ...

    def drain_events(self) -> list[Path]:
        """Hand back the paths queued since the previous drain."""
        ...
class PollingWatcher:
    """Poll one file or directory root and report files once they settle.

    Every poll records a ``(st_mtime_ns, st_size, st_ctime_ns)`` signature for
    each candidate file. A path is reported when its signature has matched the
    previous poll at least ``settle`` times in a row and differs from the
    signature that was last reported for that path.
    """

    def __init__(
        self,
        input_root: Path,
        *,
        recursive: bool = True,
        extensions: tuple[str, ...] | None = None,
        settle: int = 1,
    ) -> None:
        """Configure the watcher without touching the filesystem.

        Args:
            input_root: File or directory to poll.
            recursive: Whether directory scans descend into subdirectories.
            extensions: Optional extension filter, normalized on construction.
            settle: Number of consecutive unchanged polls required before a
                path is reported; ``0`` reports on first sight.

        Raises:
            FlowValidationError: If ``settle`` is negative or an extension
                entry is empty.
        """
        if settle < 0:
            raise FlowValidationError("settle must be zero or greater.")
        self.input_root = input_root
        self.recursive = recursive
        self.extensions = _normalize_extensions(extensions)
        self.settle = settle
        # Signatures from the most recent poll.
        self._seen: dict[Path, tuple[int, int, int]] = {}
        # Consecutive polls in which each path's signature was unchanged.
        self._stable_counts: dict[Path, int] = {}
        # Signature each path had when it was last reported.
        self._emitted: dict[Path, tuple[int, int, int]] = {}
        self._running = False

    def start(self) -> None:
        """Take the baseline snapshot and enable event draining.

        Files present in the baseline are recorded as already reported, so
        they only produce an event after their signature changes.
        """
        baseline = self._snapshot()
        self._seen = baseline
        self._stable_counts = dict.fromkeys(baseline, 0)
        self._emitted = dict(baseline)
        self._running = True

    def stop(self) -> None:
        """Disable event draining; later drains return an empty list."""
        self._running = False

    def drain_events(self) -> list[Path]:
        """Poll once and return the paths that have become stable.

        Returns:
            Paths whose signature has been unchanged for at least ``settle``
            consecutive polls and has not already been reported. Always empty
            while the watcher is not running.
        """
        if not self._running:
            return []

        snapshot = self._snapshot()
        ready: list[Path] = []
        counts: dict[Path, int] = {}

        for candidate, fingerprint in snapshot.items():
            unchanged = self._seen.get(candidate) == fingerprint
            streak = self._stable_counts.get(candidate, 0) + 1 if unchanged else 0
            counts[candidate] = streak

            already_reported = self._emitted.get(candidate) == fingerprint
            if already_reported or streak < self.settle:
                continue

            ready.append(candidate)
            self._emitted[candidate] = fingerprint

        self._stable_counts = counts
        self._seen = snapshot
        return ready

    def _snapshot(self) -> dict[Path, tuple[int, int, int]]:
        """Collect signatures for every candidate path and prune stale state.

        Paths whose ``stat`` raises ``FileNotFoundError`` are left out of the
        snapshot. As a side effect, watcher state for paths missing from the
        snapshot is discarded.
        """
        signatures: dict[Path, tuple[int, int, int]] = {}
        candidates = iter_candidate_paths(
            self.input_root,
            extensions=self.extensions,
            recursive=self.recursive,
            allow_missing=True,
        )
        for candidate in candidates:
            try:
                info = candidate.stat()
            except FileNotFoundError:
                # The file vanished between discovery and stat; skip it.
                continue
            signatures[candidate] = (
                info.st_mtime_ns,
                info.st_size,
                getattr(info, "st_ctime_ns", 0),
            )
        self._prune_removed_paths(signatures)
        return signatures

    def _prune_removed_paths(self, current: dict[Path, tuple[int, int, int]]) -> None:
        """Forget stability counts and reported signatures for absent paths."""
        live = set(current)
        self._stable_counts = {
            known: streak
            for known, streak in self._stable_counts.items()
            if known in live
        }
        self._emitted = {
            known: fingerprint
            for known, fingerprint in self._emitted.items()
            if known in live
        }
# Names exported by a wildcard import of this module.
__all__ = ["IFileWatcher", "PollingWatcher", "iter_candidate_paths", "is_temporary_file_path"]