# Source code for data_engine.authoring.flow

"""Flow DSL and public authoring entrypoints."""

from __future__ import annotations

import inspect
from pathlib import Path
from typing import TYPE_CHECKING

from data_engine.core.flow import Flow as _CoreFlow
from data_engine.core.helpers import _validate_slot_name
from data_engine.core.model import FlowValidationError
from data_engine.core.primitives import FlowContext

if TYPE_CHECKING:
    from data_engine.authoring.services import AuthoringServices
    from data_engine.services.flow_execution import FlowExecutionService
    from data_engine.services.runtime_execution import RuntimeExecutionService


def _resolve_authoring_services(
    *,
    authoring_services: AuthoringServices | None = None,
    runtime_execution_service: RuntimeExecutionService | None = None,
    flow_execution_service: FlowExecutionService | None = None,
) -> AuthoringServices:
    """Return one authoring collaborator bundle with explicit overrides applied.

    Parameters
    ----------
    authoring_services : AuthoringServices | None
        Base bundle. When ``None``, ``default_authoring_services()`` is used.
    runtime_execution_service : RuntimeExecutionService | None
        Optional replacement for the base bundle's runtime execution service.
    flow_execution_service : FlowExecutionService | None
        Optional replacement for the base bundle's flow execution service.

    Returns
    -------
    AuthoringServices
        The base bundle itself when no override is given. Otherwise, a new bundle
        from ``build_authoring_services``. It pairs each supplied override with
        the base bundle's service for the slot that was not overridden.
    """
    # Imported lazily; presumably to avoid an import cycle with the services
    # module (it is only imported under TYPE_CHECKING at module level).
    from data_engine.authoring.services import build_authoring_services, default_authoring_services

    # Explicit ``is None`` checks rather than ``or``, so a caller-supplied object
    # is never discarded merely because it happens to evaluate as falsy.
    services = default_authoring_services() if authoring_services is None else authoring_services
    if runtime_execution_service is None and flow_execution_service is None:
        return services
    return build_authoring_services(
        runtime_execution_service=(
            services.runtime_execution_service
            if runtime_execution_service is None
            else runtime_execution_service
        ),
        flow_execution_service=(
            services.flow_execution_service
            if flow_execution_service is None
            else flow_execution_service
        ),
    )


def _infer_authoring_flow_origin() -> tuple[str, Path] | None:
    """Return ``(flow_name, workspace_root)`` for direct authored module execution.

    This is a narrow authoring convenience for calls such as ``build().preview()``
    executed directly inside ``workspaces/<id>/flow_modules/*.py``. It should not
    affect compiled runtime execution or ad hoc inline flows outside a workspace
    ``flow_modules`` tree.
    """
    current = inspect.currentframe()
    if current is None:
        return None
    try:
        frame = current.f_back
        while frame is not None:
            filename = frame.f_code.co_filename
            try:
                path = Path(filename).resolve()
            except (OSError, RuntimeError):
                frame = frame.f_back
                continue
            if path.name == __file__:
                frame = frame.f_back
                continue
            if path.parent.name == "flow_modules":
                return path.stem, path.parent.parent.resolve()
            frame = frame.f_back
    finally:
        del current
    return None


def _with_inferred_authoring_metadata(flow: "Flow") -> "Flow":
    """Fill in missing direct-authoring metadata when executing from flow_modules."""
    if flow.name is not None and flow._workspace_root is not None:
        return flow
    inferred = _infer_authoring_flow_origin()
    if inferred is None:
        return flow
    inferred_name, inferred_workspace_root = inferred
    return flow._clone(
        name=flow.name or inferred_name,
        _workspace_root=flow._workspace_root or inferred_workspace_root,
    )


