Coverage for fastblocks / _events_integration.py: 3%

219 statements  

« prev     ^ index     » next       coverage.py v7.12.0, created at 2025-11-26 03:58 -0800

1"""ACB Events system integration for FastBlocks. 

2 

3This module bridges FastBlocks components with ACB's event-driven architecture, 

4enabling reactive updates, cache invalidation, and admin action tracking. 

5 

6Author: lesleslie <les@wedgwoodwebworks.com> 

7Created: 2025-10-01 

8""" 

9 

10import operator 

11import typing as t 

12from contextlib import suppress 

13from dataclasses import dataclass 

14from uuid import UUID 

15 

16from acb.adapters import AdapterStatus 

17from acb.depends import Inject, depends 

18 

19# Optional ACB events imports (graceful degradation if not available) 

20try: 

21 from acb.events import ( 

22 Event, 

23 EventHandler, 

24 EventHandlerResult, 

25 EventPriority, 

26 EventPublisher, 

27 EventSubscription, 

28 create_event, 

29 ) 

30 

31 acb_events_available = True 

32except ImportError: 

33 acb_events_available = False 

34 from typing import Any as Event # type: ignore[misc] 

35 from typing import Any as EventHandler # type: ignore[misc] 

36 from typing import Any as EventHandlerResult # type: ignore[misc] 

37 from typing import Any as EventPriority # type: ignore[misc] 

38 from typing import Any as EventPublisher # type: ignore[misc] 

39 from typing import Any as EventSubscription # type: ignore[misc] 

40 

41 create_event = t.cast(t.Any, None) 

42 event_handler = t.cast(t.Any, None) 

43 

44 

45# FastBlocks Event Types 

class FastBlocksEventType:
    """Namespace of the event-type identifiers used by FastBlocks.

    Each constant is a dotted string of the form
    ``fastblocks.<area>.<action>``; the class is used purely as a container
    for these names.
    """

    # -- cache --------------------------------------------------------------
    CACHE_INVALIDATED = "fastblocks.cache.invalidated"
    CACHE_CLEARED = "fastblocks.cache.cleared"

    # -- templates ----------------------------------------------------------
    TEMPLATE_RENDERED = "fastblocks.template.rendered"
    TEMPLATE_ERROR = "fastblocks.template.error"

    # -- HTMX (server-sent updates) -----------------------------------------
    HTMX_REFRESH = "fastblocks.htmx.refresh"
    HTMX_REDIRECT = "fastblocks.htmx.redirect"
    HTMX_TRIGGER = "fastblocks.htmx.trigger"

    # -- admin --------------------------------------------------------------
    ADMIN_ACTION = "fastblocks.admin.action"
    ADMIN_LOGIN = "fastblocks.admin.login"
    ADMIN_LOGOUT = "fastblocks.admin.logout"

    # -- routes -------------------------------------------------------------
    ROUTE_REGISTERED = "fastblocks.route.registered"
    ROUTE_ACCESSED = "fastblocks.route.accessed"

70 

71 

72@dataclass 

73class CacheInvalidationPayload: 

74 """Payload for cache invalidation events.""" 

75 

76 cache_key: str 

77 reason: str 

78 invalidated_by: str | None = None 

79 affected_templates: list[str] | None = None 

80 

81 

82@dataclass 

83class TemplateRenderPayload: 

84 """Payload for template render events.""" 

85 

86 template_name: str 

87 render_time_ms: float 

88 cache_hit: bool 

89 context_size: int 

90 fragment_count: int 

91 error: str | None = None 

92 

93 

94@dataclass 

95class HtmxUpdatePayload: 

96 """Payload for HTMX update events.""" 

97 

98 update_type: str # "refresh", "redirect", "trigger" 

99 target: str | None = None # CSS selector or URL 

100 swap_method: str | None = None # "innerHTML", "outerHTML", etc. 

101 trigger_name: str | None = None 

102 trigger_data: dict[str, t.Any] | None = None 

103 

104 

105@dataclass 

106class AdminActionPayload: 

107 """Payload for admin action events.""" 

108 

109 action_type: str # "create", "update", "delete", "login", "logout" 

110 user_id: str 

