Coverage for fastblocks / initializers.py: 19%

117 statements  

« prev     ^ index     » next       coverage.py v7.12.0, created at 2025-11-26 03:58 -0800

1"""Application initialization components for FastBlocks. 

2 

3This module contains classes responsible for initializing various aspects 

4of a FastBlocks application, separating concerns from the main application class. 

5""" 

6 

7import logging 

8import typing as t 

9 

10from acb import register_pkg 

11 

12try: 

13 from acb.adapters import get_installed_adapter 

14except ImportError: # acb >= 0.19 removed this helper 

15 from acb.adapters import get_adapter, get_installed_adapters 

16 

17 def get_installed_adapter(adapter_name: str) -> str | None: 

18 """Compatibility shim that resolves an installed adapter name.""" 

19 for adapter in get_installed_adapters(): 

20 meta = getattr(adapter, "metadata", None) 

21 provider = getattr(meta, "provider", None) 

22 if adapter_name in (adapter.category, adapter.name, provider): 

23 provider_str = str(provider) if provider is not None else None 

24 adapter_name_str = ( 

25 str(adapter.name) if hasattr(adapter, "name") else None 

26 ) 

27 return provider_str or adapter_name_str 

28 adapter = get_adapter(adapter_name) 

29 if adapter: 

30 meta = getattr(adapter, "metadata", None) 

31 provider = getattr(meta, "provider", None) 

32 provider_str = str(provider) if provider is not None else None 

33 adapter_name_str = str(adapter.name) if hasattr(adapter, "name") else None 

34 return provider_str or adapter_name_str 

35 return None 

36 

37 

38from acb.config import AdapterBase, Config 

39from acb.depends import depends 

40from starception import install_error_handler 

41from starlette.applications import Starlette 

42 

43 

44class ApplicationInitializer: 

45 def __init__(self, app: Starlette, **kwargs: t.Any) -> None: 

46 self.app = app 

47 self.kwargs = kwargs 

48 self.config: t.Any | None = None 

49 self.logger: t.Any | None = None 

50 self.depends: t.Any | None = None 

51 self._acb_modules: tuple[t.Any, ...] = () 

52 

53 def initialize(self) -> None: 

54 self._load_acb_modules() 

55 self._setup_dependencies() 

56 self._configure_error_handling() 

57 self._configure_debug_mode() 

58 self._initialize_starlette() 

59 self._configure_exception_handlers() 

60 self._setup_models() 

61 self._configure_logging() 

62 self._register_event_handlers() 

63 

64 def _load_acb_modules(self) -> None: 

65 try: 

66 logger = depends.get_sync("logger") 

67 logger_class: type[t.Any] | None = ( 

68 logger.__class__ if logger is not None else None 

69 ) 

70 from acb.logger import InterceptHandler 

71 

72 interceptor_class: type[t.Any] | None = InterceptHandler 

73 except Exception: 

74 logger_class = None 

75 interceptor_class = None 

76 self._acb_modules = ( 

77 register_pkg, 

78 get_installed_adapter, 

79 Config, 

80 AdapterBase, 

81 interceptor_class, 

82 logger_class, 

83 depends, 

84 ) 

85 

86 def _setup_dependencies(self) -> None: 

87 self.config = self.kwargs.get("config") 

88 if self.config is None: 

89 try: 

90 self.config = depends.get_sync("config") 

91 except Exception: 

92 self.config = None 

93 

94 self.logger = self.kwargs.get("logger") 

95 if self.logger is None: 

96 try: 

97 self.logger = depends.get_sync("logger") 

98 except Exception: 

99 self.logger = None 

100 

101 self.depends = depends 

102 

103 def _configure_error_handling(self) -> None: 

104 if not getattr(self.config, "deployed", False) or not getattr( 

105 getattr(self.config, "debug", None), 

106 "production", 

107 False, 

108 ): 

109 install_error_handler() 

110 

111 def _configure_debug_mode(self) -> None: 

112 debug_config = getattr(self.config, "debug", None) 

113 self.app.debug = ( 

114 getattr(debug_config, "fastblocks", False) if debug_config else False 

115 ) 

116 if self.logger: 

