Coverage for fastblocks / initializers.py: 19%
117 statements
« prev ^ index » next coverage.py v7.12.0, created at 2025-11-26 03:58 -0800
« prev ^ index » next coverage.py v7.12.0, created at 2025-11-26 03:58 -0800
"""Application initialization components for FastBlocks.

This module contains classes responsible for initializing various aspects
of a FastBlocks application, separating concerns from the main application class.
"""
7import logging
8import typing as t
10from acb import register_pkg
try:
    from acb.adapters import get_installed_adapter
except ImportError:  # acb >= 0.19 removed this helper
    from acb.adapters import get_adapter, get_installed_adapters

    def _adapter_provider(adapter: t.Any) -> t.Any:
        """Return ``adapter.metadata.provider``, or ``None`` when either is absent."""
        return getattr(getattr(adapter, "metadata", None), "provider", None)

    def _adapter_label(adapter: t.Any) -> str | None:
        """Return the adapter's provider as a string, falling back to its name.

        The provider wins when it stringifies to a non-empty value; otherwise
        the adapter's ``name`` is used. ``None`` is returned when neither is
        available, so a missing name is never reported as the text ``"None"``.
        """
        provider = _adapter_provider(adapter)
        provider_str = str(provider) if provider is not None else None
        name = getattr(adapter, "name", None)
        name_str = str(name) if name is not None else None
        return provider_str or name_str

    def get_installed_adapter(adapter_name: str) -> str | None:
        """Compatibility shim that resolves an installed adapter name.

        Args:
            adapter_name: Value compared against each installed adapter's
                ``category``, ``name`` and ``metadata.provider``.

        Returns:
            The matching adapter's provider (or its name when no provider is
            set). When no installed adapter matches, ``get_adapter`` is
            consulted as a fallback; ``None`` is returned if that also yields
            nothing.
        """
        for adapter in get_installed_adapters():
            # Read every attribute defensively: the objects returned by
            # get_installed_adapters() are not guaranteed to expose all three.
            candidates = (
                getattr(adapter, "category", None),
                getattr(adapter, "name", None),
                _adapter_provider(adapter),
            )
            if adapter_name in candidates:
                return _adapter_label(adapter)

        adapter = get_adapter(adapter_name)
        return _adapter_label(adapter) if adapter else None
