Coverage for railway / core / decorators.py: 90%
158 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-11 00:06 +0900
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-11 00:06 +0900
1"""
2Decorators for Railway nodes and entry points.
3"""
5import asyncio
6from collections.abc import Callable
7from functools import wraps
8from typing import Any, ParamSpec, TypeVar
10import typer
11from loguru import logger
12from tenacity import (
13 retry as tenacity_retry,
14 stop_after_attempt,
15 wait_exponential,
16 RetryError,
17 before_sleep_log,
18 AsyncRetrying,
19)
21P = ParamSpec("P")
22T = TypeVar("T")
class Retry:
    """Retry configuration for nodes.

    Attributes:
        max_attempts: Passed to ``stop_after_attempt`` by the retry helpers
            in this module.
        min_wait: Passed as ``min`` to ``wait_exponential``.
        max_wait: Passed as ``max`` to ``wait_exponential``.
        exponential_base: Backoff factor supplied by the caller.
        multiplier: Copy of ``exponential_base`` taken at construction time.
            This is the attribute the retry helpers actually read (passed as
            ``multiplier`` to ``wait_exponential``); it is not kept in sync
            if ``exponential_base`` is reassigned later.
    """

    def __init__(
        self,
        max_attempts: int = 3,
        min_wait: float = 2.0,
        max_wait: float = 10.0,
        exponential_base: int = 2,
    ) -> None:
        self.max_attempts = max_attempts
        self.min_wait = min_wait
        self.max_wait = max_wait
        self.exponential_base = exponential_base
        self.multiplier = exponential_base  # Alias for compatibility

    def __repr__(self) -> str:
        """Return a constructor-style representation for logs and debugging."""
        return (
            f"{type(self).__name__}("
            f"max_attempts={self.max_attempts!r}, "
            f"min_wait={self.min_wait!r}, "
            f"max_wait={self.max_wait!r}, "
            f"exponential_base={self.exponential_base!r})"
        )
def node(
    func: Callable[P, T] | None = None,
    *,
    retry: bool | Retry = False,
    log_input: bool = False,
    log_output: bool = False,
    name: str | None = None,
) -> Callable[P, T] | Callable[[Callable[P, T]], Callable[P, T]]:
    """
    Node decorator that provides:
    1. Automatic exception handling with logging
    2. Optional retry with exponential backoff
    3. Structured logging
    4. Metadata storage

    Works both bare (``@node``) and with arguments (``@node(retry=True)``).
    Coroutine functions are wrapped with an async wrapper, everything else
    with a synchronous one.

    Args:
        func: Function to decorate
        retry: Enable retry (bool) or provide Retry config
        log_input: Log input parameters (caution: may log sensitive data)
        log_output: Log output data (caution: may log sensitive data)
        name: Override node name (default: function name)

    Returns:
        The decorated function when used bare, otherwise a decorator that
        produces it.

    Example:
        @node
        def fetch_data() -> dict:
            return api.get("/data")

        @node(retry=True)
        def fetch_with_retry() -> dict:
            return api.get("/data")

        @node(retry=Retry(max_attempts=5, min_wait=1))
        def custom_retry() -> dict:
            return api.get("/data")
    """
    # asyncio.iscoroutinefunction is deprecated since Python 3.14; the
    # inspect variant is the documented replacement. Imported locally so this
    # function does not depend on the module's import header.
    import inspect

    def decorator(f: Callable[P, T]) -> Callable[P, T]:
        node_name = name or f.__name__
        if inspect.iscoroutinefunction(f):
            return _create_async_wrapper(f, node_name, retry, log_input, log_output)
        return _create_sync_wrapper(f, node_name, retry, log_input, log_output)

    # Handle decorator usage with and without parentheses
    if func is None:
        return decorator
    return decorator(func)