111 resource_type: str | None = None 

112 resource_id: str | None = None 

113 changes: dict[str, t.Any] | None = None 

114 ip_address: str | None = None 

115 

116 

class CacheInvalidationHandler(EventHandler):  # type: ignore[misc]
    """Event handler that removes cache entries named by an invalidation event."""

    @depends.inject  # type: ignore[misc]  # ACB untyped decorator
    def __init__(self, cache: Inject[t.Any]) -> None:
        """Store the cache object supplied through ``depends.inject``."""
        super().__init__()
        self.cache = cache

    async def handle(self, event: Event) -> t.Any:
        """Delete the cache entries described by ``event.payload``.

        The payload is unpacked into a ``CacheInvalidationPayload``. When a
        cache object is set, the payload's ``cache_key`` is deleted first,
        followed by one ``template:<name>`` key per affected template.

        Returns:
            ``None`` when ACB events are unavailable; otherwise an
            ``EventHandlerResult`` with ``success=True``, or with
            ``success=False`` and the error text if anything raised.
        """
        if not acb_events_available:
            return None

        try:
            payload = CacheInvalidationPayload(**event.payload)

            backend = self.cache
            if backend:
                # Primary key first, then any dependent template entries.
                await backend.delete(payload.cache_key)
                for template_name in payload.affected_templates or ():
                    await backend.delete(f"template:{template_name}")

            return EventHandlerResult(
                success=True,
                message=f"Invalidated cache key: {payload.cache_key}",
            )

        except Exception as e:
            # Failures are reported through the result object, not raised.
            return EventHandlerResult(
                success=False,
                error=str(e),
                message=f"Failed to invalidate cache: {e}",
            )

154 

155 

class TemplateRenderHandler(EventHandler):  # type: ignore[misc]
    """Handler for template render events - collects performance metrics."""

    def __init__(self) -> None:
        """Start with an empty metrics store."""
        super().__init__()
        # template name -> recorded render payloads, oldest first
        self.metrics: dict[str, list[TemplateRenderPayload]] = {}

    async def handle(self, event: Event) -> t.Any:
        """Record the render described by ``event.payload``.

        The payload is unpacked into a ``TemplateRenderPayload`` and appended
        to that template's history, which is capped at the last 100 entries.

        Returns:
            ``None`` when ACB events are unavailable; otherwise an
            ``EventHandlerResult`` with ``success=True``, or with
            ``success=False`` and the error text if anything raised.
        """
        if not acb_events_available:
            return None

        try:
            payload = TemplateRenderPayload(**event.payload)

            # Store metrics for performance analysis
            history = self.metrics.setdefault(payload.template_name, [])
            history.append(payload)

            # Keep only last 100 renders per template
            if len(history) > 100:
                self.metrics[payload.template_name] = history[-100:]

            return EventHandlerResult(
                success=True,
                message=f"Recorded render metrics for {payload.template_name}",
            )

        except Exception as e:
            # Failures are reported through the result object, not raised.
            return EventHandlerResult(
                success=False,
                error=str(e),
                message=f"Failed to record metrics: {e}",
            )

    def get_template_stats(self, template_name: str) -> dict[str, t.Any]:
        """Get performance statistics for a template.

        Args:
            template_name: Template whose recorded renders are summarised.

        Returns:
            An empty dict when nothing is recorded for the template
            (unknown name or empty history); otherwise a dict with
            ``total_renders``, ``avg_render_time_ms``, ``min_render_time_ms``,
            ``max_render_time_ms``, ``cache_hit_ratio`` and ``recent_errors``
            (errors among the last 10 recorded renders).
        """
        renders = self.metrics.get(template_name)
        # An empty history would otherwise divide by zero below.
        if not renders:
            return {}

        total = len(renders)
        render_times = [r.render_time_ms for r in renders]
        cache_hits = sum(1 for r in renders if r.cache_hit)

        return {
            "total_renders": total,
            "avg_render_time_ms": sum(render_times) / total,
            "min_render_time_ms": min(render_times),
            "max_render_time_ms": max(render_times),
            "cache_hit_ratio": cache_hits / total,
            "recent_errors": [r.error for r in renders[-10:] if r.error],
        }

