Coverage for railway/core/pipeline.py: 80%
71 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-11 00:06 +0900
1"""Pipeline execution for Railway Framework."""
3import asyncio
4from collections.abc import Callable
5from typing import Any
7from loguru import logger
9from railway.core.type_check import (
10 check_type_compatibility,
11 format_type_error,
12 get_function_input_type,
13)
def _step_display_name(step: Callable[[Any], Any]) -> str:
    """Return a printable name for *step* without assuming it has ``__name__``.

    A ``_node_name`` attribute, when present, takes precedence; otherwise the
    callable's ``__name__`` is used. Callables that have neither (for example
    ``functools.partial`` objects or instances defining ``__call__``) fall
    back to their ``repr`` so that naming a step can never raise.
    """
    node_name = getattr(step, "_node_name", None)
    if node_name is not None:
        return node_name
    return getattr(step, "__name__", None) or repr(step)


def pipeline(
    initial: Any,
    *steps: Callable[[Any], Any],
    type_check: bool = True,
    strict: bool = False,
) -> Any:
    """
    Execute a pipeline of processing steps.

    Features:
        1. Sequential execution of steps
        2. Automatic error propagation (skip remaining steps on error)
        3. Optional type checking between steps (strict mode)
        4. Detailed logging of execution flow

    IMPORTANT: Understanding the 'initial' argument
    -----------------------------------------------
    The 'initial' argument is the STARTING VALUE for the pipeline.
    It is NOT a function, but a value (or the result of a function call).

    Args:
        initial: Initial value to pass to the first step.
        *steps: Processing callables applied sequentially; each receives the
            previous step's return value. Any callable is accepted, including
            ones without a ``__name__`` (e.g. ``functools.partial`` objects).
        type_check: Accepted for API compatibility (default: True). This
            function does not consult it; only ``strict`` triggers checks.
        strict: When True, before each step the current value is checked
            against the step's declared input type, if one can be determined
            (default: False).

    Returns:
        The value returned by the last step, or ``initial`` unchanged when no
        steps are given.

    Raises:
        TypeError: If any step is async (checked before anything runs), or,
            in strict mode, if the current value is not compatible with the
            step's expected input type.
        Exception: Whatever a step raises is logged and re-raised unchanged.

    Example:
        result = pipeline(
            fetch_data(),   # Initial value (evaluated immediately)
            process_data,   # Step 1: receives result of fetch_data()
            save_data,      # Step 2: receives result of process_data()
        )
    """
    # Reject async steps up front so no step has run when the error is raised.
    for step in steps:
        # A step may flag itself via `_is_async` or expose the wrapped
        # function as `_original_func`; check whichever is available.
        is_async = getattr(step, "_is_async", False) or asyncio.iscoroutinefunction(
            getattr(step, "_original_func", step)
        )
        if is_async:
            step_name = _step_display_name(step)
            raise TypeError(
                f"Async function '{step_name}' cannot be used in pipeline(). "
                "Use async_pipeline() for async nodes."
            )

    logger.debug(f"Pipeline starting with {len(steps)} steps")

    # Return initial value if no steps
    if not steps:
        return initial

    current_value = initial
    current_step = 0  # 1-based index of the step being processed, for the failure log

    try:
        for i, step in enumerate(steps, 1):
            current_step = i
            step_name = _step_display_name(step)

            # Type check before execution (strict mode only). Steps whose
            # input type cannot be determined (None) are not checked.
            if strict:
                expected_type = get_function_input_type(step)
                if expected_type is not None and not check_type_compatibility(
                    current_value, expected_type
                ):
                    raise TypeError(
                        format_type_error(
                            step_num=i,
                            step_name=step_name,
                            expected_type=expected_type,
                            actual_type=type(current_value),
                            actual_value=current_value,
                        )
                    )

            logger.debug(f"Pipeline step {i}/{len(steps)}: {step_name}")

            try:
                current_value = step(current_value)
                logger.debug(f"Pipeline step {i}/{len(steps)}: Success")
            except Exception as e:
                logger.error(
                    f"Pipeline step {i}/{len(steps)} ({step_name}): "
                    f"Failed with {type(e).__name__}: {e}"
                )
                logger.info(f"Pipeline: Skipping remaining {len(steps) - i} steps")
                raise

        logger.debug("Pipeline completed successfully")
        return current_value

    except Exception:
        # Covers both step failures (already logged above) and strict-mode
        # type errors; the original exception is propagated unchanged.
        logger.error(f"Pipeline failed at step {current_step}/{len(steps)}")
        raise