def _create_sync_wrapper(
    f: Callable[P, T],
    node_name: str,
    retry: bool | Retry,
    log_input: bool,
    log_output: bool,
) -> Callable[P, T]:
    """Build the logging/retry wrapper for a synchronous node function.

    Args:
        f: The function being wrapped.
        node_name: Label used as the ``[name]`` prefix of every log line.
        retry: ``True`` to use configured retry settings, a ``Retry``
            instance to use it directly, anything else for no retry.
        log_input: Emit a debug line with the call arguments.
        log_output: Emit a debug line with the return value.

    Returns:
        A wrapper around ``f`` carrying the ``_is_railway_node``,
        ``_node_name``, ``_original_func`` and ``_is_async`` attributes.
    """
    prefix = f"[{node_name}]"

    @wraps(f)
    def wrapper(*args: P.args, **kwargs: P.kwargs) -> T:
        if log_input:
            logger.debug(f"{prefix} Input: args={args}, kwargs={kwargs}")

        logger.info(f"{prefix} Starting...")

        # Resolved on every call and outside the guarded section, so a
        # failure while resolving it propagates without the error log below.
        retry_config = _get_retry_configuration(retry, node_name)

        try:
            if retry_config is None:
                result = f(*args, **kwargs)
            else:
                result = _execute_with_retry(f, retry_config, node_name, args, kwargs)

            if log_output:
                logger.debug(f"{prefix} Output: {result}")

            logger.info(f"{prefix} ✓ Completed")
            return result
        except Exception as exc:
            # Log with a hint, then let the caller see the original error.
            _log_error_with_hint(node_name, exc)
            raise

    # Metadata read by code that inspects decorated nodes.
    for attr, value in (
        ("_is_railway_node", True),
        ("_node_name", node_name),
        ("_original_func", f),
        ("_is_async", False),
    ):
        setattr(wrapper, attr, value)

    return wrapper
def _create_async_wrapper(
    f: Callable[P, T],
    node_name: str,
    retry: bool | Retry,
    log_input: bool,
    log_output: bool,
) -> Callable[P, T]:
    """Build the logging/retry wrapper for a coroutine node function.

    Args:
        f: The coroutine function being wrapped.
        node_name: Label used as the ``[name]`` prefix of every log line.
        retry: ``True`` to use configured retry settings, a ``Retry``
            instance to use it directly, anything else for no retry.
        log_input: Emit a debug line with the call arguments.
        log_output: Emit a debug line with the awaited result.

    Returns:
        An ``async`` wrapper around ``f`` carrying the ``_is_railway_node``,
        ``_node_name``, ``_original_func`` and ``_is_async`` attributes.
    """
    prefix = f"[{node_name}]"

    @wraps(f)
    async def wrapper(*args: P.args, **kwargs: P.kwargs) -> T:
        if log_input:
            logger.debug(f"{prefix} Input: args={args}, kwargs={kwargs}")

        logger.info(f"{prefix} Starting...")

        # Resolved on every call and outside the guarded section, so a
        # failure while resolving it propagates without the error log below.
        retry_config = _get_retry_configuration(retry, node_name)

        try:
            if retry_config is None:
                result = await f(*args, **kwargs)
            else:
                result = await _execute_async_with_retry(
                    f, retry_config, node_name, args, kwargs
                )

            if log_output:
                logger.debug(f"{prefix} Output: {result}")

            logger.info(f"{prefix} ✓ Completed")
            return result
        except Exception as exc:
            # Log with a hint, then let the caller see the original error.
            _log_error_with_hint(node_name, exc)
            raise

    # Metadata read by code that inspects decorated nodes.
    for attr, value in (
        ("_is_railway_node", True),
        ("_node_name", node_name),
        ("_original_func", f),
        ("_is_async", True),
    ):
        setattr(wrapper, attr, value)

    return wrapper
