Coverage for fastblocks / adapters / templates / _htmy_components.py: 76%
321 statements
« prev ^ index » next coverage.py v7.12.0, created at 2025-11-26 03:30 -0800
« prev ^ index » next coverage.py v7.12.0, created at 2025-11-26 03:30 -0800
1"""HTMY Component Management System for FastBlocks.
3Provides advanced component discovery, validation, scaffolding, and lifecycle management
4for HTMY components with deep HTMX integration and async rendering capabilities.
6Key Features:
7- Automatic component discovery with intelligent caching
8- Dataclass-based component scaffolding with validation
9- Component composition and nesting patterns
10- HTMX-aware state management
11- Async component rendering with lifecycle hooks
12- Advanced error handling and debugging
13- Hot-reloading support for development
15Author: lesleslie <les@wedgwoodwebworks.com>
16Created: 2025-01-13
17"""
19import asyncio
20import inspect
21import os
22import tempfile
23import typing as t
24from abc import ABC, abstractmethod
25from dataclasses import dataclass, field, fields, is_dataclass
26from datetime import datetime
27from enum import Enum
28from typing import Any, Optional
29from uuid import uuid4
31from acb.debug import debug
32from anyio import Path as AsyncPath
33from starlette.requests import Request
35try:
36 from pydantic import BaseModel
37except ImportError:
38 BaseModel = None # type: ignore[no-redef]
41class ComponentStatus(str, Enum):
42 """Component lifecycle status."""
44 DISCOVERED = "discovered"
45 VALIDATED = "validated"
46 COMPILED = "compiled"
47 READY = "ready"
48 ERROR = "error"
49 DEPRECATED = "deprecated"
52class ComponentType(str, Enum):
53 """Component classification types."""
55 BASIC = "basic"
56 DATACLASS = "dataclass"
57 PYDANTIC = "pydantic"
58 ASYNC = "async"
59 HTMX = "htmx"
60 COMPOSITE = "composite"
63@dataclass
64class ComponentMetadata:
65 """Component metadata for discovery and management."""
67 name: str
68 path: AsyncPath
69 type: ComponentType
70 status: ComponentStatus = ComponentStatus.DISCOVERED
71 dependencies: list[str] = field(default_factory=list)
72 htmx_attributes: dict[str, Any] = field(default_factory=dict)
73 cache_key: str | None = None
74 last_modified: datetime | None = None
75 error_message: str | None = None
76 docstring: str | None = None
78 def __post_init__(self) -> None:
79 if self.cache_key is None:
80 self.cache_key = f"component_{self.name}_{self.path.stem}"
83class ComponentValidationError(Exception):
84 """Raised when component validation fails."""
86 pass
89class ComponentCompilationError(Exception):
90 """Raised when component compilation fails."""
92 pass
95class ComponentRenderError(Exception):
96 """Raised when component rendering fails."""
98 pass
101class HTMXComponentMixin:
102 """Mixin for HTMX-aware components."""
104 @property
105 def htmx_attrs(self) -> dict[str, str]:
106 """Default HTMX attributes for the component."""
107 return {}
109 def get_htmx_trigger(self, request: Request) -> str | None:
110 """Extract HTMX trigger from request."""
111 return request.headers.get("HX-Trigger")
113 def get_htmx_target(self, request: Request) -> str | None:
114 """Extract HTMX target from request."""
115 return request.headers.get("HX-Target")
117 def is_htmx_request(self, request: Request) -> bool:
118 """Check if request is from HTMX."""
119 return request.headers.get("HX-Request") == "true"
122class ComponentBase(ABC):
123 """Base class for all HTMY components."""
125 def __init__(self, **kwargs: Any) -> None:
126 self._context: dict[str, Any] = kwargs
127 self._request: Request | None = kwargs.get("request")
128 self._children: list[ComponentBase] = []
129 self._parent: ComponentBase | None = None
131 @abstractmethod
132 def htmy(self, context: dict[str, Any]) -> str:
133 """Render the component to HTML."""
134 pass
136 async def async_htmy(self, context: dict[str, Any]) -> str:
137 """Async version of htmy method."""
138 if asyncio.iscoroutinefunction(self.htmy):
139 return t.cast(str, await self.htmy(context))
140 return t.cast(str, self.htmy(context))
142 def add_child(self, child: "ComponentBase") -> None:
143 """Add a child component."""
144 child._parent = self
145 self._children.append(child)
147 def remove_child(self, child: "ComponentBase") -> None:
148 """Remove a child component."""
149 if child in self._children:
150 child._parent = None
151 self._children.remove(child)
153 @property
154 def children(self) -> list["ComponentBase"]:
155 """Get child components."""
156 return self._children.copy()
158 @property
159 def parent(self) -> Optional["ComponentBase"]:
160 """Get parent component."""
161 return self._parent
164class DataclassComponentBase(ComponentBase):
165 """Base class for dataclass-based components."""
167 def __init_subclass__(cls, **kwargs: Any) -> None:
168 super().__init_subclass__(**kwargs)
169 if not is_dataclass(cls):
170 raise ComponentValidationError(
171 f"Component {cls.__name__} must be a dataclass"
172 )
174 def validate_fields(self) -> None:
175 """Validate component fields."""
176 if not is_dataclass(self):
177 return
179 for field_info in fields(self):
180 value = getattr(self, field_info.name)
181 if field_info.type and value is not None:
182 # Basic type validation
183 origin = getattr(field_info.type, "__origin__", None)
184 # Only validate if field_info.type is a proper class
185 if origin is None and isinstance(field_info.type, type):
186 if not isinstance(value, field_info.type):
187 raise ComponentValidationError(
188 f"Field {field_info.name} must be of type {field_info.type}"
189 )
192class ComponentScaffolder:
193 """Scaffolding system for creating new components."""
195 @staticmethod
196 def create_basic_component(
197 name: str, props: dict[str, type] | None = None, htmx_enabled: bool = False
198 ) -> str:
199 """Create a basic component template."""
200 props = props or {}
202 # Generate prop fields
203 prop_lines = []
204 init_params = []
205 for prop_name, prop_type in props.items():
206 prop_lines.append(f" {prop_name}: {prop_type.__name__}")
207 init_params.append(f"{prop_name}: {prop_type.__name__}")
209 # Generate component class
210 mixins = ["HTMXComponentMixin"] if htmx_enabled else []
211 base_classes = ["DataclassComponentBase"] + mixins
213 template = f'''"""Component: {name}
215Auto-generated component using FastBlocks HTMY scaffolding.
216"""
218from dataclasses import dataclass
219from typing import Any
220from fastblocks.adapters.templates._htmy_components import (
221 DataclassComponentBase,
222 HTMXComponentMixin,
223)
226@dataclass
227class {name}({", ".join(base_classes)}):
228 """Auto-generated {name} component."""
229{chr(10).join(prop_lines) if prop_lines else " pass"}
231 def htmy(self, context: dict[str, Any]) -> str:
232 """Render the {name} component."""
233 return f"""
234 <div class="{name.lower()}-component">
235 <h3>{name} Component</h3>
236 {f'<p>{{self.{list(props.keys())[0]} if props else "content"}}</p>' if props else ""}
237 <!-- Add your HTML here -->
238 </div>
239 """
240'''
242 return template
244 @staticmethod
245 def create_htmx_component(
246 name: str, endpoint: str, trigger: str = "click", target: str = "#content"
247 ) -> str:
248 """Create an HTMX-enabled component template."""
249 template = f'''"""Component: {name}
251HTMX-enabled component for interactive behavior.
252"""
254from dataclasses import dataclass
255from typing import Any
256from fastblocks.adapters.templates._htmy_components import (
257 DataclassComponentBase,
258 HTMXComponentMixin,
259)
262@dataclass
263class {name}(DataclassComponentBase, HTMXComponentMixin):
264 """HTMX-enabled {name} component."""
265 label: str = "{name}"
266 css_class: str = "{name.lower()}-component"
268 @property
269 def htmx_attrs(self) -> dict[str, str]:
270 """HTMX attributes for the component."""
271 return {{
272 "hx-get": "{endpoint}",
273 "hx-trigger": "{trigger}",
274 "hx-target": "{target}",
275 "hx-swap": "innerHTML"
276 }}
278 def htmy(self, context: dict[str, Any]) -> str:
279 """Render the {name} component."""
280 attrs = " ".join([f'{{k}}="{{v}}"' for k, v in self.htmx_attrs.items()])
282 return f"""
283 <div class="{{self.css_class}}" {{attrs}}>
284 <button type="button">{{self.label}}</button>
285 </div>
286 """
287'''
289 return template
291 @staticmethod
292 def create_composite_component(name: str, children: list[str]) -> str:
293 """Create a composite component template."""
294 template = f'''"""Component: {name}
296Composite component containing multiple child components.
297"""
299from dataclasses import dataclass
300from typing import Any
301from fastblocks.adapters.templates._htmy_components import DataclassComponentBase
304@dataclass
305class {name}(DataclassComponentBase):
306 """Composite {name} component."""
307 title: str = "{name}"
309 def htmy(self, context: dict[str, Any]) -> str:
310 """Render the {name} composite component."""
311 # Access render_component from context if available
312 render_component = context.get("render_component")
314 children_html = ""
315 if render_component:
316{chr(10).join([f' children_html += render_component("{child}", context)' for child in children])}
318 return f"""
319 <div class="{name.lower()}-composite">
320 <h2>{{self.title}}</h2>
321 <div class="children">
322 {{children_html}}
323 </div>
324 </div>
325 """
326'''
328 return template
331class ComponentValidator:
332 """Component validation system."""
334 @staticmethod
335 async def validate_component_file(component_path: AsyncPath) -> ComponentMetadata:
336 """Validate a component file and extract metadata."""
337 try:
338 source = await component_path.read_text()
340 # Basic syntax validation
341 try:
342 compile(source, str(component_path), "exec")
343 except SyntaxError as e:
344 raise ComponentValidationError(f"Syntax error in {component_path}: {e}")
346 component_class = await ComponentValidator._load_component_class_from_file(
347 source, component_path
348 )
350 if component_class is None:
351 raise ComponentValidationError(
352 f"No valid component class found in {component_path}"
353 )
355 return await ComponentValidator._create_component_metadata(
356 component_class, component_path
357 )
359 except Exception as e:
360 return ComponentMetadata(
361 name=component_path.stem,
362 path=component_path,
363 type=ComponentType.BASIC,
364 status=ComponentStatus.ERROR,
365 error_message=str(e),
366 )
368 @staticmethod
369 async def _load_component_class_from_file(source: str, component_path: AsyncPath):
370 """Load component class from source file."""
371 # Import and analyze component safely
372 import importlib.util
374 # Create a temporary module to safely load the component
375 with tempfile.NamedTemporaryFile(mode="w", suffix=".py", delete=False) as f:
376 f.write(source)
377 temp_module_path = f.name
379 try:
380 spec = importlib.util.spec_from_file_location(
381 component_path.stem, temp_module_path
382 )
383 if spec is None or spec.loader is None:
384 raise ComponentValidationError(
385 f"Could not load module from {component_path}"
386 )
388 module = importlib.util.module_from_spec(spec)
389 spec.loader.exec_module(module)
391 component_class = None
392 for obj in vars(module).values():
393 if (
394 inspect.isclass(obj)
395 and hasattr(obj, "htmy")
396 and callable(getattr(obj, "htmy"))
397 ):
398 component_class = obj
399 break
400 finally:
401 # Clean up the temporary file
402 os.unlink(temp_module_path)
404 return component_class
406 @staticmethod
407 async def _create_component_metadata(
408 component_class: type, component_path: AsyncPath
409 ) -> ComponentMetadata:
410 """Create ComponentMetadata from component class and path."""
411 # Determine component type
412 component_type = ComponentValidator._determine_component_type(component_class)
414 # Extract metadata
415 metadata = ComponentMetadata(
416 name=component_path.stem,
417 path=component_path,
418 type=component_type,
419 status=ComponentStatus.VALIDATED,
420 docstring=inspect.getdoc(component_class),
421 last_modified=datetime.fromtimestamp(
422 (await component_path.stat()).st_mtime
423 ),
424 )
426 # Extract dependencies and HTMX attributes
427 if hasattr(component_class, "htmx_attrs"):
428 metadata.htmx_attributes = getattr(component_class, "htmx_attrs", {})
430 return metadata
432 @staticmethod
433 def _determine_component_type(component_class: type) -> ComponentType:
434 """Determine the type of component."""
435 if is_dataclass(component_class):
436 if issubclass(component_class, HTMXComponentMixin):
437 return ComponentType.HTMX
438 return ComponentType.DATACLASS
440 if BaseModel and issubclass(component_class, BaseModel):
441 return ComponentType.PYDANTIC
443 if hasattr(component_class, "async_htmy"):
444 return ComponentType.ASYNC
446 return ComponentType.BASIC
449class ComponentLifecycleManager:
450 """Manages component lifecycle and state."""
452 def __init__(self) -> None:
453 self._component_states: dict[str, dict[str, Any]] = {}
454 self._lifecycle_hooks: dict[str, list[t.Callable[..., Any]]] = {
455 "before_render": [],
456 "after_render": [],
457 "on_error": [],
458 "on_state_change": [],
459 }
461 def register_hook(self, event: str, callback: t.Callable[..., Any]) -> None:
462 """Register a lifecycle hook."""
463 if event in self._lifecycle_hooks:
464 self._lifecycle_hooks[event].append(callback)
466 async def execute_hooks(self, event: str, **kwargs: Any) -> None:
467 """Execute lifecycle hooks for an event."""
468 for hook in self._lifecycle_hooks.get(event, []):
469 try:
470 if asyncio.iscoroutinefunction(hook):
471 await hook(**kwargs)
472 else:
473 hook(**kwargs)
474 except Exception as e:
475 debug(f"Lifecycle hook error for {event}: {e}")
477 def set_component_state(self, component_id: str, state: dict[str, Any]) -> None:
478 """Set component state."""
479 old_state = self._component_states.get(component_id, {})
480 self._component_states[component_id] = state
482 # Trigger state change hooks
483 asyncio.create_task(
484 self.execute_hooks(
485 "on_state_change",
486 component_id=component_id,
487 old_state=old_state,
488 new_state=state,
489 )
490 )
492 def get_component_state(self, component_id: str) -> dict[str, Any]:
493 """Get component state."""
494 return self._component_states.get(component_id, {})
496 def clear_component_state(self, component_id: str) -> None:
497 """Clear component state."""
498 self._component_states.pop(component_id, None)
501class AdvancedHTMYComponentRegistry:
502 """Enhanced component registry with advanced features."""
504 def __init__(
505 self,
506 searchpaths: list[AsyncPath] | None = None,
507 cache: t.Any = None,
508 storage: t.Any = None,
509 ) -> None:
510 self.searchpaths = searchpaths or []
511 self.cache = cache
512 self.storage = storage
513 self._component_cache: dict[str, t.Any] = {}
514 self._metadata_cache: dict[str, ComponentMetadata] = {}
515 self._scaffolder = ComponentScaffolder()
516 self._validator = ComponentValidator()
517 self._lifecycle_manager = ComponentLifecycleManager()
518 self._hot_reload_enabled = False
520 async def discover_components(self) -> dict[str, ComponentMetadata]:
521 """Discover all components with metadata."""
522 components = {}
524 for search_path in self.searchpaths:
525 if not await search_path.exists():
526 continue
528 async for component_file in search_path.rglob("*.py"):
529 if component_file.name == "__init__.py":
530 continue
532 component_name = component_file.stem
534 # Check cache first
535 cached_metadata = self._metadata_cache.get(component_name)
536 if cached_metadata and await self._is_cache_valid(cached_metadata):
537 components[component_name] = cached_metadata
538 continue
540 # Validate and cache metadata
541 metadata = await self._validator.validate_component_file(component_file)
542 self._metadata_cache[component_name] = metadata
543 components[component_name] = metadata
545 return components
547 async def _is_cache_valid(self, metadata: ComponentMetadata) -> bool:
548 """Check if cached metadata is still valid."""
549 try:
550 current_stat = await metadata.path.stat()
551 current_mtime = datetime.fromtimestamp(current_stat.st_mtime)
552 return metadata.last_modified == current_mtime
553 except Exception:
554 return False
556 async def get_component_class(self, component_name: str) -> t.Any:
557 """Get compiled component class with enhanced error handling."""
558 if component_name in self._component_cache:
559 return self._component_cache[component_name]
561 metadata = await self._validate_component_exists(component_name)
563 source = await metadata.path.read_text()
564 component_class = await self._load_component_from_source(source, metadata)
566 if component_class is None:
567 raise ComponentCompilationError(
568 f"No valid component class found in '{component_name}'"
569 )
571 self._component_cache[component_name] = component_class
572 metadata.status = ComponentStatus.READY
574 return component_class
576 async def _validate_component_exists(
577 self, component_name: str
578 ) -> ComponentMetadata:
579 """Validate that a component exists and return its metadata."""
580 components = await self.discover_components()
581 if component_name not in components:
582 raise ComponentValidationError(f"Component '{component_name}' not found")
584 metadata = components[component_name]
586 if metadata.status == ComponentStatus.ERROR:
587 raise ComponentCompilationError(
588 f"Component '{component_name}' has errors: {metadata.error_message}"
589 )
590 return metadata
592 async def _load_component_from_source(
593 self, source: str, metadata: ComponentMetadata
594 ) -> t.Any:
595 """Load a component class from source code."""
596 try:
597 # Import and analyze component safely
598 import importlib.util
600 # Create a temporary module to safely load the component
601 with tempfile.NamedTemporaryFile(mode="w", suffix=".py", delete=False) as f:
602 f.write(source)
603 temp_module_path = f.name
605 try:
606 spec = importlib.util.spec_from_file_location(
607 metadata.path.stem, temp_module_path
608 )
609 if spec is None or spec.loader is None:
610 raise ComponentCompilationError(
611 f"Could not load module from {metadata.path}"
612 )
614 module = importlib.util.module_from_spec(spec)
615 spec.loader.exec_module(module)
617 component_class = None
618 for obj in vars(module).values():
619 if (
620 inspect.isclass(obj)
621 and hasattr(obj, "htmy")
622 and callable(getattr(obj, "htmy"))
623 ):
624 component_class = obj
625 break
626 finally:
627 # Clean up the temporary file
628 os.unlink(temp_module_path)
630 return component_class
631 except Exception as e:
632 metadata.status = ComponentStatus.ERROR
633 metadata.error_message = str(e)
634 raise ComponentCompilationError(
635 f"Failed to compile component '{metadata.path.stem}': {e}"
636 ) from e
638 async def render_component_with_lifecycle(
639 self, component_name: str, context: dict[str, Any], request: Request
640 ) -> str:
641 """Render component with full lifecycle management."""
642 component_id = f"{component_name}_{uuid4().hex[:8]}"
644 try:
645 # Execute before_render hooks
646 await self._lifecycle_manager.execute_hooks(
647 "before_render",
648 component_name=component_name,
649 component_id=component_id,
650 context=context,
651 request=request,
652 )
654 component_class = await self.get_component_class(component_name)
655 component_instance = component_class(**context)
657 # Enhance context with lifecycle and state management
658 enhanced_context = context | {
659 "request": request,
660 "component_id": component_id,
661 "component_state": self._lifecycle_manager.get_component_state(
662 component_id
663 ),
664 "set_state": lambda state: self._lifecycle_manager.set_component_state(
665 component_id, state
666 ),
667 "render_component": self._create_nested_renderer(request),
668 }
670 # Render component
671 if hasattr(component_instance, "async_htmy"):
672 rendered_content = await component_instance.async_htmy(enhanced_context)
673 elif asyncio.iscoroutinefunction(component_instance.htmy):
674 rendered_content = await component_instance.htmy(enhanced_context)
675 else:
676 rendered_content = component_instance.htmy(enhanced_context)
678 # Execute after_render hooks
679 await self._lifecycle_manager.execute_hooks(
680 "after_render",
681 component_name=component_name,
682 component_id=component_id,
683 rendered_content=rendered_content,
684 request=request,
685 )
687 return t.cast(str, rendered_content)
689 except Exception as e:
690 # Execute error hooks
691 await self._lifecycle_manager.execute_hooks(
692 "on_error",
693 component_name=component_name,
694 component_id=component_id,
695 error=e,
696 request=request,
697 )
698 raise ComponentRenderError(
699 f"Failed to render component '{component_name}': {e}"
700 ) from e
702 def _create_nested_renderer(
703 self, request: Request
704 ) -> t.Callable[..., t.Awaitable[str]]:
705 """Create a nested component renderer for composition."""
707 async def render_nested(
708 component_name: str, context: dict[str, Any] | None = None
709 ) -> str:
710 if context is None:
711 context = {}
712 return await self.render_component_with_lifecycle(
713 component_name, context, request
714 )
716 return render_nested
718 async def scaffold_component(
719 self,
720 name: str,
721 component_type: ComponentType = ComponentType.DATACLASS,
722 target_path: AsyncPath | None = None,
723 **kwargs: Any,
724 ) -> AsyncPath:
725 """Scaffold a new component with the specified type."""
726 if target_path is None and self.searchpaths:
727 target_path = self.searchpaths[0] / f"{name.lower()}.py"
728 elif target_path is None:
729 raise ValueError("No target path specified and no searchpaths configured")
731 # Generate component code based on type
732 if component_type == ComponentType.HTMX:
733 content = self._scaffolder.create_htmx_component(name, **kwargs)
734 elif component_type == ComponentType.COMPOSITE:
735 content = self._scaffolder.create_composite_component(name, **kwargs)
736 else:
737 content = self._scaffolder.create_basic_component(name, **kwargs)
739 # Ensure directory exists
740 await target_path.parent.mkdir(parents=True, exist_ok=True)
742 # Write component file
743 await target_path.write_text(content)
745 # Clear cache to force re-discovery
746 self.clear_cache()
748 debug(f"Scaffolded {component_type.value} component '{name}' at {target_path}")
749 return target_path
751 def clear_cache(self, component_name: str | None = None) -> None:
752 """Clear component cache."""
753 if component_name:
754 self._component_cache.pop(component_name, None)
755 self._metadata_cache.pop(component_name, None)
756 else:
757 self._component_cache.clear()
758 self._metadata_cache.clear()
760 def enable_hot_reload(self) -> None:
761 """Enable hot reloading for development."""
762 self._hot_reload_enabled = True
763 debug("HTMY component hot reload enabled")
765 def disable_hot_reload(self) -> None:
766 """Disable hot reloading."""
767 self._hot_reload_enabled = False
768 debug("HTMY component hot reload disabled")
770 @property
771 def lifecycle_manager(self) -> ComponentLifecycleManager:
772 """Access to lifecycle manager."""
773 return self._lifecycle_manager