Coverage for fastblocks / _health_integration.py: 0%

202 statements  

« prev     ^ index     » next       coverage.py v7.12.0, created at 2025-11-26 03:30 -0800

1"""ACB HealthService integration for FastBlocks. 

2 

3This module bridges FastBlocks components with ACB's comprehensive health monitoring system. 

4It registers FastBlocks-specific health checks while maintaining existing MCP health checks. 

5 

6Author: lesleslie <les@wedgwoodwebworks.com> 

7Created: 2025-10-01 

8""" 

9 

10import typing as t 

11from contextlib import suppress 

12from uuid import UUID 

13 

14from acb.adapters import AdapterStatus 

15from acb.depends import Inject, depends 

16 

17# Optional ACB health imports (graceful degradation if not available) 

18try: 

19 from acb.services.health import ( 

20 HealthCheckMixin, 

21 HealthCheckResult, 

22 HealthStatus, 

23 ) 

24 

25 acb_health_available = True 

26except ImportError: 

27 acb_health_available = False 

28 HealthCheckMixin = object # Fallback base class 

29 HealthCheckResult = None 

30 HealthCheckType = None 

31 HealthStatus = None 

32 

33 

34class FastBlocksHealthCheck(HealthCheckMixin): # type: ignore[misc] 

35 """Base health check implementation for FastBlocks components.""" 

36 

37 @depends.inject # type: ignore[misc] # ACB untyped decorator 

38 def __init__( 

39 self, 

40 config: Inject[t.Any], 

41 component_id: str | None = None, 

42 component_name: str | None = None, 

43 ) -> None: 

44 if acb_health_available: 

45 super().__init__() 

46 self.config = config 

47 self._component_id: str = component_id or self.__class__.__name__.lower() 

48 self._component_name: str = component_name or self.__class__.__name__ 

49 

50 @property 

51 def component_id(self) -> str: 

52 """Get unique identifier for this component.""" 

53 return self._component_id 

54 

55 @property 

56 def component_name(self) -> str: 

57 """Get human-readable name for this component.""" 

58 return self._component_name 

59 

60 async def _perform_health_check( 

61 self, 

62 check_type: t.Any, # HealthCheckType when available 

63 ) -> t.Any: # HealthCheckResult when available 

64 """Default health check - override in subclasses.""" 

65 if not acb_health_available: 

66 return None 

67 

68 return HealthCheckResult( 

69 component_id=self.component_id, 

70 component_name=self.component_name, 

71 status=HealthStatus.HEALTHY, 

72 check_type=check_type, 

73 message=f"{self.component_name} is operational", 

74 ) 

75 

76 

77class TemplatesHealthCheck(FastBlocksHealthCheck): 

78 """Health check for FastBlocks template system.""" 

79 

80 def __init__(self) -> None: 

81 super().__init__( 

82 component_id="templates", 

83 component_name="Template System", 

84 ) 

85 

86 async def _perform_health_check( 

87 self, 

88 check_type: t.Any, 

89 ) -> t.Any: 

90 """Check template system health.""" 

91 if not acb_health_available: 

92 return None 

93 

94 details: dict[str, t.Any] = {} 

95 status = HealthStatus.HEALTHY 

96 message = "Template system operational" 

97 

98 try: 

99 # Try to get templates adapter 

100 templates = await depends.get("templates") 

101 

102 if templates is None: 

103 status = HealthStatus.DEGRADED 

104 message = "Templates adapter not initialized" 

105 else: 

106 # Check if templates has required attributes 

107 if hasattr(templates, "app") and templates.app is not None: 

108 details["jinja_env_initialized"] = True 

109 

110 # Check template directory accessibility 

111 if hasattr(templates.app, "env") and templates.app.env.loader: 

112 details["loader_available"] = True 

113 else: 

114 status = HealthStatus.DEGRADED 

115 message = "Template loader not configured" 

116 else: 

117 status = HealthStatus.DEGRADED 

118 message = "Template app not initialized" 

119 

120 # Check cache availability 

121 try: 

122 cache = await depends.get("cache") 

123 details["cache_available"] = cache is not None 

124 except Exception: 

125 details["cache_available"] = False 

126 

127 except Exception as e: 

128 status = HealthStatus.UNHEALTHY 

129 message = f"Template health check failed: {e}" 

130 details["error"] = str(e) 

131 

132 return HealthCheckResult( 

133 component_id=self.component_id, 

134 component_name=self.component_name, 

135 status=status, 

136 check_type=check_type, 

137 message=message, 

138 details=details, 

139 ) 

140 

141 

142class CacheHealthCheck(FastBlocksHealthCheck): 

143 """Health check for FastBlocks cache system.""" 

144 

145 def __init__(self) -> None: 

146 super().__init__( 

147 component_id="cache", 

148 component_name="Cache System", 

149 ) 

