Coverage for fastblocks / _events_integration.py: 3%
219 statements
« prev ^ index » next coverage.py v7.12.0, created at 2025-11-26 03:58 -0800
« prev ^ index » next coverage.py v7.12.0, created at 2025-11-26 03:58 -0800
1"""ACB Events system integration for FastBlocks.
3This module bridges FastBlocks components with ACB's event-driven architecture,
4enabling reactive updates, cache invalidation, and admin action tracking.
6Author: lesleslie <les@wedgwoodwebworks.com>
7Created: 2025-10-01
8"""
10import operator
11import typing as t
12from contextlib import suppress
13from dataclasses import dataclass
14from uuid import UUID
16from acb.adapters import AdapterStatus
17from acb.depends import Inject, depends
19# Optional ACB events imports (graceful degradation if not available)
20try:
21 from acb.events import (
22 Event,
23 EventHandler,
24 EventHandlerResult,
25 EventPriority,
26 EventPublisher,
27 EventSubscription,
28 create_event,
29 )
31 acb_events_available = True
32except ImportError:
33 acb_events_available = False
34 from typing import Any as Event # type: ignore[misc]
35 from typing import Any as EventHandler # type: ignore[misc]
36 from typing import Any as EventHandlerResult # type: ignore[misc]
37 from typing import Any as EventPriority # type: ignore[misc]
38 from typing import Any as EventPublisher # type: ignore[misc]
39 from typing import Any as EventSubscription # type: ignore[misc]
41 create_event = t.cast(t.Any, None)
42 event_handler = t.cast(t.Any, None)
45# FastBlocks Event Types
class FastBlocksEventType:
    """Namespace of event-type identifiers used by FastBlocks components.

    Every identifier is a dotted string beginning with ``fastblocks.``,
    followed by the subsystem (cache, template, htmx, admin, route) and
    the specific occurrence.
    """

    # --- cache ---------------------------------------------------------
    CACHE_INVALIDATED = "fastblocks.cache.invalidated"
    CACHE_CLEARED = "fastblocks.cache.cleared"

    # --- templates -----------------------------------------------------
    TEMPLATE_RENDERED = "fastblocks.template.rendered"
    TEMPLATE_ERROR = "fastblocks.template.error"

    # --- HTMX (server-sent updates) ------------------------------------
    HTMX_REFRESH = "fastblocks.htmx.refresh"
    HTMX_REDIRECT = "fastblocks.htmx.redirect"
    HTMX_TRIGGER = "fastblocks.htmx.trigger"

    # --- admin ---------------------------------------------------------
    ADMIN_ACTION = "fastblocks.admin.action"
    ADMIN_LOGIN = "fastblocks.admin.login"
    ADMIN_LOGOUT = "fastblocks.admin.logout"

    # --- routes --------------------------------------------------------
    ROUTE_REGISTERED = "fastblocks.route.registered"
    ROUTE_ACCESSED = "fastblocks.route.accessed"
72@dataclass
73class CacheInvalidationPayload:
74 """Payload for cache invalidation events."""
76 cache_key: str
77 reason: str
78 invalidated_by: str | None = None
79 affected_templates: list[str] | None = None
82@dataclass
83class TemplateRenderPayload:
84 """Payload for template render events."""
86 template_name: str
87 render_time_ms: float
88 cache_hit: bool
89 context_size: int
90 fragment_count: int
91 error: str | None = None
94@dataclass
95class HtmxUpdatePayload:
96 """Payload for HTMX update events."""
98 update_type: str # "refresh", "redirect", "trigger"
99 target: str | None = None # CSS selector or URL
100 swap_method: str | None = None # "innerHTML", "outerHTML", etc.
101 trigger_name: str | None = None
102 trigger_data: dict[str, t.Any] | None = None
105@dataclass
106class AdminActionPayload:
107 """Payload for admin action events."""
109 action_type: str # "create", "update", "delete", "login", "logout"
110 user_id: str
111 resource_type: str | None = None
112 resource_id: str | None = None
113 changes: dict[str, t.Any] | None = None
114 ip_address: str | None = None
class CacheInvalidationHandler(EventHandler):  # type: ignore[misc]
    """Event handler that deletes cache entries named by invalidation events.

    The cache object is supplied through ``depends.inject`` and stored on
    ``self.cache``.
    """

    @depends.inject  # type: ignore[misc]  # ACB untyped decorator
    def __init__(self, cache: Inject[t.Any]) -> None:
        """Store the injected cache on the handler."""
        super().__init__()
        self.cache = cache

    async def handle(self, event: Event) -> t.Any:
        """Delete the cache key (and related template keys) named by *event*.

        Args:
            event: Event whose ``payload`` mapping provides the fields of
                ``CacheInvalidationPayload``.

        Returns:
            ``None`` when ACB events are unavailable; otherwise an
            ``EventHandlerResult`` with ``success=True`` after processing,
            or ``success=False`` carrying the error text if anything raised.
        """
        if not acb_events_available:
            return None

        try:
            payload = CacheInvalidationPayload(**event.payload)

            # Every deletion needs a usable cache, so both the primary key
            # and the related template keys sit under the same guard. This
            # keeps a missing cache a no-op instead of an AttributeError
            # on the template loop.
            if self.cache:
                await self.cache.delete(payload.cache_key)

                for template_name in payload.affected_templates or ():
                    await self.cache.delete(f"template:{template_name}")

            return EventHandlerResult(
                success=True,
                message=f"Invalidated cache key: {payload.cache_key}",
            )

        except Exception as e:
            # Failures are reported through the result object rather than
            # propagated to the caller.
            return EventHandlerResult(
                success=False,
                error=str(e),
                message=f"Failed to invalidate cache: {e}",
            )
