Coverage for fastblocks / adapters / templates / htmy.py: 30%
419 statements
« prev ^ index » next coverage.py v7.12.0, created at 2025-11-26 03:58 -0800
« prev ^ index » next coverage.py v7.12.0, created at 2025-11-26 03:58 -0800
1"""HTMY Templates Adapter for FastBlocks.
3Provides native HTMY component rendering with advanced features including:
4- Component discovery and caching system
5- Multi-layer caching (Redis, cloud storage, filesystem)
6- Bidirectional integration with Jinja2 templates
7- Async component rendering with context sharing
8- Template synchronization across cache/storage/filesystem layers
9- Enhanced debugging and error handling
11Requirements:
12- htmy>=0.1.0
13- redis>=3.5.3 (for caching)
15Usage:
16```python
17from acb.depends import Inject, depends
19htmy = depends.get("htmy")
21HTMYTemplates = import_adapter("htmy")
23response = await htmy.render_component(request, "my_component", {"data": data})
25component_class = await htmy.get_component_class("my_component")
26```
28Author: lesleslie <les@wedgwoodwebworks.com>
29Created: 2025-01-13
30"""
32import asyncio
33import typing as t
34from contextlib import suppress
35from enum import Enum
36from typing import TYPE_CHECKING, Any
37from uuid import UUID
# Resolve the ACB adapter helpers. All four names are imported in one
# try-block; if any of them is missing the whole group falls back to local
# stand-ins instead of aborting the module import.
try:
    from acb.adapters import AdapterStatus as _AdapterStatus
    from acb.adapters import get_adapter as _get_adapter
    from acb.adapters import import_adapter as _import_adapter
    from acb.adapters import root_path as _root_path
except ImportError:
    imports_successful = False
    _AdapterStatus = _get_adapter = _import_adapter = _root_path = None
else:
    imports_successful = True

if not imports_successful:

    class _FallbackAdapterStatus(Enum):
        """Stand-in status enum used when ``acb.adapters`` cannot be imported."""

        ALPHA = "alpha"
        BETA = "beta"
        STABLE = "stable"
        DEPRECATED = "deprecated"
        EXPERIMENTAL = "experimental"

    AdapterStatus = _FallbackAdapterStatus
    # The helper callables have no local equivalent; users must check for None.
    get_adapter = import_adapter = root_path = None
else:
    AdapterStatus, get_adapter, import_adapter, root_path = (
        _AdapterStatus,
        _get_adapter,
        _import_adapter,
        _root_path,
    )
74from acb.debug import debug
75from acb.depends import depends
76from anyio import Path as AsyncPath
77from starlette.responses import HTMLResponse
79from ._base import TemplatesBase, TemplatesBaseSettings
80from ._htmy_components import (
81 AdvancedHTMYComponentRegistry,
82 ComponentLifecycleManager,
83 ComponentMetadata,
84 ComponentRenderError,
85 ComponentStatus,
86 ComponentType,
87)
89if TYPE_CHECKING:
90 from fastblocks.actions.sync.strategies import SyncDirection, SyncStrategy
91 from fastblocks.actions.sync.templates import sync_templates
93try:
94 from fastblocks.actions.sync.strategies import SyncDirection, SyncStrategy
95 from fastblocks.actions.sync.templates import sync_templates
96except ImportError:
97 sync_templates: t.Callable[..., t.Any] | None = None # type: ignore[no-redef]
98 SyncDirection: type[Enum] | None = None # type: ignore[no-redef]
99 SyncStrategy: type[object] | None = None # type: ignore[no-redef]
101try:
102 Cache, Storage, Models = import_adapter()
103except Exception:
104 Cache = Storage = Models = None
class ComponentNotFound(Exception):
    """Raised when a component name is unknown or no registry is available."""
class ComponentCompilationError(Exception):
    """Raised when a component's source cannot be loaded as a module."""
