Coverage for railway / core / decorators.py: 90%

158 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-11 00:06 +0900

1""" 

2Decorators for Railway nodes and entry points. 

3""" 

4 

5import asyncio 

6from collections.abc import Callable 

7from functools import wraps 

8from typing import Any, ParamSpec, TypeVar 

9 

10import typer 

11from loguru import logger 

12from tenacity import ( 

13 retry as tenacity_retry, 

14 stop_after_attempt, 

15 wait_exponential, 

16 RetryError, 

17 before_sleep_log, 

18 AsyncRetrying, 

19) 

20 

21P = ParamSpec("P") 

22T = TypeVar("T") 

23 

24 

25class Retry: 

26 """Retry configuration for nodes.""" 

27 

28 def __init__( 

29 self, 

30 max_attempts: int = 3, 

31 min_wait: float = 2.0, 

32 max_wait: float = 10.0, 

33 exponential_base: int = 2, 

34 ): 

35 self.max_attempts = max_attempts 

36 self.min_wait = min_wait 

37 self.max_wait = max_wait 

38 self.exponential_base = exponential_base 

39 self.multiplier = exponential_base # Alias for compatibility 

40 

41 

42def node( 

43 func: Callable[P, T] | None = None, 

44 *, 

45 retry: bool | Retry = False, 

46 log_input: bool = False, 

47 log_output: bool = False, 

48 name: str | None = None, 

49) -> Callable[P, T] | Callable[[Callable[P, T]], Callable[P, T]]: 

50 """ 

51 Node decorator that provides: 

52 1. Automatic exception handling with logging 

53 2. Optional retry with exponential backoff 

54 3. Structured logging 

55 4. Metadata storage 

56 

57 Args: 

58 func: Function to decorate 

59 retry: Enable retry (bool) or provide Retry config 

60 log_input: Log input parameters (caution: may log sensitive data) 

61 log_output: Log output data (caution: may log sensitive data) 

62 name: Override node name (default: function name) 

63 

64 Returns: 

65 Decorated function with automatic error handling 

66 

67 Example: 

68 @node 

69 def fetch_data() -> dict: 

70 return api.get("/data") 

71 

72 @node(retry=True) 

73 def fetch_with_retry() -> dict: 

74 return api.get("/data") 

75 

76 @node(retry=Retry(max_attempts=5, min_wait=1)) 

77 def custom_retry() -> dict: 

78 return api.get("/data") 

79 """ 

80 

81 def decorator(f: Callable[P, T]) -> Callable[P, T]: 

82 node_name = name or f.__name__ 

83 is_async = asyncio.iscoroutinefunction(f) 

84 

85 if is_async: 

86 return _create_async_wrapper(f, node_name, retry, log_input, log_output) 

87 else: 

88 return _create_sync_wrapper(f, node_name, retry, log_input, log_output) 

89 

90 # Handle decorator usage with and without parentheses 

91 if func is None: 

92 return decorator 

93 return decorator(func) 

94 

95 

96def _create_sync_wrapper( 

97 f: Callable[P, T], 

98 node_name: str, 

99 retry: bool | Retry, 

100 log_input: bool, 

101 log_output: bool, 

102) -> Callable[P, T]: 

103 """Create wrapper for synchronous function.""" 

104 

105 @wraps(f) 

106 def wrapper(*args: P.args, **kwargs: P.kwargs) -> T: 

107 # Log input if enabled 

108 if log_input: 

109 logger.debug(f"[{node_name}] Input: args={args}, kwargs={kwargs}") 

110 

111 logger.info(f"[{node_name}] Starting...") 

112 

113 # Determine retry configuration 

114 retry_config = _get_retry_configuration(retry, node_name) 

115 

116 try: 

117 if retry_config is not None: 

118 # Execute with retry 

119 result = _execute_with_retry(f, retry_config, node_name, args, kwargs) 

120 else: 

121 # Execute without retry 

122 result = f(*args, **kwargs) 

123 

124 # Log output if enabled 

125 if log_output: 

126 logger.debug(f"[{node_name}] Output: {result}") 

127 

128 logger.info(f"[{node_name}] ✓ Completed") 

129 return result 

130 

131 except Exception as e: 

132 _log_error_with_hint(node_name, e) 

133 raise 

134 

135 # Store metadata 

136 wrapper._is_railway_node = True # type: ignore[attr-defined] 

137 wrapper._node_name = node_name # type: ignore[attr-defined] 

138 wrapper._original_func = f # type: ignore[attr-defined] 

139 wrapper._is_async = False # type: ignore[attr-defined] 

140 

141 return wrapper 

142 

143 

144def _create_async_wrapper( 

145 f: Callable[P, T], 

146 node_name: str, 

147 retry: bool | Retry, 

148 log_input: bool, 

149 log_output: bool, 

150) -> Callable[P, T]: 