class TemplateRenderHandler(EventHandler):  # type: ignore[misc]
    """Event handler that records per-template render metrics in memory.

    ``self.metrics`` maps a template name to the list of the most recent
    render payloads for it (at most 100 per template).
    """

    def __init__(self) -> None:
        """Start with an empty metrics store."""
        super().__init__()
        self.metrics: dict[str, list[TemplateRenderPayload]] = {}

    async def handle(self, event: Event) -> t.Any:
        """Record the render described by *event*.

        Args:
            event: Event whose ``payload`` mapping provides the fields of
                ``TemplateRenderPayload``.

        Returns:
            ``None`` when ACB events are unavailable; otherwise an
            ``EventHandlerResult`` reporting success, or failure with the
            error text if anything raised.
        """
        if not acb_events_available:
            return None

        try:
            payload = TemplateRenderPayload(**event.payload)

            history = self.metrics.setdefault(payload.template_name, [])
            history.append(payload)

            # Keep only the last 100 renders per template.
            if len(history) > 100:
                self.metrics[payload.template_name] = history[-100:]

            return EventHandlerResult(
                success=True,
                message=f"Recorded render metrics for {payload.template_name}",
            )

        except Exception as e:
            return EventHandlerResult(
                success=False,
                error=str(e),
                message=f"Failed to record metrics: {e}",
            )

    def get_template_stats(self, template_name: str) -> dict[str, t.Any]:
        """Summarize the recorded renders of *template_name*.

        Args:
            template_name: Template whose metrics should be summarized.

        Returns:
            An empty dict when nothing is recorded for the template;
            otherwise a dict with ``total_renders``, ``avg_render_time_ms``,
            ``min_render_time_ms``, ``max_render_time_ms``,
            ``cache_hit_ratio`` and ``recent_errors`` (error texts among the
            last 10 renders).
        """
        renders = self.metrics.get(template_name)
        # An empty list is treated like a missing entry: the averages and
        # min/max below are undefined for zero renders and would raise.
        if not renders:
            return {}

        render_times = [r.render_time_ms for r in renders]
        cache_hits = sum(1 for r in renders if r.cache_hit)

        return {
            "total_renders": len(renders),
            "avg_render_time_ms": sum(render_times) / len(render_times),
            "min_render_time_ms": min(render_times),
            "max_render_time_ms": max(render_times),
            "cache_hit_ratio": cache_hits / len(renders),
            "recent_errors": [r.error for r in renders[-10:] if r.error],
        }