212 

213 

class HtmxUpdateHandler(EventHandler):  # type: ignore[misc]
    """Handler for HTMX update events - prepares HTMX response headers."""

    def __init__(self) -> None:
        """Start with an empty set of tracked connections."""
        super().__init__()
        self.active_connections: set[t.Any] = set()  # WebSocket connections

    async def handle(self, event: Event) -> t.Any:
        """Translate ``event.payload`` into HTMX response headers.

        The payload is unpacked into an ``HtmxUpdatePayload`` and mapped as
        follows:

        * ``"refresh"`` with a target: ``HX-Trigger: refresh`` and
          ``HX-Refresh: true``.
        * ``"redirect"`` with a target: ``HX-Redirect: <target>``.
        * ``"trigger"`` with a trigger name: ``HX-Trigger`` is the plain
          event name, or, when trigger data is present, a JSON object
          ``{<trigger_name>: <trigger_data>}``.

        Any other combination yields an empty header dict. Nothing is sent
        from here; the headers are only returned in the result's ``data``.

        Returns:
            ``None`` when ACB events are unavailable; otherwise an
            ``EventHandlerResult`` with ``success=True`` and
            ``data={"headers": ...}``, or with ``success=False`` and the
            error text if anything raised.
        """
        if not acb_events_available:
            return None

        try:
            payload = HtmxUpdatePayload(**event.payload)

            # Build HTMX headers for server-sent event
            headers: dict[str, str] = {}

            if payload.update_type == "refresh" and payload.target:
                headers["HX-Trigger"] = "refresh"
                headers["HX-Refresh"] = "true"

            elif payload.update_type == "redirect" and payload.target:
                headers["HX-Redirect"] = payload.target

            elif payload.update_type == "trigger" and payload.trigger_name:
                if payload.trigger_data:
                    import json

                    # htmx reads event details only from HX-Trigger itself,
                    # encoded as a JSON object mapping event name -> detail.
                    headers["HX-Trigger"] = json.dumps(
                        {payload.trigger_name: payload.trigger_data}
                    )
                    # Not an htmx header; kept so existing consumers of this
                    # result that read the separate key keep working.
                    headers["HX-Trigger-Data"] = json.dumps(payload.trigger_data)
                else:
                    headers["HX-Trigger"] = payload.trigger_name

            # Note: Actual WebSocket broadcast would happen in route handlers
            # This handler just prepares the event data

            return EventHandlerResult(
                success=True,
                message=f"Prepared HTMX {payload.update_type} event",
                data={"headers": headers},
            )

        except Exception as e:
            # Failures are reported through the result object, not raised.
            return EventHandlerResult(
                success=False,
                error=str(e),
                message=f"Failed to prepare HTMX event: {e}",
            )

262 

263 

class AdminActionHandler(EventHandler):  # type: ignore[misc]
    """Event handler that keeps an in-memory audit trail of admin actions."""

    def __init__(self) -> None:
        """Start with an empty audit log."""
        super().__init__()
        # (time.time() at receipt, payload) pairs, in arrival order
        self.audit_log: list[tuple[float, AdminActionPayload]] = []

    async def handle(self, event: Event) -> t.Any:
        """Append the action described by ``event.payload`` to the audit log.

        The log is capped at the most recent 1000 entries.

        Returns:
            ``None`` when ACB events are unavailable; otherwise an
            ``EventHandlerResult`` with ``success=True``, or with
            ``success=False`` and the error text if anything raised.
        """
        if not acb_events_available:
            return None

        try:
            import time

            payload = AdminActionPayload(**event.payload)

            entry = (time.time(), payload)
            self.audit_log.append(entry)

            # Drop the oldest entries once the log exceeds 1000 actions.
            overflow = len(self.audit_log) - 1000
            if overflow > 0:
                self.audit_log = self.audit_log[overflow:]

            # A production setup would additionally:
            # - persist the action to a database audit table
            # - forward it to a monitoring/alerting system
            # - run security checks for sensitive actions

            return EventHandlerResult(
                success=True,
                message=f"Logged admin action: {payload.action_type}",
            )

        except Exception as e:
            # Failures are reported through the result object, not raised.
            return EventHandlerResult(
                success=False,
                error=str(e),
                message=f"Failed to log admin action: {e}",
            )

    def get_recent_actions(self, limit: int = 50) -> list[dict[str, t.Any]]:
        """Summarise the newest audit-log entries.

        Args:
            limit: Upper bound applied as a slice to the newest-first list.

        Returns:
            One dict per entry, newest first, with ``timestamp``,
            ``age_seconds`` (relative to the time of this call),
            ``action_type``, ``user_id``, ``resource_type`` and
            ``resource_id``.
        """
        import time

        now = time.time()
        newest_first = sorted(
            self.audit_log, key=operator.itemgetter(0), reverse=True
        )

        summaries: list[dict[str, t.Any]] = []
        for logged_at, action in newest_first[:limit]:
            summaries.append(
                {
                    "timestamp": logged_at,
                    "age_seconds": now - logged_at,
                    "action_type": action.action_type,
                    "user_id": action.user_id,
                    "resource_type": action.resource_type,
                    "resource_id": action.resource_id,
                }
            )
        return summaries