class Flow(_CoreFlow):
    """Public authoring flow with execution conveniences layered over core definitions.

    Extends ``data_engine.core.flow.Flow`` with ``run_once``, ``preview`` and
    ``run``. Each method hands the flow to a runtime execution service, which is
    resolved from an optional service bundle and an optional direct override.
    """

    def run_once(
        self,
        *,
        authoring_services: AuthoringServices | None = None,
        runtime_execution_service: RuntimeExecutionService | None = None,
    ) -> list[FlowContext]:
        """Execute this flow a single time and collect the finished runtime contexts.

        If the call comes from a workspace ``flow_modules`` file, a missing flow
        name or workspace root is filled in before execution.

        Parameters
        ----------
        authoring_services : AuthoringServices | None
            Optional service bundle, e.g. for tests or embedded hosts.
        runtime_execution_service : RuntimeExecutionService | None
            Optional override for the bundle's runtime execution service.

        Returns
        -------
        list[FlowContext]
            One context per executed source.
        """
        authored = _with_inferred_authoring_metadata(self)
        bundle = _resolve_authoring_services(
            authoring_services=authoring_services,
            runtime_execution_service=runtime_execution_service,
        )
        return bundle.runtime_execution_service.run_once(authored)

    def preview(
        self,
        *,
        use: str | None = None,
        authoring_services: AuthoringServices | None = None,
        runtime_execution_service: RuntimeExecutionService | None = None,
    ) -> object:
        """Execute this flow in preview mode and hand back a single preview value.

        Parameters
        ----------
        use : str | None
            Optional named object slot to preview instead of the final current value.
            It is validated by ``_validate_slot_name`` before use.
        authoring_services : AuthoringServices | None
            Optional service bundle, e.g. for tests or embedded hosts.
        runtime_execution_service : RuntimeExecutionService | None
            Optional override for the bundle's runtime execution service.

        Returns
        -------
        object
            Whatever the runtime execution service's ``preview`` returns.

        Raises
        ------
        FlowValidationError
            When ``in_compiled_flow_module_context()`` reports the call is made
            from inside a compiled flow module.
        """
        from data_engine.flow_modules.flow_module_loader import in_compiled_flow_module_context

        if in_compiled_flow_module_context():
            raise FlowValidationError("preview() is not available inside compiled flow modules.")
        authored = _with_inferred_authoring_metadata(self)
        slot = _validate_slot_name(method_name="preview", slot_name="use", value=use)
        bundle = _resolve_authoring_services(
            authoring_services=authoring_services,
            runtime_execution_service=runtime_execution_service,
        )
        return bundle.runtime_execution_service.preview(authored, use=slot)

    def run(
        self,
        *,
        authoring_services: AuthoringServices | None = None,
        runtime_execution_service: RuntimeExecutionService | None = None,
    ) -> list[FlowContext]:
        """Execute this flow continuously, driven by its trigger.

        Parameters
        ----------
        authoring_services : AuthoringServices | None
            Optional service bundle, e.g. for tests or embedded hosts.
        runtime_execution_service : RuntimeExecutionService | None
            Optional override for the bundle's runtime execution service.

        Returns
        -------
        list[FlowContext]
            Completed contexts collected before the runtime exits.
        """
        authored = _with_inferred_authoring_metadata(self)
        bundle = _resolve_authoring_services(
            authoring_services=authoring_services,
            runtime_execution_service=runtime_execution_service,
        )
        return bundle.runtime_execution_service.run_continuous(authored)
def load_flow(
    name: str,
    *,
    data_root: Path | None = None,
    authoring_services: AuthoringServices | None = None,
    flow_execution_service: FlowExecutionService | None = None,
) -> Flow:
    """Load a single code-defined flow by its flow-module name.

    Parameters
    ----------
    name : str
        Flow-module name to load.
    data_root : Path | None
        Passed to the flow execution service as ``workspace_root``. ``None`` is
        passed through unchanged.
    authoring_services : AuthoringServices | None
        Optional service bundle, e.g. for tests or embedded hosts.
    flow_execution_service : FlowExecutionService | None
        Optional override for the bundle's flow execution service.

    Returns
    -------
    Flow
        The flow returned by the flow execution service's ``load_flow``.
    """
    bundle = _resolve_authoring_services(
        authoring_services=authoring_services,
        flow_execution_service=flow_execution_service,
    )
    loader = bundle.flow_execution_service
    return loader.load_flow(name, workspace_root=data_root)
def discover_flows(
    *,
    data_root: Path | None = None,
    authoring_services: AuthoringServices | None = None,
    flow_execution_service: FlowExecutionService | None = None,
) -> tuple[Flow, ...]:
    """Find and build every code-defined flow from compiled flow modules.

    Parameters
    ----------
    data_root : Path | None
        Passed to the flow execution service as ``workspace_root``. ``None`` is
        passed through unchanged.
    authoring_services : AuthoringServices | None
        Optional service bundle, e.g. for tests or embedded hosts.
    flow_execution_service : FlowExecutionService | None
        Optional override for the bundle's flow execution service.

    Returns
    -------
    tuple[Flow, ...]
        The flows returned by the flow execution service's ``discover_flows``.
    """
    bundle = _resolve_authoring_services(
        authoring_services=authoring_services,
        flow_execution_service=flow_execution_service,
    )
    finder = bundle.flow_execution_service
    return finder.discover_flows(workspace_root=data_root)
def run(
    *flows: Flow,
    authoring_services: AuthoringServices | None = None,
    runtime_execution_service: RuntimeExecutionService | None = None,
) -> list[FlowContext]:
    """Run several flows together: sequential within each group, groups in parallel.

    Grouping and scheduling are delegated to the runtime execution service's
    ``run_grouped_continuous``. Unlike ``Flow.run``, no authoring metadata is
    inferred for the given flows.

    Parameters
    ----------
    *flows : Flow
        Flows to execute.
    authoring_services : AuthoringServices | None
        Optional service bundle, e.g. for tests or embedded hosts.
    runtime_execution_service : RuntimeExecutionService | None
        Optional override for the bundle's runtime execution service.

    Returns
    -------
    list[FlowContext]
        The contexts returned by ``run_grouped_continuous``.
    """
    bundle = _resolve_authoring_services(
        authoring_services=authoring_services,
        runtime_execution_service=runtime_execution_service,
    )
    runner = bundle.runtime_execution_service
    return runner.run_grouped_continuous(tuple(flows))
# Names exported by ``from data_engine.authoring.flow import *``.
__all__ = [
    "Flow",
    "discover_flows",
    "load_flow",
    "run",
]