async def _execute_async_with_retry(
    func: Callable[P, T],
    retry_config: Retry,
    node_name: str,
    args: tuple,
    kwargs: dict,
) -> T:
    """Await ``func(*args, **kwargs)``, retrying on failure.

    Args:
        func: Coroutine function to call.
        retry_config: Supplies ``max_attempts``, ``multiplier``, ``min_wait``
            and ``max_wait`` for the tenacity stop/wait strategies.
        node_name: Label used in the retry warning.
        args: Positional arguments for ``func``.
        kwargs: Keyword arguments for ``func``.

    Returns:
        The result of the first attempt that does not raise.

    Raises:
        Whatever the final attempt raised (``reraise=True``).
    """
    total_attempts = retry_config.max_attempts
    backoff = wait_exponential(
        multiplier=retry_config.multiplier,
        min=retry_config.min_wait,
        max=retry_config.max_wait,
    )
    retrying = AsyncRetrying(
        stop=stop_after_attempt(total_attempts),
        wait=backoff,
        reraise=True,
    )

    async for attempt in retrying:
        with attempt:
            current = attempt.retry_state.attempt_number
            # The first attempt is not a retry, so only later ones are announced.
            if current > 1:
                logger.warning(
                    f"[{node_name}] リトライ中... (試行 {current}/{total_attempts})"
                )
            return await func(*args, **kwargs)
def _get_retry_configuration(retry_param: bool | Retry, node_name: str) -> Retry | None:
    """Resolve the ``retry`` decorator argument into a ``Retry`` or ``None``.

    Args:
        retry_param: The value given to the decorator's ``retry`` argument.
        node_name: Passed to ``get_retry_config`` when settings are loaded.

    Returns:
        ``retry_param`` itself when it is a ``Retry``; a ``Retry`` built from
        ``railway.core.config.get_retry_config(node_name)`` when it is exactly
        ``True``; ``None`` for any other value.
    """
    if isinstance(retry_param, Retry):
        return retry_param

    if retry_param is not True:
        return None

    # Imported at call time, as in the rest of this module's use of the config package.
    from railway.core.config import get_retry_config

    settings = get_retry_config(node_name)
    return Retry(
        max_attempts=settings.max_attempts,
        min_wait=settings.min_wait,
        max_wait=settings.max_wait,
        exponential_base=settings.multiplier,
    )
241def _get_error_hint(exception: Exception) -> str | None:
242 """Get hint message for common errors."""
243 if isinstance(exception, ConnectionError):
244 return "ヒント: ネットワーク接続を確認してください。APIエンドポイントが正しいか確認してください。"
245 elif isinstance(exception, TimeoutError):
246 return "ヒント: タイムアウト値を増やすか、APIサーバーの状態を確認してください。"
247 elif isinstance(exception, ValueError):
248 return "ヒント: 入力データの形式や値を確認してください。"
249 elif isinstance(exception, FileNotFoundError):
250 return "ヒント: ファイルパスが正しいか確認してください。"
251 elif isinstance(exception, PermissionError): 251 ↛ 252line 251 didn't jump to line 252 because the condition on line 251 was never true
252 return "ヒント: ファイルやディレクトリの権限を確認してください。"
253 elif isinstance(exception, KeyError):
254 return "ヒント: 必要なキーが存在するか確認してください。設定ファイルを確認してください。"
256 # Check for API key related errors
257 error_str = str(exception).upper()
258 if "API_KEY" in error_str or "API_SECRET" in error_str or "UNAUTHORIZED" in error_str:
259 return "ヒント: .envファイルでAPI認証情報が正しく設定されているか確認してください。"
261 return None
def _log_error_with_hint(node_name: str, exception: Exception) -> None:
    """Log a node failure, a pointer to the log file, and a hint if one exists.

    Args:
        node_name: Label used as the ``[name]`` prefix of the failure line.
        exception: The error that caused the failure.
    """
    error_kind = type(exception).__name__
    logger.error(f"[{node_name}] ✗ Failed: {error_kind}: {exception}")
    logger.error("詳細は logs/app.log を確認してください")

    # Only errors recognised by _get_error_hint produce a third line.
    if hint := _get_error_hint(exception):
        logger.error(hint)
