# Source code for data_engine.runtime.result_cleanup

"""Helpers for releasing completed runtime results."""

from __future__ import annotations

import gc

from data_engine.core.primitives import FlowContext, WorkspaceConfigContext


def release_context_values(context: FlowContext) -> None:
    """Reset a finished context so the values it was holding can be freed.

    The ``source``, ``mirror``, ``current`` and ``debug`` attributes are set
    to ``None``, the ``objects`` and ``metadata`` containers are emptied in
    place, and ``config`` is replaced with a newly constructed
    ``WorkspaceConfigContext``. A garbage-collection pass is forced at the
    end.

    Args:
        context: The completed context to reset. It is mutated in place.
    """
    # Null out the plain value slots first, in their original order.
    for attribute in ("source", "mirror", "current"):
        setattr(context, attribute, None)

    # The containers are cleared in place rather than rebound, so any other
    # holder of the same container object sees it emptied as well.
    context.objects.clear()
    context.metadata.clear()

    context.config = WorkspaceConfigContext()
    context.debug = None

    # Explicitly trigger a collection now that the references are dropped.
    gc.collect()


def release_completed_results(result: object) -> object:
    """Drop bulky references from completed flow results when callers do not need them.

    Only a ``list`` whose items are all ``FlowContext`` instances is
    processed; every such context is passed to ``release_context_values``.
    Any other value (including a list with at least one non-``FlowContext``
    item) is returned untouched. An empty list is returned as-is.

    Args:
        result: The value produced by a completed run.

    Returns:
        The very same ``result`` object that was passed in. When it is a
        list of contexts, those contexts have been released in place.
    """
    # Guard: anything that is not a list is passed straight through.
    if not isinstance(result, list):
        return result

    # Guard: a list is only processed when *every* item is a FlowContext, so
    # a mixed list is never partially released.
    if not all(isinstance(item, FlowContext) for item in result):
        return result

    for context in result:
        release_context_values(context)
    return result
# Public API of this module: the names exported by a star-import.
__all__ = ["release_context_values", "release_completed_results"]