class HTMYComponentRegistry:
    """Locate, cache and import HTMY component classes.

    Components are ``*.py`` files found under the configured search paths; the
    component name is the file stem. Source text is memoised in-process and,
    when a cache object is supplied, also stored there. Imported classes are
    memoised per component name.
    """

    def __init__(
        self,
        searchpaths: list[AsyncPath] | None = None,
        cache: t.Any = None,
        storage: t.Any = None,
    ) -> None:
        """Store the search paths and optional cache/storage backends."""
        self.searchpaths = searchpaths or []
        self.cache = cache
        self.storage = storage
        # component name -> imported class (may hold None, see _load_from_source)
        self._component_cache: dict[str, t.Any] = {}
        # str(component path) -> source text
        self._source_cache: dict[str, str] = {}

    @staticmethod
    def get_cache_key(component_path: AsyncPath, cache_type: str = "source") -> str:
        """Return the cache key for *component_path* in the given namespace."""
        return f"htmy_component_{cache_type}:{component_path}"

    @staticmethod
    def get_storage_path(component_path: AsyncPath) -> AsyncPath:
        """Return the storage location of a component (currently the same path)."""
        return component_path

    async def discover_components(self) -> dict[str, AsyncPath]:
        """Map component names (file stems) to files under the search paths.

        Missing search paths and ``__init__.py`` files are skipped. When two
        files share a stem, the one found last wins.
        """
        components: dict[str, AsyncPath] = {}
        for search_path in self.searchpaths:
            if not await search_path.exists():
                continue
            async for component_file in search_path.rglob("*.py"):
                if component_file.name != "__init__.py":
                    components[component_file.stem] = component_file
        return components

    async def _cache_component_source(
        self, component_path: AsyncPath, source: str
    ) -> None:
        """Store encoded *source* in the cache, if one is configured."""
        if self.cache is not None:
            await self.cache.set(self.get_cache_key(component_path), source.encode())

    async def _cache_component_bytecode(
        self, component_path: AsyncPath, bytecode: bytes
    ) -> None:
        """Store *bytecode* in the cache, if one is configured."""
        if self.cache is not None:
            await self.cache.set(
                self.get_cache_key(component_path, "bytecode"), bytecode
            )

    async def _get_cached_source(self, component_path: AsyncPath) -> str | None:
        """Return the cached source text, or None when absent or no cache is set."""
        if self.cache is None:
            return None
        cached = await self.cache.get(self.get_cache_key(component_path))
        if cached:
            return t.cast(str, cached.decode())
        return None

    async def _get_cached_bytecode(self, component_path: AsyncPath) -> bytes | None:
        """Return the cached bytecode entry, or None when absent or no cache is set."""
        if self.cache is None:
            return None
        result = await self.cache.get(self.get_cache_key(component_path, "bytecode"))
        # The cast target is a string so nothing is evaluated at runtime; the
        # previous target was an undefined name and raised NameError here.
        return t.cast("bytes | None", result)

    async def _sync_component_file(
        self,
        path: AsyncPath,
        storage_path: AsyncPath,
    ) -> tuple[str, int]:
        """Synchronise one component file and return ``(source, mtime)``.

        Uses the ``sync_templates`` action when it is importable; otherwise, or
        when that action raises, falls back to the direct storage comparison.
        """
        if sync_templates is None or SyncDirection is None or SyncStrategy is None:
            return await self._sync_from_storage_fallback(path, storage_path)

        try:
            strategy = SyncStrategy(backup_on_conflict=False)
            result = await sync_templates(
                template_paths=[path],
                strategy=strategy,
            )

            source = await path.read_text()
            local_stat = await path.stat()
            local_mtime = int(local_stat.st_mtime)

            debug(f"Component sync result: {result.sync_status} for {path}")
            return source, local_mtime

        except Exception as e:
            debug(f"Sync action failed for {path}: {e}, falling back to primitive sync")
            return await self._sync_from_storage_fallback(path, storage_path)

    async def _sync_from_storage_fallback(
        self,
        path: AsyncPath,
        storage_path: AsyncPath,
    ) -> tuple[str, int]:
        """Pull a newer copy from storage if there is one, else read the local file.

        The stored copy replaces the local file only when it is newer and its
        size differs. Storage errors are logged and the local file is used.
        """
        local_stat = await path.stat()
        local_mtime = int(local_stat.st_mtime)

        if self.storage is not None:
            try:
                local_size = local_stat.st_size
                storage_stat = await self.storage.templates.stat(storage_path)
                storage_mtime = round(storage_stat.get("mtime", 0))
                storage_size = storage_stat.get("size", 0)

                if local_mtime < storage_mtime and local_size != storage_size:
                    resp = await self.storage.templates.open(storage_path)
                    await path.write_bytes(resp)
                    return resp.decode(), storage_mtime
            except Exception as e:
                debug(f"Storage fallback failed for {path}: {e}")

        source = await path.read_text()
        return source, local_mtime

    async def get_component_source(self, component_name: str) -> tuple[str, AsyncPath]:
        """Return ``(source, path)`` for *component_name*.

        Lookup order: in-process source cache, external cache, then a file
        sync/read whose result is written back to both caches.

        Raises:
            ComponentNotFound: if discovery finds no file with that stem.
        """
        components = await self.discover_components()
        if component_name not in components:
            raise ComponentNotFound(f"Component '{component_name}' not found")
        component_path = components[component_name]
        cache_key = str(component_path)

        if cache_key in self._source_cache:
            return self._source_cache[cache_key], component_path

        cached_source = await self._get_cached_source(component_path)
        if cached_source:
            self._source_cache[cache_key] = cached_source
            return cached_source, component_path

        storage_path = self.get_storage_path(component_path)
        source, _ = await self._sync_component_file(component_path, storage_path)
        self._source_cache[cache_key] = source
        await self._cache_component_source(component_path, source)
        return source, component_path

    async def get_component_class(self, component_name: str) -> t.Any:
        """Return the component class for *component_name*, importing it if needed.

        Raises:
            ComponentNotFound: if the component file cannot be discovered.
            ComponentCompilationError: if the source cannot be imported.
        """
        if component_name in self._component_cache:
            return self._component_cache[component_name]

        source, component_path = await self.get_component_source(component_name)
        cached_bytecode = await self._get_cached_bytecode(component_path)

        # Try the cached-bytecode path first; it reports failure by returning None.
        if cached_bytecode:
            component_class = await self._load_from_cached_bytecode(
                cached_bytecode, source, component_path, component_name
            )
            if component_class:
                return component_class

        return await self._load_from_source(source, component_path, component_name)

    @staticmethod
    def _import_component_class(source: str, component_path: t.Any) -> t.Any:
        """Execute *source* as a module and return its first HTMY component.

        The source is written to a temporary ``.py`` file (always removed
        afterwards) and loaded under the module name ``component_path.stem``.
        The first module-level object with a callable ``htmy`` attribute is
        returned, or None when there is none.
        """
        import importlib.util
        import os
        import tempfile

        # Python source files are read as UTF-8 by default, so write UTF-8
        # explicitly instead of relying on the platform's locale encoding.
        with tempfile.NamedTemporaryFile(
            mode="w", suffix=".py", delete=False, encoding="utf-8"
        ) as handle:
            handle.write(source)
            temp_module_path = handle.name

        try:
            spec = importlib.util.spec_from_file_location(
                component_path.stem, temp_module_path
            )
            if spec is None or spec.loader is None:
                raise ComponentCompilationError(
                    f"Could not load module from {component_path}"
                )

            module = importlib.util.module_from_spec(spec)
            spec.loader.exec_module(module)

            for obj in vars(module).values():
                if callable(getattr(obj, "htmy", None)):
                    return obj
            return None
        finally:
            os.unlink(temp_module_path)

    async def _load_from_cached_bytecode(
        self, cached_bytecode, source, component_path, component_name
    ):
        """Load the component when a cached bytecode entry exists.

        The cached bytes themselves are not deserialised (unpickling cache
        content would be a security risk); the source is syntax-checked with
        ``compile`` and then imported. Any failure is logged and reported as
        None so the caller can fall back to ``_load_from_source``.
        """
        try:
            compile(source, str(component_path), "exec")
            component_class = self._import_component_class(source, component_path)
            self._component_cache[component_name] = component_class
            return component_class
        except Exception as e:
            debug(f"Failed to load cached bytecode for {component_name}: {e}")
            return None

    async def _load_from_source(self, source, component_path, component_name):
        """Import the component from *source* and memoise the result.

        The result (including None when the module defines no object with a
        callable ``htmy``) is stored under *component_name*.

        Raises:
            ComponentCompilationError: if importing the source fails.
        """
        try:
            component_class = self._import_component_class(source, component_path)
            self._component_cache[component_name] = component_class
            return component_class
        except Exception as e:
            raise ComponentCompilationError(
                f"Failed to compile component '{component_name}': {e}"
            ) from e