151 """Create wrapper for asynchronous function.""" 

152 

153 @wraps(f) 

154 async def wrapper(*args: P.args, **kwargs: P.kwargs) -> T: 

155 # Log input if enabled 

156 if log_input: 156 ↛ 157line 156 didn't jump to line 157 because the condition on line 156 was never true

157 logger.debug(f"[{node_name}] Input: args={args}, kwargs={kwargs}") 

158 

159 logger.info(f"[{node_name}] Starting...") 

160 

161 # Determine retry configuration 

162 retry_config = _get_retry_configuration(retry, node_name) 

163 

164 try: 

165 if retry_config is not None: 

166 # Execute with retry 

167 result = await _execute_async_with_retry( 

168 f, retry_config, node_name, args, kwargs 

169 ) 

170 else: 

171 # Execute without retry 

172 result = await f(*args, **kwargs) 

173 

174 # Log output if enabled 

175 if log_output: 175 ↛ 176line 175 didn't jump to line 176 because the condition on line 175 was never true

176 logger.debug(f"[{node_name}] Output: {result}") 

177 

178 logger.info(f"[{node_name}] ✓ Completed") 

179 return result 

180 

181 except Exception as e: 

182 _log_error_with_hint(node_name, e) 

183 raise 

184 

185 # Store metadata 

186 wrapper._is_railway_node = True # type: ignore[attr-defined] 

187 wrapper._node_name = node_name # type: ignore[attr-defined] 

188 wrapper._original_func = f # type: ignore[attr-defined] 

189 wrapper._is_async = True # type: ignore[attr-defined] 

190 

191 return wrapper 

192 

193 

194async def _execute_async_with_retry( 

195 func: Callable[P, T], 

196 retry_config: Retry, 

197 node_name: str, 

198 args: tuple, 

199 kwargs: dict, 

200) -> T: 

201 """Execute async function with retry logic.""" 

202 max_attempts = retry_config.max_attempts 

203 attempt_count = 0 

204 

205 async for attempt in AsyncRetrying( 205 ↛ exitline 205 didn't return from function '_execute_async_with_retry' because the loop on line 205 didn't complete

206 stop=stop_after_attempt(retry_config.max_attempts), 

207 wait=wait_exponential( 

208 multiplier=retry_config.multiplier, 

209 min=retry_config.min_wait, 

210 max=retry_config.max_wait, 

211 ), 

212 reraise=True, 

213 ): 

214 with attempt: 

215 attempt_count = attempt.retry_state.attempt_number 

216 if attempt_count > 1: 

217 logger.warning( 

218 f"[{node_name}] リトライ中... (試行 {attempt_count}/{max_attempts})" 

219 ) 

220 return await func(*args, **kwargs) 

221 

222 

223def _get_retry_configuration(retry_param: bool | Retry, node_name: str) -> Retry | None: 

224 """Get retry configuration from parameter or settings.""" 

225 if retry_param is True: 

226 # Load from config 

227 from railway.core.config import get_retry_config 

228 config = get_retry_config(node_name) 

229 return Retry( 

230 max_attempts=config.max_attempts, 

231 min_wait=config.min_wait, 

232 max_wait=config.max_wait, 

233 exponential_base=config.multiplier, 

234 ) 

235 elif isinstance(retry_param, Retry): 

236 return retry_param 

237 else: 

238 return None 

239 

240 

241def _get_error_hint(exception: Exception) -> str | None: 

242 """Get hint message for common errors.""" 

243 if isinstance(exception, ConnectionError): 

244 return "ヒント: ネットワーク接続を確認してください。APIエンドポイントが正しいか確認してください。" 

245 elif isinstance(exception, TimeoutError): 

246 return "ヒント: タイムアウト値を増やすか、APIサーバーの状態を確認してください。" 

247 elif isinstance(exception, ValueError): 

248 return "ヒント: 入力データの形式や値を確認してください。" 

249 elif isinstance(exception, FileNotFoundError): 

250 return "ヒント: ファイルパスが正しいか確認してください。" 

251 elif isinstance(exception, PermissionError): 251 ↛ 252line 251 didn't jump to line 252 because the condition on line 251 was never true

252 return "ヒント: ファイルやディレクトリの権限を確認してください。" 

253 elif isinstance(exception, KeyError): 

254 return "ヒント: 必要なキーが存在するか確認してください。設定ファイルを確認してください。" 

255 

256 # Check for API key related errors 

257 error_str = str(exception).upper() 

258 if "API_KEY" in error_str or "API_SECRET" in error_str or "UNAUTHORIZED" in error_str: 

259 return "ヒント: .envファイルでAPI認証情報が正しく設定されているか確認してください。" 

260 

261 return None 

262 

263 

264def _log_error_with_hint(node_name: str, exception: Exception) -> None: 