325 

326 

class FastBlocksEventPublisher:
    """Simplified event publisher for FastBlocks components.

    The class is a singleton: every construction returns the same instance.
    Each ``publish_*`` method builds one event and hands it to the wrapped
    publisher, returning ``True`` on success and ``False`` when events are
    unavailable, no publisher could be created, or publishing raised.
    """

    _instance: t.ClassVar["FastBlocksEventPublisher | None"] = None
    _publisher: t.Any = None  # EventPublisher | None when ACB available

    def __new__(cls) -> "FastBlocksEventPublisher":
        """Singleton pattern for event publisher."""
        if cls._instance is None:
            cls._instance = super().__new__(cls)
        return cls._instance

    @depends.inject  # type: ignore[misc]  # ACB untyped decorator
    def __init__(self, config: Inject[t.Any]) -> None:
        """Store the injected config and create the wrapped publisher once.

        Does nothing when ACB events are unavailable. Runs on every
        construction of the singleton, so the publisher is only created
        while ``_publisher`` is still ``None``.
        """
        if not acb_events_available:
            return

        self.config = config
        self.source = "fastblocks"

        # Initialize publisher lazily; a failed construction leaves
        # ``_publisher`` as None so every publish call returns False.
        if self._publisher is None:
            with suppress(Exception):
                self._publisher = EventPublisher()

    async def _publish(
        self,
        event_type: str,
        payload: dict[str, t.Any],
        priority_name: str,
    ) -> bool:
        """Create one event and publish it, reporting success as a bool.

        Args:
            event_type: Event-type string for the new event.
            payload: Payload dict for the new event.
            priority_name: Attribute name on ``EventPriority``. It is
                resolved inside the ``try`` so a missing priority yields
                ``False`` instead of raising.

        Returns:
            ``True`` if the event was published; ``False`` if events are
            unavailable, there is no publisher, or anything raised.
        """
        if not acb_events_available or self._publisher is None:
            return False

        try:
            event = create_event(
                event_type=event_type,
                source=self.source,
                payload=payload,
                priority=getattr(EventPriority, priority_name),
            )
            await self._publisher.publish(event)
            return True

        except Exception:
            # Publishing is best-effort: callers only get a success flag.
            return False

    async def publish_cache_invalidation(
        self,
        cache_key: str,
        reason: str,
        invalidated_by: str | None = None,
        affected_templates: list[str] | None = None,
    ) -> bool:
        """Publish cache invalidation event (priority ``HIGH``)."""
        return await self._publish(
            FastBlocksEventType.CACHE_INVALIDATED,
            {
                "cache_key": cache_key,
                "reason": reason,
                "invalidated_by": invalidated_by,
                "affected_templates": affected_templates,
            },
            "HIGH",
        )

    async def publish_template_render(
        self,
        template_name: str,
        render_time_ms: float,
        cache_hit: bool,
        context_size: int,
        fragment_count: int,
        error: str | None = None,
    ) -> bool:
        """Publish template render event (priority ``NORMAL``).

        The event type is ``TEMPLATE_ERROR`` when ``error`` is truthy and
        ``TEMPLATE_RENDERED`` otherwise.
        """
        event_type = (
            FastBlocksEventType.TEMPLATE_ERROR
            if error
            else FastBlocksEventType.TEMPLATE_RENDERED
        )
        return await self._publish(
            event_type,
            {
                "template_name": template_name,
                "render_time_ms": render_time_ms,
                "cache_hit": cache_hit,
                "context_size": context_size,
                "fragment_count": fragment_count,
                "error": error,
            },
            "NORMAL",
        )

    async def publish_htmx_update(
        self,
        update_type: str,
        target: str | None = None,
        swap_method: str | None = None,
        trigger_name: str | None = None,
        trigger_data: dict[str, t.Any] | None = None,
    ) -> bool:
        """Publish HTMX update event (priority ``HIGH``).

        Every update is published under ``HTMX_REFRESH`` regardless of
        ``update_type``; the kind of update travels in the payload.
        """
        return await self._publish(
            FastBlocksEventType.HTMX_REFRESH,
            {
                "update_type": update_type,
                "target": target,
                "swap_method": swap_method,
                "trigger_name": trigger_name,
                "trigger_data": trigger_data,
            },
            "HIGH",
        )

    async def publish_admin_action(
        self,
        action_type: str,
        user_id: str,
        resource_type: str | None = None,
        resource_id: str | None = None,
        changes: dict[str, t.Any] | None = None,
        ip_address: str | None = None,
    ) -> bool:
        """Publish admin action event (priority ``CRITICAL``)."""
        return await self._publish(
            FastBlocksEventType.ADMIN_ACTION,
            {
                "action_type": action_type,
                "user_id": user_id,
                "resource_type": resource_type,
                "resource_id": resource_id,
                "changes": changes,
                "ip_address": ip_address,
            },
            "CRITICAL",
        )