214class HtmxUpdateHandler(EventHandler): # type: ignore[misc]
215 """Handler for HTMX update events - broadcasts to connected clients."""
217 def __init__(self) -> None:
218 super().__init__()
219 self.active_connections: set[t.Any] = set() # WebSocket connections
221 async def handle(self, event: Event) -> t.Any:
222 """Handle HTMX update event."""
223 if not acb_events_available:
224 return None
226 try:
227 payload = HtmxUpdatePayload(**event.payload)
229 # Build HTMX headers for server-sent event
230 headers = {}
232 if payload.update_type == "refresh" and payload.target:
233 headers["HX-Trigger"] = "refresh"
234 headers["HX-Refresh"] = "true"
236 elif payload.update_type == "redirect" and payload.target:
237 headers["HX-Redirect"] = payload.target
239 elif payload.update_type == "trigger" and payload.trigger_name:
240 headers["HX-Trigger"] = payload.trigger_name
241 if payload.trigger_data:
242 import json
244 headers["HX-Trigger-Data"] = json.dumps(payload.trigger_data)
246 # Broadcast to all connected clients
247 # Note: Actual WebSocket broadcast would happen in route handlers
248 # This handler just prepares the event data
250 return EventHandlerResult(
251 success=True,
252 message=f"Prepared HTMX {payload.update_type} event",
253 data={"headers": headers},
254 )
256 except Exception as e:
257 return EventHandlerResult(
258 success=False,
259 error=str(e),
260 message=f"Failed to prepare HTMX event: {e}",
261 )
class AdminActionHandler(EventHandler):  # type: ignore[misc]
    """Event handler that keeps an in-memory audit trail of admin actions.

    ``self.audit_log`` holds ``(timestamp, payload)`` tuples, capped at the
    1000 most recent entries.
    """

    def __init__(self) -> None:
        """Start with an empty audit log."""
        super().__init__()
        self.audit_log: list[tuple[float, AdminActionPayload]] = []

    async def handle(self, event: Event) -> t.Any:
        """Append the admin action described by *event* to the audit log.

        Returns ``None`` when ACB events are unavailable. Otherwise returns
        an ``EventHandlerResult`` reporting success, or failure with the
        error text if anything raised.
        """
        if not acb_events_available:
            return None

        try:
            import time

            action = AdminActionPayload(**event.payload)
            self.audit_log.append((time.time(), action))

            # Drop the oldest entries once the log exceeds 1000 actions.
            overflow = len(self.audit_log) - 1000
            if overflow > 0:
                self.audit_log = self.audit_log[overflow:]

            return EventHandlerResult(
                success=True,
                message=f"Logged admin action: {action.action_type}",
            )

        except Exception as exc:
            return EventHandlerResult(
                success=False,
                error=str(exc),
                message=f"Failed to log admin action: {exc}",
            )

    def get_recent_actions(self, limit: int = 50) -> list[dict[str, t.Any]]:
        """Return up to *limit* logged actions, newest first.

        Each entry is a dict with ``timestamp``, ``age_seconds`` (relative
        to the time of this call), ``action_type``, ``user_id``,
        ``resource_type`` and ``resource_id``.
        """
        import time

        now = time.time()
        newest_first = sorted(
            self.audit_log, key=operator.itemgetter(0), reverse=True
        )

        summaries: list[dict[str, t.Any]] = []
        for logged_at, action in newest_first[:limit]:
            summaries.append(
                {
                    "timestamp": logged_at,
                    "age_seconds": now - logged_at,
                    "action_type": action.action_type,
                    "user_id": action.user_id,
                    "resource_type": action.resource_type,
                    "resource_id": action.resource_id,
                }
            )
        return summaries