117 self.logger.warning(f"Fastblocks debug: {self.app.debug}") 

118 

119 def _initialize_starlette(self) -> None: 

120 Starlette.__init__( 

121 self.app, 

122 debug=self.app.debug, 

123 routes=[], 

124 middleware=self.kwargs.get("middleware") 

125 if self.kwargs.get("middleware") is not None 

126 else [], 

127 lifespan=self.kwargs.get("lifespan"), 

128 exception_handlers=self.kwargs.get("exception_handlers") 

129 if self.kwargs.get("exception_handlers") is not None 

130 else {}, 

131 ) 

132 middleware = self.kwargs.get("middleware") 

133 self.app.user_middleware = list(middleware) if middleware is not None else [] 

134 

135 def _configure_exception_handlers(self) -> None: 

136 from .exceptions import handle_exception 

137 

138 exception_handlers = self.kwargs.get("exception_handlers") 

139 if exception_handlers is None: 

140 exception_handlers = { 

141 404: handle_exception, 

142 500: handle_exception, 

143 } 

144 object.__setattr__(self.app, "exception_handlers", exception_handlers) 

145 

146 def _setup_models(self) -> None: 

147 try: 

148 models = self.depends.get_sync("models") # type: ignore[union-attr] 

149 except Exception: 

150 models = None 

151 object.__setattr__(self.app, "models", models) 

152 

153 def _configure_logging(self) -> None: 

154 if get_installed_adapter("logfire"): 

155 from logfire import instrument_starlette # type: ignore[import-untyped] 

156 

157 instrument_starlette(self.app) 

158 interceptor_class = self._acb_modules[4] 

159 if interceptor_class: 

160 for logger_name in ( 

161 "uvicorn", 

162 "uvicorn.access", 

163 "granian", 

164 "granian.access", 

165 ): 

166 server_logger = logging.getLogger(logger_name) 

167 server_logger.handlers.clear() 

168 server_logger.addHandler(interceptor_class()) 

169 server_logger.setLevel(logging.DEBUG) 

170 server_logger.propagate = False 

171 

172 def _register_integrations_async(self) -> None: 

173 """Run async integration registration in background.""" 

174 from contextlib import suppress 

175 

176 from ._events_integration import register_fastblocks_event_handlers 

177 from ._health_integration import register_fastblocks_health_checks 

178 from ._validation_integration import register_fastblocks_validation 

179 from ._workflows_integration import register_fastblocks_workflows 

180 

181 with suppress(Exception): 

182 import asyncio 

183 

184 async def register_all() -> None: 

185 """Register all FastBlocks integrations concurrently.""" 

186 try: 

187 await asyncio.gather( 

188 register_fastblocks_event_handlers(), 

189 register_fastblocks_health_checks(), 

190 register_fastblocks_validation(), 

191 register_fastblocks_workflows(), 

192 return_exceptions=True, 

193 ) 

194 if self.logger: 

195 self.logger.info("All FastBlocks integrations registered") 

196 except Exception as e: 

197 if self.logger: 

198 self.logger.warning( 

199 f"Some integrations failed to register: {e}" 

200 ) 

201 

202 # Use running loop when available, otherwise start a new one 

203 with suppress(Exception): # Graceful degradation 

204 try: 

205 loop = asyncio.get_running_loop() 

206 loop.create_task(register_all()) 

207 except RuntimeError: 

208 asyncio.run(register_all()) 

209 

210 def _register_event_handlers(self) -> None: 

211 """Register FastBlocks event handlers, health checks, validation, and workflows with ACB.""" 

212 try: 

213 self._register_integrations_async() 

214 

215 if self.logger: 

216 self.logger.info( 

217 "FastBlocks integrations registered (events, health, validation, workflows)" 

218 ) 

219 

220 except ImportError: 

221 # ACB integrations not available - graceful degradation 

222 if self.logger: 

223 self.logger.debug( 

224 "ACB integrations not available - running without enhanced features" 

225 ) 

226 except Exception as e: 

227 # Log error but don't fail application startup 

228 if self.logger: 

229 self.logger.warning( 

230 f"Failed to register ACB integrations: {e} - continuing with degraded features" 

231 )