async def async_pipeline(
    initial: Any,
    *steps: Callable[[Any], Any],
    strict: bool = False,
) -> Any:
    """
    Execute an asynchronous pipeline of processing steps.

    Supports both sync and async steps. A step is treated as async when it
    carries a truthy ``_is_async`` attribute or when it (or its
    ``_original_func``, if present) is a coroutine function; such steps are
    awaited, all others are called directly.

    NOTE(review): a step classified as sync that nevertheless returns an
    awaitable is not awaited here; its return value is passed on as-is.

    Args:
        initial: Initial value to pass to the first step.
        *steps: Processing callables applied sequentially (sync or async).
            Any callable is accepted, including ones without a ``__name__``
            (e.g. ``functools.partial`` objects).
        strict: When True, before each step the current value is checked
            against the step's declared input type, if one can be determined
            (default: False).

    Returns:
        The value returned by the last step, or ``initial`` unchanged when no
        steps are given.

    Raises:
        TypeError: In strict mode, if the current value is not compatible
            with the step's expected input type.
        Exception: Whatever a step raises is logged and re-raised unchanged.

    Example:
        result = await async_pipeline(
            "https://api.example.com",
            async_fetch,    # Async step 1
            process_data,   # Sync step 2
            async_save,     # Async step 3
        )
    """
    logger.debug(f"Async pipeline starting with {len(steps)} steps")

    # Return initial value if no steps
    if not steps:
        return initial

    current_value = initial
    current_step = 0  # 1-based index of the step being processed, for the failure log

    try:
        for i, step in enumerate(steps, 1):
            current_step = i

            # Resolve a display name lazily: `_node_name` wins, then
            # `__name__`, then repr() for callables that have neither
            # (functools.partial objects, callable instances).
            step_name = getattr(step, "_node_name", None)
            if step_name is None:
                step_name = getattr(step, "__name__", None) or repr(step)

            is_async = getattr(step, "_is_async", False) or asyncio.iscoroutinefunction(
                getattr(step, "_original_func", step)
            )

            # Type check before execution (strict mode only). Steps whose
            # input type cannot be determined (None) are not checked.
            if strict:
                expected_type = get_function_input_type(step)
                if expected_type is not None and not check_type_compatibility(
                    current_value, expected_type
                ):
                    raise TypeError(
                        format_type_error(
                            step_num=i,
                            step_name=step_name,
                            expected_type=expected_type,
                            actual_type=type(current_value),
                            actual_value=current_value,
                        )
                    )

            logger.debug(f"Async pipeline step {i}/{len(steps)}: {step_name}")

            try:
                if is_async:
                    current_value = await step(current_value)
                else:
                    current_value = step(current_value)
                logger.debug(f"Async pipeline step {i}/{len(steps)}: Success")
            except Exception as e:
                logger.error(
                    f"Async pipeline step {i}/{len(steps)} ({step_name}): "
                    f"Failed with {type(e).__name__}: {e}"
                )
                logger.info(f"Async pipeline: Skipping remaining {len(steps) - i} steps")
                raise

        logger.debug("Async pipeline completed successfully")
        return current_value

    except Exception:
        # Covers both step failures (already logged above) and strict-mode
        # type errors; the original exception is propagated unchanged.
        logger.error(f"Async pipeline failed at step {current_step}/{len(steps)}")
        raise