def _execute_with_retry(
    func: Callable[P, T],
    retry_config: Retry,
    node_name: str,
    args: tuple,
    kwargs: dict,
) -> T:
    """Call ``func(*args, **kwargs)``, retrying on failure.

    Args:
        func: Function to call.
        retry_config: Supplies ``max_attempts``, ``multiplier``, ``min_wait``
            and ``max_wait`` for the tenacity stop/wait strategies.
        node_name: Label used in the retry warning.
        args: Positional arguments for ``func``.
        kwargs: Keyword arguments for ``func``.

    Returns:
        The result of the first attempt that does not raise.

    Raises:
        Whatever the final attempt raised (``reraise=True``).
    """
    max_attempts = retry_config.max_attempts

    def before_retry(retry_state) -> None:
        # before_sleep runs after a failed attempt, so attempt_number is the
        # attempt that just failed. Report the attempt about to run, matching
        # the numbering used by the async retry path.
        upcoming_attempt = retry_state.attempt_number + 1
        logger.warning(
            f"[{node_name}] リトライ中... (試行 {upcoming_attempt}/{max_attempts})"
        )

    retry_decorator = tenacity_retry(
        stop=stop_after_attempt(max_attempts),
        wait=wait_exponential(
            multiplier=retry_config.multiplier,
            min=retry_config.min_wait,
            max=retry_config.max_wait,
        ),
        reraise=True,
        before_sleep=before_retry,
    )

    retryable_func = retry_decorator(func)

    try:
        return retryable_func(*args, **kwargs)
    except RetryError as e:
        # Defensive: with reraise=True the last attempt's own exception is
        # expected to surface directly. If a RetryError does arrive, unwrap it
        # so callers see the underlying error.
        original = e.last_attempt.exception()
        if original is not None:
            raise original from None
        raise
def entry_point(
    func: Callable[P, T] | None = None,
    *,
    handle_result: bool = True,
) -> Callable[P, Any] | Callable[[Callable[P, T]], Callable[P, Any]]:
    """
    Entry point decorator that wires a function to a Typer CLI.

    The returned wrapper:
    1. Runs the Typer app (CLI argument parsing) when called with no arguments
    2. Calls the original function directly when called with arguments
    3. In the CLI path, logs start, success and failures, then re-raises
       any exception from the function

    Works both bare (``@entry_point``) and with arguments
    (``@entry_point(handle_result=False)``).

    Args:
        func: Function to decorate
        handle_result: Stored on the wrapper as ``_handle_result``; not
            otherwise consulted by this decorator

    Returns:
        The wrapper when used bare, otherwise a decorator that produces it.
        The wrapper exposes ``_typer_app``, ``_original_func``,
        ``_is_railway_entry_point`` and ``_handle_result``.

    Example:
        @entry_point
        def main(name: str = "World", verbose: bool = False):
            print(f"Hello, {name}!")
            return "Success"

        if __name__ == "__main__":
            main()  # Typer app is invoked

    CLI usage:
        python -m src.entry --name Alice --verbose
    """

    def decorator(f: Callable[P, T]) -> Callable[P, Any]:
        entry_name = f.__name__
        prefix = f"[{entry_name}]"

        # One dedicated Typer app per entry point.
        help_text = f.__doc__ or f"Execute {entry_name} entry point"
        app = typer.Typer(help=help_text, add_completion=False)

        @app.command()
        @wraps(f)
        def cli_wrapper(**kwargs: Any) -> None:
            """Run the entry point from the CLI with logging."""
            logger.info(f"{prefix} Entry point started")
            logger.debug(f"{prefix} Arguments: {kwargs}")

            try:
                # The return value is intentionally discarded in the CLI path.
                f(**kwargs)  # type: ignore[call-arg]
                logger.info(f"{prefix} ✓ Completed successfully")
            except KeyboardInterrupt:
                logger.warning(f"{prefix} Interrupted by user")
                raise
            except Exception as exc:
                logger.exception(f"{prefix} ✗ Unhandled exception: {exc}")
                raise

        @wraps(f)
        def entry_wrapper(*args: P.args, **kwargs: P.kwargs) -> Any:
            """Dispatch to the Typer app (no arguments) or to the function."""
            if not args and not kwargs:
                # Called as CLI entry point
                app()
                return None
            # Called programmatically with arguments
            return f(*args, **kwargs)

        # Expose the Typer app and metadata for programmatic access.
        for attr, value in (
            ("_typer_app", app),
            ("_original_func", f),
            ("_is_railway_entry_point", True),
            ("_handle_result", handle_result),
        ):
            setattr(entry_wrapper, attr, value)
        entry_wrapper.__doc__ = f.__doc__

        return entry_wrapper

    if func is None:
        return decorator
    return decorator(func)