Coverage for railway / core / pipeline.py: 80%

71 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-11 00:06 +0900

1"""Pipeline execution for Railway Framework.""" 

2 

3import asyncio 

4from collections.abc import Callable 

5from typing import Any 

6 

7from loguru import logger 

8 

9from railway.core.type_check import ( 

10 check_type_compatibility, 

11 format_type_error, 

12 get_function_input_type, 

13) 

14 

15 

def pipeline(
    initial: Any,
    *steps: Callable[[Any], Any],
    type_check: bool = True,
    strict: bool = False,
) -> Any:
    """
    Execute a pipeline of processing steps.

    Features:
    1. Sequential execution of steps
    2. Automatic error propagation (skip remaining steps on error)
    3. Runtime type checking between steps (performed when ``strict=True``)
    4. Detailed logging of execution flow

    IMPORTANT: Understanding the 'initial' argument
    -----------------------------------------------
    The 'initial' argument is the STARTING VALUE for the pipeline.
    It is NOT a function, but a value (or the result of a function call).

    Args:
        initial: Initial value to pass to first step
        *steps: Processing functions to apply sequentially. Each receives the
            previous step's return value as its single argument.
        type_check: Accepted for API compatibility (default: True). The body
            of this function does not read it; input types are only checked
            when ``strict`` is true.
        strict: Check ``current_value`` against each step's declared input
            type before calling the step (default: False)

    Returns:
        Final result from the last step, or ``initial`` unchanged when no
        steps are given.

    Raises:
        Exception: If any step fails (logged, then re-raised unchanged)
        TypeError: If an async function is passed or type mismatch in strict mode

    Example:
        result = pipeline(
            fetch_data(),   # Initial value (evaluated immediately)
            process_data,   # Step 1: receives result of fetch_data()
            save_data,      # Step 2: receives result of process_data()
        )
    """
    # Reject async steps up front, before any step has been executed.
    for step in steps:
        # A step counts as async if it carries a truthy `_is_async` attribute,
        # or if the function it exposes as `_original_func` (falling back to
        # the step itself) is a coroutine function.
        is_async = getattr(step, "_is_async", False) or asyncio.iscoroutinefunction(
            getattr(step, "_original_func", step)
        )
        if is_async:
            # Resolve the name lazily: callables such as functools.partial
            # objects have no __name__, so it must not be evaluated eagerly.
            step_name = (
                getattr(step, "_node_name", None)
                or getattr(step, "__name__", None)
                or repr(step)
            )
            raise TypeError(
                f"Async function '{step_name}' cannot be used in pipeline(). "
                "Use async_pipeline() for async nodes."
            )

    total = len(steps)
    logger.debug(f"Pipeline starting with {total} steps")

    # Return initial value if no steps
    if not steps:
        return initial

    current_value = initial
    current_step = 0  # 1-based index of the step in progress, for the failure log

    try:
        for i, step in enumerate(steps, 1):
            current_step = i
            # Lazy fallback chain so steps without __name__ (partials,
            # callable instances) do not fail with AttributeError here.
            step_name = (
                getattr(step, "_node_name", None)
                or getattr(step, "__name__", None)
                or repr(step)
            )

            # Type check before execution (if strict mode). Steps for which
            # no input type can be determined (None) are not checked.
            if strict:
                expected_type = get_function_input_type(step)
                if expected_type is not None and not check_type_compatibility(
                    current_value, expected_type
                ):
                    raise TypeError(
                        format_type_error(
                            step_num=i,
                            step_name=step_name,
                            expected_type=expected_type,
                            actual_type=type(current_value),
                            actual_value=current_value,
                        )
                    )

            logger.debug(f"Pipeline step {i}/{total}: {step_name}")

            try:
                current_value = step(current_value)
                logger.debug(f"Pipeline step {i}/{total}: Success")

            except Exception as e:
                logger.error(
                    f"Pipeline step {i}/{total} ({step_name}): "
                    f"Failed with {type(e).__name__}: {e}"
                )
                logger.info(f"Pipeline: Skipping remaining {total - i} steps")
                raise

        logger.debug("Pipeline completed successfully")
        return current_value

    except Exception:
        # Also reached for strict-mode TypeErrors raised above, which the
        # inner handler does not see.
        logger.error(f"Pipeline failed at step {current_step}/{total}")
        raise

120 

121 

async def async_pipeline(
    initial: Any,
    *steps: Callable[[Any], Any],
    strict: bool = False,
) -> Any:
    """
    Execute an asynchronous pipeline of processing steps.

    Supports both sync and async nodes. Steps detected as async are awaited;
    all other steps are called directly.

    Args:
        initial: Initial value to pass to first step
        *steps: Processing functions to apply sequentially (sync or async).
            Each receives the previous step's result as its single argument.
        strict: Check ``current_value`` against each step's declared input
            type before calling the step (default: False)

    Returns:
        Final result from the last step, or ``initial`` unchanged when no
        steps are given.

    Raises:
        Exception: If any step fails (logged, then re-raised unchanged)
        TypeError: If type mismatch in strict mode

    Example:
        result = await async_pipeline(
            "https://api.example.com",
            async_fetch,    # Async step 1
            process_data,   # Sync step 2
            async_save,     # Async step 3
        )
    """
    total = len(steps)
    logger.debug(f"Async pipeline starting with {total} steps")

    # Return initial value if no steps
    if not steps:
        return initial

    current_value = initial
    current_step = 0  # 1-based index of the step in progress, for the failure log

    try:
        for i, step in enumerate(steps, 1):
            current_step = i
            # Resolve the name lazily: callables such as functools.partial
            # objects have no __name__, so it must not be evaluated eagerly.
            step_name = (
                getattr(step, "_node_name", None)
                or getattr(step, "__name__", None)
                or repr(step)
            )
            # A step counts as async if it carries a truthy `_is_async`
            # attribute, or if the function it exposes as `_original_func`
            # (falling back to the step itself) is a coroutine function.
            is_async = getattr(step, "_is_async", False) or asyncio.iscoroutinefunction(
                getattr(step, "_original_func", step)
            )

            # Type check before execution (if strict mode). Steps for which
            # no input type can be determined (None) are not checked.
            if strict:
                expected_type = get_function_input_type(step)
                if expected_type is not None and not check_type_compatibility(
                    current_value, expected_type
                ):
                    raise TypeError(
                        format_type_error(
                            step_num=i,
                            step_name=step_name,
                            expected_type=expected_type,
                            actual_type=type(current_value),
                            actual_value=current_value,
                        )
                    )

            logger.debug(f"Async pipeline step {i}/{total}: {step_name}")

            try:
                # NOTE(review): a step not detected as async whose return
                # value happens to be awaitable is passed on un-awaited.
                if is_async:
                    current_value = await step(current_value)
                else:
                    current_value = step(current_value)
                logger.debug(f"Async pipeline step {i}/{total}: Success")

            except Exception as e:
                logger.error(
                    f"Async pipeline step {i}/{total} ({step_name}): "
                    f"Failed with {type(e).__name__}: {e}"
                )
                logger.info(f"Async pipeline: Skipping remaining {total - i} steps")
                raise

        logger.debug("Async pipeline completed successfully")
        return current_value

    except Exception:
        # Also reached for strict-mode TypeErrors raised above, which the
        # inner handler does not see.
        logger.error(f"Async pipeline failed at step {current_step}/{total}")
        raise