Source code for data_engine.services.runtime_history

"""Runtime history query services."""

from __future__ import annotations

from dataclasses import dataclass
from pathlib import Path

from data_engine.domain import FlowCatalogLike, FlowLogEntry, FlowRunState, StepOutputIndex
from data_engine.runtime.ledger_models import PersistedStepRun
from data_engine.services.runtime_ports import RuntimeCacheStore


[docs] @dataclass(frozen=True) class StepOutputRefresh: """One step-output refresh result with cache watermark.""" last_step_run_id: int | None index: StepOutputIndex
[docs] class RuntimeHistoryService: """Own persisted run/step history queries used by operator surfaces."""
[docs] def rebuild_step_outputs( self, ledger: RuntimeCacheStore, flow_cards: dict[str, FlowCatalogLike], ) -> StepOutputRefresh: """Rebuild latest successful per-step output paths for visible flows.""" rebuilt: dict[str, dict[str, Path]] = {} last_step_run_id: int | None = None for flow_name, card in flow_cards.items(): outputs: dict[str, Path] = {} step_runs = ledger.step_outputs.list(flow_name=flow_name) if step_runs: candidate_last_id = step_runs[-1].id last_step_run_id = candidate_last_id if last_step_run_id is None else max(last_step_run_id, candidate_last_id) for step_run in reversed(step_runs): if step_run.status != "success" or not step_run.output_path: continue if step_run.step_label not in card.operation_items or step_run.step_label in outputs: continue output_path = Path(str(step_run.output_path)) if output_path.exists(): outputs[step_run.step_label] = output_path if len(outputs) == len(card.operation_items): break rebuilt[flow_name] = outputs return StepOutputRefresh( last_step_run_id=last_step_run_id, index=StepOutputIndex.from_mapping(rebuilt), )
[docs] def refresh_step_outputs( self, ledger: RuntimeCacheStore, flow_cards: dict[str, FlowCatalogLike], *, current_index: StepOutputIndex, last_seen_step_run_id: int | None, ) -> StepOutputRefresh: """Incrementally merge newly finished successful step outputs into the current index.""" new_rows = ledger.step_outputs.list(after_id=last_seen_step_run_id) if not new_rows: return StepOutputRefresh(last_step_run_id=last_seen_step_run_id, index=current_index) outputs_by_flow = { flow_name: dict(current_index.outputs_for(flow_name).outputs) for flow_name in flow_cards } next_last_id = last_seen_step_run_id for step_run in new_rows: next_last_id = step_run.id if next_last_id is None else max(next_last_id, step_run.id) self._merge_step_output_row(outputs_by_flow, flow_cards, step_run) return StepOutputRefresh( last_step_run_id=next_last_id, index=StepOutputIndex.from_mapping(outputs_by_flow), )
def _merge_step_output_row( self, outputs_by_flow: dict[str, dict[str, Path]], flow_cards: dict[str, FlowCatalogLike], step_run: PersistedStepRun, ) -> None: card = flow_cards.get(step_run.flow_name) if card is None or step_run.status != "success" or not step_run.output_path: return if step_run.step_label not in card.operation_items: return output_path = Path(str(step_run.output_path)) if not output_path.exists(): return outputs_by_flow.setdefault(step_run.flow_name, {})[step_run.step_label] = output_path
[docs] def error_text_for_entry( self, ledger: RuntimeCacheStore, run_group: FlowRunState, entry: FlowLogEntry, ) -> tuple[str, str | None]: """Return one user-facing error title and persisted error text for a failed entry.""" run_id = run_group.key[1] event = entry.event detail_text: str | None = None title = "Run Error" if event is not None and event.step_name is not None: for step_run in ledger.step_outputs.list_for_run(run_id): if step_run.step_label == event.step_name and step_run.status == "failed": detail_text = step_run.error_text title = f"{event.step_name} Error" break if detail_text is None: for run in ledger.runs.list(flow_name=run_group.key[0]): if run.run_id == run_id: detail_text = run.error_text break return title, detail_text
__all__ = ["RuntimeHistoryService", "StepOutputRefresh"]