265 """Log error with hint and log file reference.""" 

266 logger.error(f"[{node_name}] ✗ Failed: {type(exception).__name__}: {exception}") 

267 logger.error("詳細は logs/app.log を確認してください") 

268 

269 hint = _get_error_hint(exception) 

270 if hint: 

271 logger.error(hint) 

272 

273 

274def _execute_with_retry( 

275 func: Callable[P, T], 

276 retry_config: Retry, 

277 node_name: str, 

278 args: tuple, 

279 kwargs: dict, 

280) -> T: 

281 """Execute function with retry logic.""" 

282 attempt_count = 0 

283 max_attempts = retry_config.max_attempts 

284 

285 def before_retry(retry_state): 

286 nonlocal attempt_count 

287 attempt_count = retry_state.attempt_number 

288 logger.warning(f"[{node_name}] リトライ中... (試行 {attempt_count}/{max_attempts})") 

289 

290 retry_decorator = tenacity_retry( 

291 stop=stop_after_attempt(retry_config.max_attempts), 

292 wait=wait_exponential( 

293 multiplier=retry_config.multiplier, 

294 min=retry_config.min_wait, 

295 max=retry_config.max_wait, 

296 ), 

297 reraise=True, 

298 before_sleep=before_retry, 

299 ) 

300 

301 retryable_func = retry_decorator(func) 

302 

303 try: 

304 return retryable_func(*args, **kwargs) 

305 except RetryError as e: 

306 # Extract original exception 

307 if e.last_attempt.exception() is not None: 

308 raise e.last_attempt.exception() from None 

309 raise 

310 

311 

312def entry_point( 

313 func: Callable[P, T] | None = None, 

314 *, 

315 handle_result: bool = True, 

316) -> Callable[P, Any] | Callable[[Callable[P, T]], Callable[P, Any]]: 

317 """ 

318 Entry point decorator that provides: 

319 1. Automatic CLI argument parsing via Typer 

320 2. Error handling and logging 

321 3. Exit code management (0 for success, 1 for failure) 

322 

323 Args: 

324 func: Function to decorate 

325 handle_result: Automatically handle Result types (default: True) 

326 

327 Returns: 

328 Decorated function with CLI integration 

329 

330 Example: 

331 @entry_point 

332 def main(name: str = "World", verbose: bool = False): 

333 print(f"Hello, {name}!") 

334 return "Success" 

335 

336 if __name__ == "__main__": 

337 main() # Typer app is invoked 

338 

339 CLI usage: 

340 python -m src.entry --name Alice --verbose 

341 """ 

342 

343 def decorator(f: Callable[P, T]) -> Callable[P, Any]: 

344 entry_name = f.__name__ 

345 

346 # Create Typer app for this entry point 

347 app = typer.Typer( 

348 help=f.__doc__ or f"Execute {entry_name} entry point", 

349 add_completion=False, 

350 ) 

351 

352 @app.command() 

353 @wraps(f) 

354 def cli_wrapper(**kwargs: Any) -> None: 

355 """CLI wrapper for the entry point.""" 

356 logger.info(f"[{entry_name}] Entry point started") 

357 logger.debug(f"[{entry_name}] Arguments: {kwargs}") 

358 

359 try: 

360 # Execute the main function 

361 _ = f(**kwargs) # type: ignore[call-arg] 

362 

363 # Log success 

364 logger.info(f"[{entry_name}] ✓ Completed successfully") 

365 

366 except KeyboardInterrupt: 

367 logger.warning(f"[{entry_name}] Interrupted by user") 

368 raise 

369 

370 except Exception as e: 

371 logger.exception(f"[{entry_name}] ✗ Unhandled exception: {e}") 

372 raise 

373 

374 # Create a wrapper that can be called directly or via Typer 

375 @wraps(f) 

376 def entry_wrapper(*args: P.args, **kwargs: P.kwargs) -> Any: 

377 """ 

378 Wrapper that delegates to Typer app when called without args, 

379 or to original function when called with args. 

380 """ 

381 if args or kwargs: 381 ↛ 386line 381 didn't jump to line 386 because the condition on line 381 was always true

382 # Called programmatically with arguments 

383 return f(*args, **kwargs) 

384 else: 

385 # Called as CLI entry point 

386 app() 

387 

388 # Store Typer app and metadata for programmatic access 

389 entry_wrapper._typer_app = app # type: ignore[attr-defined] 

390 entry_wrapper._original_func = f # type: ignore[attr-defined] 

391 entry_wrapper._is_railway_entry_point = True # type: ignore[attr-defined] 

392 entry_wrapper._handle_result = handle_result # type: ignore[attr-defined] 

393 entry_wrapper.__doc__ = f.__doc__ 

394 

395 return entry_wrapper 

396 

397 if func is None: 

398 return decorator 

399 return decorator(func)