class FastBlocksEventPublisher:
    """Simplified, singleton event publisher for FastBlocks components.

    Every ``publish_*`` method returns ``True`` once the event has been
    handed to the underlying publisher and ``False`` when ACB events are
    unavailable, no publisher could be created, or publishing raised.
    """

    _instance: t.ClassVar["FastBlocksEventPublisher | None"] = None
    _publisher: t.Any = None  # EventPublisher | None when ACB available

    def __new__(cls, *args: t.Any, **kwargs: t.Any) -> "FastBlocksEventPublisher":
        """Return the single shared instance, creating it on first use.

        Extra arguments are accepted and ignored here so that a ``config``
        passed explicitly reaches ``__init__`` instead of raising
        ``TypeError``.
        """
        if cls._instance is None:
            cls._instance = super().__new__(cls)
        return cls._instance

    @depends.inject  # type: ignore[misc]  # ACB untyped decorator
    def __init__(self, config: Inject[t.Any]) -> None:
        """Store the config and create the underlying publisher if needed.

        Does nothing when ACB events are unavailable. Runs on every
        construction call, but an already created publisher is kept.
        """
        if not acb_events_available:
            return

        self.config = config
        self.source = "fastblocks"

        # Best effort: if the publisher cannot be created, `_publisher`
        # stays None and the publish methods report False.
        if self._publisher is None:
            with suppress(Exception):
                self._publisher = EventPublisher()

    async def _publish(
        self,
        event_type: str,
        payload: dict[str, t.Any],
        priority_name: str,
    ) -> bool:
        """Create an event and publish it; shared by all ``publish_*`` methods.

        Args:
            event_type: Event type identifier for the new event.
            payload: Payload mapping attached to the event.
            priority_name: Name of the ``EventPriority`` member to use. It
                is resolved inside the ``try`` so a failed lookup is
                reported as ``False`` like any other publishing failure.

        Returns:
            ``True`` if the event was published, ``False`` otherwise.
        """
        if not acb_events_available or self._publisher is None:
            return False

        try:
            event = create_event(
                event_type=event_type,
                source=self.source,
                payload=payload,
                priority=getattr(EventPriority, priority_name),
            )
            await self._publisher.publish(event)
        except Exception:
            return False
        return True

    async def publish_cache_invalidation(
        self,
        cache_key: str,
        reason: str,
        invalidated_by: str | None = None,
        affected_templates: list[str] | None = None,
    ) -> bool:
        """Publish a cache invalidation event with HIGH priority."""
        return await self._publish(
            FastBlocksEventType.CACHE_INVALIDATED,
            {
                "cache_key": cache_key,
                "reason": reason,
                "invalidated_by": invalidated_by,
                "affected_templates": affected_templates,
            },
            "HIGH",
        )

    async def publish_template_render(
        self,
        template_name: str,
        render_time_ms: float,
        cache_hit: bool,
        context_size: int,
        fragment_count: int,
        error: str | None = None,
    ) -> bool:
        """Publish a template render event with NORMAL priority.

        The event type is TEMPLATE_ERROR when *error* is truthy and
        TEMPLATE_RENDERED otherwise.
        """
        event_type = (
            FastBlocksEventType.TEMPLATE_ERROR
            if error
            else FastBlocksEventType.TEMPLATE_RENDERED
        )
        return await self._publish(
            event_type,
            {
                "template_name": template_name,
                "render_time_ms": render_time_ms,
                "cache_hit": cache_hit,
                "context_size": context_size,
                "fragment_count": fragment_count,
                "error": error,
            },
            "NORMAL",
        )

    async def publish_htmx_update(
        self,
        update_type: str,
        target: str | None = None,
        swap_method: str | None = None,
        trigger_name: str | None = None,
        trigger_data: dict[str, t.Any] | None = None,
    ) -> bool:
        """Publish an HTMX update event with HIGH priority.

        Every update type is published under the HTMX_REFRESH event type;
        the kind of update is carried in the payload's ``update_type``.
        """
        return await self._publish(
            FastBlocksEventType.HTMX_REFRESH,
            {
                "update_type": update_type,
                "target": target,
                "swap_method": swap_method,
                "trigger_name": trigger_name,
                "trigger_data": trigger_data,
            },
            "HIGH",
        )

    async def publish_admin_action(
        self,
        action_type: str,
        user_id: str,
        resource_type: str | None = None,
        resource_id: str | None = None,
        changes: dict[str, t.Any] | None = None,
        ip_address: str | None = None,
    ) -> bool:
        """Publish an admin action event with CRITICAL priority."""
        return await self._publish(
            FastBlocksEventType.ADMIN_ACTION,
            {
                "action_type": action_type,
                "user_id": user_id,
                "resource_type": resource_type,
                "resource_id": resource_id,
                "changes": changes,
                "ip_address": ip_address,
            },
            "CRITICAL",
        )
async def register_fastblocks_event_handlers() -> bool:
    """Subscribe the FastBlocks handlers to their event types.

    Creates one handler of each kind, subscribes it through the shared
    publisher, and stores the template and admin handlers in ``depends``
    under the names "template_metrics" and "admin_audit".

    Returns:
        True when every subscription succeeded; False when ACB Events is
        unavailable, no underlying publisher exists, or anything raised.
    """
    if not acb_events_available:
        return False

    try:
        publisher = FastBlocksEventPublisher()
        if publisher._publisher is None:
            return False

        cache_handler = CacheInvalidationHandler()
        template_handler = TemplateRenderHandler()
        htmx_handler = HtmxUpdateHandler()
        admin_handler = AdminActionHandler()

        # Ordered (event type, handler) table; the template handler is
        # subscribed to both the rendered and the error event types.
        subscriptions = (
            (FastBlocksEventType.CACHE_INVALIDATED, cache_handler),
            (FastBlocksEventType.TEMPLATE_RENDERED, template_handler),
            (FastBlocksEventType.TEMPLATE_ERROR, template_handler),
            (FastBlocksEventType.HTMX_REFRESH, htmx_handler),
            (FastBlocksEventType.ADMIN_ACTION, admin_handler),
        )
        for event_type, handler in subscriptions:
            await publisher._publisher.subscribe(
                EventSubscription(event_type=event_type, handler=handler)
            )

        # Store handlers in depends for retrieval.
        depends.set(template_handler, name="template_metrics")
        depends.set(admin_handler, name="admin_audit")
    except Exception:
        # Graceful degradation if registration fails.
        return False

    return True
def get_event_publisher() -> FastBlocksEventPublisher | None:
    """Return the shared FastBlocks event publisher.

    Returns:
        The publisher instance, or None when ACB Events is unavailable.
    """
    return FastBlocksEventPublisher() if acb_events_available else None
# Module metadata for ACB discovery: a fixed identifier for this module
# and its declared adapter status, exposed as module-level constants.
MODULE_ID = UUID("01937d88-0000-7000-8000-000000000002")
MODULE_STATUS = AdapterStatus.STABLE