38from acb.config import AdapterBase, Config
39from acb.depends import depends
40from starception import install_error_handler
41from starlette.applications import Starlette
class ApplicationInitializer:
    """Run the one-time setup steps for a FastBlocks application instance.

    The initializer is constructed with the application object and the keyword
    arguments the application was created with, and ``initialize()`` then runs
    each setup step in a fixed order.

    Recognised keyword arguments (all optional): ``config``, ``logger``,
    ``middleware``, ``lifespan`` and ``exception_handlers``.
    """

    # Position of the intercept-handler class inside ``_acb_modules``.
    _INTERCEPTOR_INDEX = 4

    def __init__(self, app: "Starlette", **kwargs: t.Any) -> None:
        """Store the application and its construction keyword arguments."""
        self.app = app
        self.kwargs = kwargs
        self.config: t.Any | None = None
        self.logger: t.Any | None = None
        self.depends: t.Any | None = None
        self._acb_modules: tuple[t.Any, ...] = ()
        # Strong references to background tasks. The event loop only keeps
        # weak references, so an unreferenced task may be garbage-collected
        # before it finishes.
        self._background_tasks: set[t.Any] = set()

    def initialize(self) -> None:
        """Run every setup step, in order."""
        self._load_acb_modules()
        self._setup_dependencies()
        self._configure_error_handling()
        self._configure_debug_mode()
        self._initialize_starlette()
        self._configure_exception_handlers()
        self._setup_models()
        self._configure_logging()
        self._register_event_handlers()

    def _load_acb_modules(self) -> None:
        """Collect the ACB objects used by later steps into ``_acb_modules``.

        If resolving the logger or importing ``InterceptHandler`` fails, both
        the logger class and the interceptor class are recorded as ``None``.
        """
        logger_class: type[t.Any] | None
        interceptor_class: type[t.Any] | None
        try:
            logger = depends.get_sync("logger")
            logger_class = logger.__class__ if logger is not None else None
            from acb.logger import InterceptHandler

            interceptor_class = InterceptHandler
        except Exception:
            logger_class = None
            interceptor_class = None
        # NOTE: ``_INTERCEPTOR_INDEX`` must keep pointing at interceptor_class.
        self._acb_modules = (
            register_pkg,
            get_installed_adapter,
            Config,
            AdapterBase,
            interceptor_class,
            logger_class,
            depends,
        )

    def _setup_dependencies(self) -> None:
        """Resolve ``config`` and ``logger`` from kwargs, else from ``depends``.

        A dependency that cannot be resolved is left as ``None`` rather than
        aborting initialization.
        """
        self.config = self.kwargs.get("config")
        if self.config is None:
            try:
                self.config = depends.get_sync("config")
            except Exception:
                self.config = None

        self.logger = self.kwargs.get("logger")
        if self.logger is None:
            try:
                self.logger = depends.get_sync("logger")
            except Exception:
                self.logger = None

        self.depends = depends

    def _configure_error_handling(self) -> None:
        """Install starception's error handler unless deployed in production.

        The handler is skipped only when ``config.deployed`` and
        ``config.debug.production`` are both truthy.
        """
        deployed = getattr(self.config, "deployed", False)
        production = getattr(getattr(self.config, "debug", None), "production", False)
        if not deployed or not production:
            install_error_handler()

    def _configure_debug_mode(self) -> None:
        """Set ``app.debug`` from ``config.debug.fastblocks`` and log the result."""
        debug_config = getattr(self.config, "debug", None)
        self.app.debug = (
            getattr(debug_config, "fastblocks", False) if debug_config else False
        )
        if self.logger:
            self.logger.warning(f"Fastblocks debug: {self.app.debug}")

    def _initialize_starlette(self) -> None:
        """Invoke ``Starlette.__init__`` on the app with the stored kwargs.

        Missing ``middleware`` / ``exception_handlers`` default to an empty
        list / dict; ``routes`` always starts empty.
        """
        middleware = self.kwargs.get("middleware")
        exception_handlers = self.kwargs.get("exception_handlers")
        Starlette.__init__(
            self.app,
            debug=self.app.debug,
            routes=[],
            middleware=middleware if middleware is not None else [],
            lifespan=self.kwargs.get("lifespan"),
            exception_handlers=(
                exception_handlers if exception_handlers is not None else {}
            ),
        )
        self.app.user_middleware = list(middleware) if middleware is not None else []

    def _configure_exception_handlers(self) -> None:
        """Attach exception handlers, defaulting 404 and 500 to ``handle_exception``."""
        from .exceptions import handle_exception

        exception_handlers = self.kwargs.get("exception_handlers")
        if exception_handlers is None:
            exception_handlers = {
                404: handle_exception,
                500: handle_exception,
            }
        # object.__setattr__ bypasses any __setattr__ defined on the app class.
        object.__setattr__(self.app, "exception_handlers", exception_handlers)

    def _setup_models(self) -> None:
        """Attach the ``models`` dependency to the app (``None`` if unavailable)."""
        try:
            models = self.depends.get_sync("models")  # type: ignore[union-attr]
        except Exception:
            models = None
        object.__setattr__(self.app, "models", models)

    def _configure_logging(self) -> None:
        """Instrument the app with logfire if installed and intercept server logs.

        When an interceptor class was loaded, the ``uvicorn`` and ``granian``
        loggers (and their ``.access`` children) have their handlers replaced
        by a fresh interceptor instance, are set to DEBUG and stop propagating.
        """
        if get_installed_adapter("logfire"):
            from logfire import instrument_starlette  # type: ignore[import-untyped]

            instrument_starlette(self.app)

        # _acb_modules is empty until _load_acb_modules() has run; treat that
        # the same as "no interceptor available" instead of raising IndexError.
        modules = self._acb_modules
        interceptor_class = (
            modules[self._INTERCEPTOR_INDEX]
            if len(modules) > self._INTERCEPTOR_INDEX
            else None
        )
        if not interceptor_class:
            return
        for logger_name in (
            "uvicorn",
            "uvicorn.access",
            "granian",
            "granian.access",
        ):
            server_logger = logging.getLogger(logger_name)
            server_logger.handlers.clear()
            server_logger.addHandler(interceptor_class())
            server_logger.setLevel(logging.DEBUG)
            server_logger.propagate = False

    def _report_integration_results(
        self,
        names: t.Sequence[str],
        results: t.Sequence[t.Any],
    ) -> int:
        """Log the outcome of each integration registration.

        Args:
            names: Labels of the integrations, in the order they were started.
            results: Matching values from ``asyncio.gather(...,
                return_exceptions=True)``; exception instances mark failures.

        Returns:
            The number of integrations that failed.
        """
        failures = [
            (name, result)
            for name, result in zip(names, results)
            if isinstance(result, BaseException)
        ]
        if self.logger:
            for name, error in failures:
                self.logger.warning(
                    f"FastBlocks {name} integration failed to register: {error!r}"
                )
            if not failures:
                self.logger.info("All FastBlocks integrations registered")
        return len(failures)

    def _run_or_schedule(
        self,
        coro_factory: t.Callable[[], t.Coroutine[t.Any, t.Any, None]],
    ) -> None:
        """Run a coroutine now, or schedule it on the already-running loop.

        With a running event loop the coroutine becomes a task whose reference
        is held in ``_background_tasks`` until it completes. Without one, the
        coroutine is executed to completion via ``asyncio.run``. Any failure is
        logged and swallowed so application startup is never interrupted.
        """
        import asyncio

        try:
            try:
                loop = asyncio.get_running_loop()
            except RuntimeError:
                # No loop in this thread: run synchronously.
                asyncio.run(coro_factory())
            else:
                task = loop.create_task(coro_factory())
                self._background_tasks.add(task)
                task.add_done_callback(self._background_tasks.discard)
        except Exception as exc:  # Graceful degradation
            if self.logger:
                self.logger.warning(
                    f"Failed to schedule FastBlocks integration registration: {exc}"
                )

    def _register_integrations_async(self) -> None:
        """Run async integration registration in background.

        Raises:
            ImportError: If any of the integration modules cannot be imported.
        """
        import asyncio

        from ._events_integration import register_fastblocks_event_handlers
        from ._health_integration import register_fastblocks_health_checks
        from ._validation_integration import register_fastblocks_validation
        from ._workflows_integration import register_fastblocks_workflows

        registrations = (
            ("events", register_fastblocks_event_handlers),
            ("health", register_fastblocks_health_checks),
            ("validation", register_fastblocks_validation),
            ("workflows", register_fastblocks_workflows),
        )

        async def register_all() -> None:
            """Register all FastBlocks integrations concurrently."""
            try:
                outcomes = await asyncio.gather(
                    *(register() for _, register in registrations),
                    return_exceptions=True,
                )
            except Exception as e:
                if self.logger:
                    self.logger.warning(f"Some integrations failed to register: {e}")
                return
            # return_exceptions=True hands failures back as values, so they
            # must be inspected explicitly to be reported at all.
            self._report_integration_results(
                [label for label, _ in registrations],
                outcomes,
            )

        # Use running loop when available, otherwise start a new one
        self._run_or_schedule(register_all)

    def _register_event_handlers(self) -> None:
        """Register FastBlocks event handlers, health checks, validation, and workflows with ACB."""
        try:
            self._register_integrations_async()

            if self.logger:
                self.logger.info(
                    "FastBlocks integrations registered (events, health, validation, workflows)"
                )

        except ImportError:
            # ACB integrations not available - graceful degradation
            if self.logger:
                self.logger.debug(
                    "ACB integrations not available - running without enhanced features"
                )
        except Exception as e:
            # Log error but don't fail application startup
            if self.logger:
                self.logger.warning(
                    f"Failed to register ACB integrations: {e} - continuing with degraded features"
                )