class HTMYTemplatesSettings(TemplatesBaseSettings):
    """Configuration switches for the HTMY templates adapter."""

    # Extra search paths. NOTE(review): not read in this module - confirm usage.
    searchpaths: list[str] = []
    # Presumably a cache lifetime in seconds; not read in this module - confirm.
    cache_timeout: int = 300
    # NOTE(review): not read in this module - confirm where this is honoured.
    enable_bidirectional: bool = True
    # When true, advanced-render error pages include the full traceback.
    debug_components: bool = False
    # When true, hot reload is enabled on the advanced registry at init time.
    enable_hot_reload: bool = True
    # NOTE(review): not read in this module - confirm where this is honoured.
    enable_lifecycle_hooks: bool = True
    # NOTE(review): not read in this module - confirm where this is honoured.
    enable_component_validation: bool = True
    # When true, the advanced registry is created and preferred for rendering.
    enable_advanced_registry: bool = True
class HTMYTemplates(TemplatesBase):
    """Templates adapter that renders HTMY components into HTML responses.

    Two registries are maintained: the legacy ``HTMYComponentRegistry`` and,
    when ``enable_advanced_registry`` is set, an
    ``AdvancedHTMYComponentRegistry`` that is preferred for rendering.
    """

    def __init__(self, **kwargs: t.Any) -> None:
        """Create an uninitialised adapter; registries are built lazily."""
        super().__init__(**kwargs)
        self.htmy_registry: HTMYComponentRegistry | None = None
        self.advanced_registry: AdvancedHTMYComponentRegistry | None = None
        self.component_searchpaths: list[AsyncPath] = []
        self.jinja_templates: t.Any = None
        self.settings = HTMYTemplatesSettings(**kwargs)

    @staticmethod
    async def _maybe_await(value: t.Any) -> t.Any:
        """Return *value*, awaiting it first when it is awaitable.

        Checking the result rather than the callable avoids the deprecated
        ``asyncio.iscoroutinefunction`` and also covers callables that return
        an awaitable without being declared ``async def``.
        """
        import inspect

        if inspect.isawaitable(value):
            return await value
        return value

    async def get_component_searchpaths(self, app_adapter: t.Any) -> list[AsyncPath]:
        """Return the ``components`` directory under each template search path.

        The base directory is ``<root>/templates/<category>`` where the
        category comes from *app_adapter* (default ``"app"``). An empty list
        is returned when *app_adapter* is falsy.
        """
        searchpaths: list[AsyncPath] = []
        root = root_path() if callable(root_path) else root_path
        # root_path is None when the ACB import fallback is active; use the
        # working directory (best effort) instead of failing on AsyncPath(None).
        base_root = AsyncPath("." if root is None else root)
        debug(f"get_component_searchpaths: app_adapter={app_adapter}")
        if app_adapter:
            category = getattr(app_adapter, "category", "app")
            debug(f"get_component_searchpaths: using category={category}")
            template_paths = self.get_searchpath(
                app_adapter, base_root / "templates" / category
            )
            debug(f"get_component_searchpaths: template_paths={template_paths}")
            for template_path in template_paths:
                component_path = template_path / "components"
                searchpaths.append(component_path)
                debug(
                    f"get_component_searchpaths: added component_path={component_path}"
                )
        debug(f"get_component_searchpaths: final searchpaths={searchpaths}")
        return searchpaths

    async def _init_htmy_registry(self) -> None:
        """Build the component registries if they are not fully set up yet.

        Nothing happens when the legacy registry exists and the advanced one
        either exists or is disabled; without the "disabled" case every call
        would rebuild the legacy registry and discard its caches.
        """
        advanced_ready = (
            self.advanced_registry is not None
            or not self.settings.enable_advanced_registry
        )
        if self.htmy_registry is not None and advanced_ready:
            return

        # get_adapter is None when the ACB import fallback is active.
        app_adapter = get_adapter("app") if callable(get_adapter) else None
        if app_adapter is None:
            try:
                app_adapter = depends.get("app")
            except Exception:
                from types import SimpleNamespace

                # Minimal stand-in carrying the attributes read above.
                app_adapter = SimpleNamespace(name="app", category="app")

        self.component_searchpaths = await self.get_component_searchpaths(app_adapter)

        if self.settings.enable_advanced_registry:
            self.advanced_registry = AdvancedHTMYComponentRegistry(
                searchpaths=self.component_searchpaths,
                cache=self.cache,
                storage=self.storage,
            )
            if self.settings.enable_hot_reload:
                self.advanced_registry.enable_hot_reload()

        # The legacy registry is always kept for backward compatibility.
        self.htmy_registry = HTMYComponentRegistry(
            searchpaths=self.component_searchpaths,
            cache=self.cache,
            storage=self.storage,
        )

    async def clear_component_cache(self, component_name: str | None = None) -> None:
        """Drop cached data for one component, or for all when no name is given.

        Does nothing when the legacy registry has not been created.
        """
        registry = self.htmy_registry
        if registry is None:
            return

        if not component_name:
            registry._component_cache.clear()
            registry._source_cache.clear()
            if self.cache:
                # Cache backends may not support namespaced clearing.
                with suppress(NotImplementedError, AttributeError):
                    await self.cache.clear("htmy_component_source")
                    await self.cache.clear("htmy_component_bytecode")
            debug("All HTMY component caches cleared")
            return

        registry._component_cache.pop(component_name, None)
        # Source entries are keyed by str(path) and the component name is the
        # file stem, so purge matching paths; otherwise a reload would be served
        # the stale in-process source.
        stale_keys = [
            key
            for key in registry._source_cache
            if AsyncPath(key).stem == component_name
        ]
        for key in stale_keys:
            registry._source_cache.pop(key, None)

        if self.cache:
            components = await registry.discover_components()
            if component_name in components:
                component_path = components[component_name]
                await self.cache.delete(
                    HTMYComponentRegistry.get_cache_key(component_path)
                )
                await self.cache.delete(
                    HTMYComponentRegistry.get_cache_key(component_path, "bytecode")
                )
        debug(f"HTMY component cache cleared for: {component_name}")

    async def get_component_class(self, component_name: str) -> t.Any:
        """Return the component class from the legacy registry.

        Raises:
            ComponentNotFound: if the registry could not be initialised or the
                component is unknown.
        """
        if self.htmy_registry is None:
            await self._init_htmy_registry()

        if self.htmy_registry is not None:
            return await self.htmy_registry.get_component_class(component_name)
        raise ComponentNotFound(
            f"Component registry not initialized for '{component_name}'"
        )

    async def render_component_advanced(
        self,
        request: t.Any,
        component: str,
        context: dict[str, t.Any] | None = None,
        status_code: int = 200,
        headers: dict[str, str] | None = None,
        **kwargs: t.Any,
    ) -> HTMLResponse:
        """Render component using advanced registry with lifecycle management.

        Extra keyword arguments are merged into *context*. Any rendering
        exception yields a 500 response; the page contains the traceback when
        ``debug_components`` is set, otherwise just the exception message.

        Raises:
            ComponentRenderError: if the advanced registry is unavailable.
        """
        context = {} if context is None else context
        headers = {} if headers is None else headers

        if self.advanced_registry is None:
            await self._init_htmy_registry()

        if self.advanced_registry is None:
            raise ComponentRenderError(
                f"Advanced registry not initialized for '{component}'"
            )

        try:
            rendered_content = (
                await self.advanced_registry.render_component_with_lifecycle(
                    component, context | kwargs, request
                )
            )
            return HTMLResponse(
                content=rendered_content,
                status_code=status_code,
                headers=headers,
            )
        except Exception as e:
            from html import escape

            # Escape interpolated text: the component name and exception
            # details may carry request-derived characters.
            safe_name = escape(str(component))
            if self.settings.debug_components:
                import traceback

                error_content = f"<html><body><h3>Component {safe_name} error:</h3><pre>{escape(traceback.format_exc())}</pre></body></html>"
            else:
                error_content = (
                    f"<html><body>Component {safe_name} error: {escape(str(e))}</body></html>"
                )

            return HTMLResponse(
                content=error_content,
                status_code=500,
                headers=headers,
            )

    async def render_component(
        self,
        request: t.Any,
        component: str,
        context: dict[str, t.Any] | None = None,
        status_code: int = 200,
        headers: dict[str, str] | None = None,
        **kwargs: t.Any,
    ) -> HTMLResponse:
        """Render *component* to an ``HTMLResponse``.

        Delegates to ``render_component_advanced`` when the advanced registry
        is enabled and available. Otherwise the component class is
        instantiated with ``**context, **kwargs`` and its ``htmy`` method is
        called with a context dict that adds the request and the
        ``render_template`` / ``render_block`` helpers. Unknown or uncompilable
        components produce a 404 response; other exceptions propagate.

        Raises:
            ComponentNotFound: if no registry could be initialised.
        """
        context = {} if context is None else context
        headers = {} if headers is None else headers

        # Initialise before choosing a path so the very first call uses the
        # same (advanced) renderer as every later call.
        if self.htmy_registry is None:
            await self._init_htmy_registry()

        if (
            self.settings.enable_advanced_registry
            and self.advanced_registry is not None
        ):
            return await self.render_component_advanced(
                request, component, context, status_code, headers, **kwargs
            )

        if self.htmy_registry is None:
            raise ComponentNotFound(
                f"Component registry not initialized for '{component}'"
            )

        try:
            component_class = await self.htmy_registry.get_component_class(component)
            component_instance = component_class(**context, **kwargs)

            # Keys listed after **context cannot be overridden by the caller.
            htmy_context = {
                "request": request,
                **context,
                "render_template": self._create_template_renderer(request),
                "render_block": self._create_block_renderer(request),
                "_template_system": "htmy",
                "_request": request,
            }

            rendered_content = await self._maybe_await(
                component_instance.htmy(htmy_context)
            )

            return HTMLResponse(
                content=str(rendered_content),
                status_code=status_code,
                headers=headers,
            )

        except (ComponentNotFound, ComponentCompilationError) as e:
            from html import escape

            return HTMLResponse(
                content=f"<html><body>Component {escape(str(component))} error: {escape(str(e))}</body></html>",
                status_code=404,
                headers=headers,
            )

    def _create_template_renderer(
        self, request: t.Any = None
    ) -> t.Callable[..., t.Any]:
        """Return an async helper that renders a Jinja2 template by name.

        The helper returns an HTML comment instead of raising when rendering
        fails or no Jinja2 adapter is attached. *request* is currently unused.
        """

        async def render_template(
            template_name: str,
            context: dict[str, t.Any] | None = None,
            inherit_context: bool = True,  # noqa: ARG001
            **kwargs: t.Any,
        ) -> str:
            template_context = ({} if context is None else context) | kwargs

            if not (self.jinja_templates and hasattr(self.jinja_templates, "app")):
                debug(
                    f"No Jinja2 adapter available to render template '{template_name}' in HTMY component"
                )
                return f"<!-- No template renderer available for '{template_name}' -->"

            try:
                template = self.jinja_templates.app.get_template(template_name)
                rendered = await self._maybe_await(template.render(template_context))
                return t.cast(str, rendered)
            except Exception as e:
                debug(
                    f"Failed to render template '{template_name}' in HTMY component: {e}"
                )
                return f"<!-- Error rendering template '{template_name}': {e} -->"

        return render_template

    async def discover_components(self) -> dict[str, ComponentMetadata]:
        """Discover all components and return metadata.

        Uses the advanced registry when available; otherwise wraps the legacy
        registry's name-to-path mapping in basic ``ComponentMetadata`` records.
        """
        if self.advanced_registry is None:
            await self._init_htmy_registry()

        if self.advanced_registry is not None:
            return await self.advanced_registry.discover_components()

        components: dict[str, ComponentMetadata] = {}
        if self.htmy_registry is not None:
            discovered = await self.htmy_registry.discover_components()
            for name, path in discovered.items():
                components[name] = ComponentMetadata(
                    name=name,
                    path=path,
                    type=ComponentType.BASIC,
                    status=ComponentStatus.DISCOVERED,
                )
        return components

    async def scaffold_component(
        self,
        name: str,
        component_type: ComponentType = ComponentType.DATACLASS,
        props: dict[str, type] | None = None,
        htmx_enabled: bool = False,
        endpoint: str = "",
        trigger: str = "click",
        target: str = "#content",
        children: list[str] | None = None,
        target_path: AsyncPath | None = None,
    ) -> AsyncPath:
        """Scaffold a new component through the advanced registry.

        Only options that are set are forwarded; ``endpoint``, ``trigger`` and
        ``target`` are forwarded together and only when ``htmx_enabled`` is
        true and ``endpoint`` is non-empty.

        Raises:
            ComponentRenderError: if the advanced registry is unavailable.
        """
        if self.advanced_registry is None:
            await self._init_htmy_registry()

        if self.advanced_registry is None:
            raise ComponentRenderError(
                "Advanced registry not available for scaffolding"
            )

        kwargs: dict[str, Any] = {}
        if props:
            kwargs["props"] = props
        if htmx_enabled:
            kwargs["htmx_enabled"] = True
            if endpoint:
                kwargs["endpoint"] = endpoint
                kwargs["trigger"] = trigger
                kwargs["target"] = target
        if children:
            kwargs["children"] = children

        return await self.advanced_registry.scaffold_component(
            name, component_type, target_path, **kwargs
        )

    async def validate_component(self, component_name: str) -> ComponentMetadata:
        """Return the discovered metadata for *component_name*.

        Raises:
            ComponentNotFound: if discovery does not list the component.
        """
        components = await self.discover_components()
        if component_name not in components:
            raise ComponentNotFound(f"Component '{component_name}' not found")
        return components[component_name]

    def get_lifecycle_manager(self) -> ComponentLifecycleManager | None:
        """Get the advanced registry's lifecycle manager, or None without one."""
        if self.advanced_registry is not None:
            return self.advanced_registry.lifecycle_manager
        return None

    def register_lifecycle_hook(
        self, event: str, callback: t.Callable[..., Any]
    ) -> None:
        """Register a lifecycle hook; a no-op when no lifecycle manager exists."""
        lifecycle_manager = self.get_lifecycle_manager()
        if lifecycle_manager is not None:
            lifecycle_manager.register_hook(event, callback)

    def _create_block_renderer(self, request: t.Any = None) -> t.Callable[..., t.Any]:
        """Return an async helper that renders a named template block.

        The helper returns an HTML comment instead of raising when rendering
        fails or the Jinja2 adapter exposes no ``render_block``. *request* is
        currently unused.
        """

        async def render_block(
            block_name: str, context: dict[str, t.Any] | None = None, **kwargs: t.Any
        ) -> str:
            block_context = ({} if context is None else context) | kwargs

            renderer_available = (
                self.jinja_templates
                and hasattr(self.jinja_templates, "app")
                and hasattr(self.jinja_templates.app, "render_block")
            )
            if not renderer_available:
                debug(
                    f"No block renderer available for '{block_name}' in HTMY component"
                )
                return f"<!-- No block renderer available for '{block_name}' -->"

            try:
                rendered = await self._maybe_await(
                    self.jinja_templates.app.render_block(block_name, block_context)
                )
                return t.cast(str, rendered)
            except Exception as e:
                debug(
                    f"Failed to render block '{block_name}' in HTMY component: {e}"
                )
                return f"<!-- Error rendering block '{block_name}': {e} -->"

        return render_block

    async def init(self, cache: t.Any | None = None) -> None:
        """Wire up cache, storage, registries and the Jinja2 adapter.

        Each dependency lookup is best-effort: a failed lookup leaves the
        corresponding attribute as None. The adapter is then registered with
        the dependency container under ``"htmy"``.
        """
        if cache is None:
            try:
                cache = depends.get("cache")
            except Exception:
                cache = None
        self.cache = cache

        try:
            self.storage = depends.get("storage")
        except Exception:
            self.storage = None

        await self._init_htmy_registry()

        try:
            self.jinja_templates = depends.get("templates")
        except Exception:
            self.jinja_templates = None

        depends.set("htmy", self)
        debug("HTMY Templates adapter initialized")

    async def render_template(
        self,
        request: t.Any,
        template: str,
        context: dict[str, t.Any] | None = None,
        status_code: int = 200,
        headers: dict[str, str] | None = None,
    ) -> HTMLResponse:
        """Render *template* by treating its name as a component name."""
        return await self.render_component(
            request=request,
            component=template,
            context=context,
            status_code=status_code,
            headers=headers,
        )
# Adapter identity metadata.
MODULE_ID = UUID("01937d86-e1f2-7890-abcd-ef1234567890")
MODULE_STATUS = None if AdapterStatus is None else AdapterStatus.STABLE

# Generic aliases for the concrete HTMY classes.
Templates = HTMYTemplates
TemplatesSettings = HTMYTemplatesSettings

# Registration with the dependency container is best-effort: any error raised
# by the container is ignored so importing this module never fails here.
with suppress(Exception):
    depends.set(Templates, "htmy")

__all__ = [
    "Templates",
    "TemplatesSettings",
    "HTMYTemplatesSettings",
    "HTMYTemplates",
]