150 

151 async def _test_cache_operations( 

152 self, cache: t.Any, details: dict[str, t.Any] 

153 ) -> tuple[t.Any, str]: # Returns (status, message) 

154 """Test cache read/write operations and update details.""" 

155 test_key = "__fastblocks_health_check__" 

156 test_value = "health_check_ok" 

157 

158 try: 

159 # Test set operation 

160 await cache.set(test_key, test_value, ttl=10) 

161 details["write_test"] = "passed" 

162 

163 # Test get operation 

164 retrieved = await cache.get(test_key) 

165 if retrieved == test_value: 

166 details["read_test"] = "passed" 

167 await cache.delete(test_key) 

168 return HealthStatus.HEALTHY, "Cache system operational" 

169 

170 details["read_test"] = "failed" 

171 return HealthStatus.DEGRADED, "Cache read verification failed" 

172 

173 except Exception as e: 

174 details["operation_error"] = str(e) 

175 return HealthStatus.DEGRADED, f"Cache operations failed: {e}" 

176 

177 async def _collect_cache_stats( 

178 self, cache: t.Any, details: dict[str, t.Any] 

179 ) -> None: 

180 """Collect cache statistics if available.""" 

181 if hasattr(cache, "get_stats"): 

182 with suppress(Exception): # Stats not critical 

183 stats = await cache.get_stats() 

184 if hasattr(stats, "hit_ratio"): 

185 details["cache_hit_ratio"] = stats.hit_ratio 

186 

187 async def _perform_health_check( 

188 self, 

189 check_type: t.Any, 

190 ) -> t.Any: 

191 """Check cache system health.""" 

192 if not acb_health_available: 

193 return None 

194 

195 details: dict[str, t.Any] = {} 

196 status = HealthStatus.HEALTHY 

197 message = "Cache system operational" 

198 

199 try: 

200 # Try to get cache adapter 

201 cache = await depends.get("cache") 

202 

203 if cache is None: 

204 status = HealthStatus.DEGRADED 

205 message = "Cache adapter not available (degraded mode)" 

206 details["reason"] = "cache_disabled_or_not_configured" 

207 else: 

208 # Test cache operations 

209 status, message = await self._test_cache_operations(cache, details) 

210 

211 # Collect stats if available 

212 await self._collect_cache_stats(cache, details) 

213 

214 except Exception as e: 

215 status = HealthStatus.UNHEALTHY 

216 message = f"Cache health check failed: {e}" 

217 details["error"] = str(e) 

218 

219 return HealthCheckResult( 

220 component_id=self.component_id, 

221 component_name=self.component_name, 

222 status=status, 

223 check_type=check_type, 

224 message=message, 

225 details=details, 

226 ) 

227 

228 

229class RoutesHealthCheck(FastBlocksHealthCheck): 

230 """Health check for FastBlocks routing system.""" 

231 

232 def __init__(self) -> None: 

233 super().__init__( 

234 component_id="routes", 

235 component_name="Routing System", 

236 ) 

237 

238 def _check_routes_adapter( 

239 self, routes: t.Any, details: dict[str, t.Any] 

240 ) -> tuple[t.Any, str]: # Returns (status, message) 

241 """Check routes adapter status and update details.""" 

242 if not hasattr(routes, "routes"): 

243 return HealthStatus.DEGRADED, "Routes collection not available" 

244 

245 route_count = len(routes.routes) if routes.routes else 0 

246 details["route_count"] = route_count 

247 

248 if route_count == 0: 

249 return HealthStatus.DEGRADED, "No routes registered" 

250 

251 return HealthStatus.HEALTHY, f"{route_count} routes registered" 

252 

253 async def _perform_health_check( 

254 self, 

255 check_type: t.Any, 

256 ) -> t.Any: 

257 """Check routing system health.""" 

258 if not acb_health_available: 

259 return None 

260 

261 details: dict[str, t.Any] = {} 

262 status = HealthStatus.HEALTHY 

263 message = "Routing system operational" 

264 

265 try: 

266 # Try to get routes adapter 

267 routes = await depends.get("routes") 

268 

269 if routes is None: 

270 status = HealthStatus.DEGRADED 

271 message = "Routes adapter not initialized" 

272 else: 

273 status, message = self._check_routes_adapter(routes, details) 

274 

275 except Exception as e: 

276 status = HealthStatus.UNHEALTHY 

277 message = f"Routes health check failed: {e}" 

278 details["error"] = str(e) 

279 

280 return HealthCheckResult( 

281 component_id=self.component_id, 

282 component_name=self.component_name, 

283 status=status, 

284 check_type=check_type, 

285 message=message, 

286 details=details, 

287 ) 

288 

289 

290class DatabaseHealthCheck(FastBlocksHealthCheck): 

291 """Health check for database connectivity.""" 

292 