487 

488 

async def register_fastblocks_event_handlers() -> bool:
    """Create the FastBlocks event handlers and subscribe them.

    One handler of each kind is instantiated and subscribed, through the
    publisher wrapped by ``FastBlocksEventPublisher``, to its event type(s).
    The template and admin handlers are then registered with ``depends.set``
    under the names ``"template_metrics"`` and ``"admin_audit"``.

    Returns:
        True if registration succeeded; False if ACB Events is unavailable,
        no underlying publisher exists, or any step raised.
    """
    if not acb_events_available:
        return False

    try:
        bus = FastBlocksEventPublisher()._publisher
        if bus is None:
            return False

        cache_handler = CacheInvalidationHandler()
        template_handler = TemplateRenderHandler()
        htmx_handler = HtmxUpdateHandler()
        admin_handler = AdminActionHandler()

        # (event type, handler) pairs, subscribed in this order. The
        # template handler receives both successful and failed renders.
        subscriptions = (
            (FastBlocksEventType.CACHE_INVALIDATED, cache_handler),
            (FastBlocksEventType.TEMPLATE_RENDERED, template_handler),
            (FastBlocksEventType.TEMPLATE_ERROR, template_handler),
            (FastBlocksEventType.HTMX_REFRESH, htmx_handler),
            (FastBlocksEventType.ADMIN_ACTION, admin_handler),
        )
        for event_type, handler in subscriptions:
            await bus.subscribe(
                EventSubscription(event_type=event_type, handler=handler)
            )

        # Make the stateful handlers retrievable by name.
        depends.set(template_handler, name="template_metrics")
        depends.set(admin_handler, name="admin_audit")

        return True

    except Exception:
        # Graceful degradation if registration fails
        return False

555 

556 

def get_event_publisher() -> FastBlocksEventPublisher | None:
    """Return the FastBlocks event publisher, if events are available.

    Returns:
        The result of constructing ``FastBlocksEventPublisher`` when ACB
        Events is available, otherwise ``None``.
    """
    return FastBlocksEventPublisher() if acb_events_available else None

567 

568 

# Module metadata for ACB discovery: a fixed module identifier and the
# module's declared adapter status.
MODULE_ID = UUID("01937d88-0000-7000-8000-000000000002")
MODULE_STATUS = AdapterStatus.STABLE