293 def __init__(self) -> None: 

294 super().__init__( 

295 component_id="database", 

296 component_name="Database", 

297 ) 

298 

299 async def _perform_health_check( 

300 self, 

301 check_type: t.Any, 

302 ) -> t.Any: 

303 """Check database health.""" 

304 if not acb_health_available: 

305 return None 

306 

307 details: dict[str, t.Any] = {} 

308 status = HealthStatus.HEALTHY 

309 message = "Database operational" 

310 

311 try: 

312 # Try to get sql adapter 

313 sql = await depends.get("sql") 

314 

315 if sql is None: 

316 status = HealthStatus.DEGRADED 

317 message = "Database adapter not configured" 

318 details["reason"] = "sql_adapter_not_available" 

319 else: 

320 # Try a simple database query 

321 try: 

322 # Most databases support SELECT 1 as a ping query 

323 await sql.execute("SELECT 1") 

324 details["connectivity_test"] = "passed" 

325 

326 # Check if we have connection pool info 

327 if hasattr(sql, "get_connection_info"): 

328 with suppress(Exception): # Connection info not critical 

329 conn_info = await sql.get_connection_info() 

330 details["connection_info"] = conn_info 

331 

332 except Exception as e: 

333 status = HealthStatus.UNHEALTHY 

334 message = f"Database query failed: {e}" 

335 details["connectivity_test"] = "failed" 

336 details["error"] = str(e) 

337 

338 except Exception as e: 

339 status = HealthStatus.DEGRADED 

340 message = f"Database health check failed: {e}" 

341 details["error"] = str(e) 

342 

343 return HealthCheckResult( 

344 component_id=self.component_id, 

345 component_name=self.component_name, 

346 status=status, 

347 check_type=check_type, 

348 message=message, 

349 details=details, 

350 ) 

351 

352 

353async def register_fastblocks_health_checks() -> bool: 

354 """Register all FastBlocks components with ACB HealthService. 

355 

356 Returns: 

357 True if registration successful, False if ACB HealthService unavailable 

358 """ 

359 if not acb_health_available: 

360 return False 

361 

362 try: 

363 # Get ACB HealthService from the service registry 

364 health_service = await depends.get("health_service") 

365 

366 if health_service is None: 

367 return False 

368 

369 # Register all FastBlocks health checks 

370 await health_service.register_component(TemplatesHealthCheck()) 

371 await health_service.register_component(CacheHealthCheck()) 

372 await health_service.register_component(RoutesHealthCheck()) 

373 await health_service.register_component(DatabaseHealthCheck()) 

374 

375 return True 

376 

377 except Exception: 

378 # Graceful degradation if registration fails 

379 return False 

380 

381 

382async def get_fastblocks_health_summary() -> dict[str, t.Any]: 

383 """Get comprehensive health summary for all FastBlocks components. 

384 

385 Returns: 

386 Dictionary with health status for each component 

387 """ 

388 if not acb_health_available: 

389 return { 

390 "status": "unknown", 

391 "message": "ACB HealthService not available", 

392 "components": {}, 

393 } 

394 

395 try: 

396 health_service = await depends.get("health_service") 

397 

398 if health_service is None: 

399 return { 

400 "status": "unknown", 

401 "message": "ACB HealthService not initialized", 

402 "components": {}, 

403 } 

404 

405 # Get health status for all registered components 

406 component_ids = ["templates", "cache", "routes", "database"] 

407 results = {} 

408 

409 for component_id in component_ids: 

410 try: 

411 result = await health_service.get_component_health(component_id) 

412 if result: 

413 results[component_id] = result.to_dict() 

414 except Exception: 

415 results[component_id] = { 

416 "status": "unknown", 

417 "message": "Health check failed", 

418 } 

419 

420 # Determine overall status 

421 statuses = [r.get("status", "unknown") for r in results.values()] 

422 

423 if "unhealthy" in statuses or "critical" in statuses: 

424 overall_status = "unhealthy" 

425 elif "degraded" in statuses: 

426 overall_status = "degraded" 

427 elif all(s == "healthy" for s in statuses): 

428 overall_status = "healthy" 

429 else: 

430 overall_status = "unknown" 

431 

432 return { 

433 "status": overall_status, 

434 "message": f"FastBlocks health status: {overall_status}", 

435 "components": results, 

436 } 

437 

438 except Exception as e: 

439 return { 

440 "status": "error", 

441 "message": f"Health check failed: {e}", 

442 "components": {}, 

443 } 

444 

445 

446# Module metadata for ACB discovery 

447MODULE_ID = UUID("01937d88-0000-7000-8000-000000000001") 

448MODULE_STATUS = AdapterStatus.STABLE 

449 

450# Auto-register health checks on module import 

451# Note: Registration happens during application startup via depends.set() 

452# This ensures proper async context is available