Coverage for fastblocks / adapters / templates / jinja2.py: 53%
599 statements
« prev ^ index » next coverage.py v7.12.0, created at 2025-11-26 03:58 -0800
« prev ^ index » next coverage.py v7.12.0, created at 2025-11-26 03:58 -0800
1"""Jinja2 Templates Adapter for FastBlocks.
3Provides asynchronous Jinja2 template rendering with advanced features including:
4- Multi-layer loader system (Redis cache, cloud storage, filesystem)
5- Bidirectional integration with HTMY components via dedicated HTMY adapter
6- Fragment and partial template support
7- Bytecode caching with Redis backend
8- Template synchronization across cache/storage/filesystem layers
9- Custom delimiters ([[/]] instead of {{/}})
10- Extensive template debugging and error handling
12Requirements:
13- jinja2-async-environment>=0.14.3
14- starlette-async-jinja>=1.12.4
15- jinja2>=3.1.6
16- redis>=3.5.3 (for caching)
18Usage:
19```python
20from acb.depends import Inject, depends
21from acb.adapters import import_adapter
23templates = depends.get("templates")
25Templates = import_adapter("templates")
27response = await templates.render_template(
28 request, "index.html", {"title": "FastBlocks"}
29)
30```
32Author: lesleslie <les@wedgwoodwebworks.com>
33Created: 2025-01-12
34"""
36import asyncio
37import re
38import typing as t
39from ast import literal_eval
40from contextlib import suppress
41from html.parser import HTMLParser
42from importlib import import_module
43from importlib.util import find_spec
44from inspect import isclass
45from pathlib import Path
46from uuid import UUID
48from acb.adapters import AdapterStatus, get_adapter
49from acb.config import Config
50from acb.debug import debug
52# Import event tracking decorator (with fallback if unavailable)
53try:
54 from ._events_wrapper import track_template_render
55except ImportError:
56 # Fallback no-op decorator if events integration unavailable
57 def track_template_render(func: t.Callable[..., t.Any]) -> t.Callable[..., t.Any]:
58 return func
61from acb.depends import depends
62from anyio import Path as AsyncPath
63from jinja2 import TemplateNotFound
64from jinja2.ext import Extension, i18n, loopcontrols
65from jinja2.ext import debug as jinja_debug
66from jinja2_async_environment.bccache import AsyncRedisBytecodeCache
67from jinja2_async_environment.loaders import AsyncBaseLoader, SourceType
68from starlette_async_jinja import AsyncJinja2Templates
69from fastblocks.actions.sync.strategies import SyncDirection, SyncStrategy
70from fastblocks.actions.sync.templates import sync_templates
72from ._base import TemplatesBase, TemplatesBaseSettings
74Cache, Storage, Models = None, None, None
76_TEMPLATE_REPLACEMENTS = [
77 (b"{{", b"[["),
78 (b"}}", b"]]"),
79 (b"{%", b"[%"),
80 (b"%}", b"%]"),
81]
82_HTTP_TO_HTTPS = (b"http://", b"https://")
84_ATTR_PATTERN_CACHE: dict[str, re.Pattern[str]] = {}
87def _get_attr_pattern(attr: str) -> re.Pattern[str]:
88 if attr not in _ATTR_PATTERN_CACHE:
89 escaped_attr = re.escape(f"{attr}=")
90 _ATTR_PATTERN_CACHE[attr] = re.compile(
91 escaped_attr
92 ) # REGEX OK: Template attribute pattern compilation for Jinja2
93 return _ATTR_PATTERN_CACHE[attr]
96def _apply_template_replacements(source: bytes, deployed: bool = False) -> bytes:
97 for old_pattern, new_pattern in _TEMPLATE_REPLACEMENTS:
98 source = source.replace(old_pattern, new_pattern)
99 if deployed:
100 source = source.replace(*_HTTP_TO_HTTPS)
102 return source
105class BaseTemplateLoader(AsyncBaseLoader):
106 config: t.Any = None
107 cache: t.Any = None
108 storage: t.Any = None
110 def __init__(
111 self,
112 searchpath: AsyncPath | t.Sequence[AsyncPath] | None = None,
113 ) -> None:
114 super().__init__(searchpath or [])
115 if self.storage is None:
116 try:
117 self.storage = depends.get("storage")
118 except Exception:
119 self.storage = get_adapter("storage")
120 if self.cache is None:
121 try:
122 self.cache = depends.get("cache")
123 except Exception:
124 self.cache = get_adapter("cache")
125 if not hasattr(self, "config"):
126 try:
127 self.config = depends.get("config")
128 except Exception:
129 config_adapter = get_adapter("config")
130 self.config = (
131 config_adapter if isinstance(config_adapter, Config) else Config()
132 )
134 def get_supported_extensions(self) -> tuple[str, ...]:
135 return ("html", "css", "js")
137 async def _list_templates_for_extensions(
138 self,
139 extensions: tuple[str, ...],
140 ) -> list[str]:
141 found: set[str] = set()
142 for searchpath in self.searchpath:
143 for ext in extensions:
144 async for p in searchpath.rglob(f"*.{ext}"):
145 found.add(str(p))
146 return sorted(found)
148 def _normalize_template(
149 self,
150 environment_or_template: t.Any,
151 template: str | AsyncPath | None = None,
152 ) -> str | AsyncPath:
153 if template is None:
154 template = environment_or_template
155 assert template is not None
156 return template
158 async def _find_template_path_parallel(
159 self,
160 template: str | AsyncPath,
161 ) -> AsyncPath | None:
162 async def check_path(searchpath: AsyncPath) -> AsyncPath | None:
163 path = searchpath / template
164 if await path.is_file():
165 return path
166 return None
168 tasks = [check_path(searchpath) for searchpath in self.searchpath]
169 results = await asyncio.gather(*tasks, return_exceptions=True)
171 for result in results:
172 if isinstance(result, AsyncPath):
173 return result
174 return None
176 async def _find_storage_path_parallel(
177 self,
178 template: str | AsyncPath,
179 ) -> tuple[AsyncPath, AsyncPath] | None:
180 async def check_storage_path(
181 searchpath: AsyncPath,
182 ) -> tuple[AsyncPath, AsyncPath] | None:
183 path = searchpath / template
184 storage_path = Templates.get_storage_path(path)
185 if storage_path and await self.storage.templates.exists(storage_path):
186 return path, storage_path
187 return None
189 tasks = [check_storage_path(searchpath) for searchpath in self.searchpath]
190 results = await asyncio.gather(*tasks, return_exceptions=True)
192 for result in results:
193 if isinstance(result, tuple):
194 return result
195 return None
197 async def _find_cache_path_parallel(
198 self,
199 template: str | AsyncPath,
200 ) -> tuple[AsyncPath, AsyncPath, str] | None:
201 async def check_cache_path(
202 searchpath: AsyncPath,
203 ) -> tuple[AsyncPath, AsyncPath, str] | None:
204 path = searchpath / template if searchpath else AsyncPath(template)
205 storage_path = Templates.get_storage_path(path)
206 cache_key = Templates.get_cache_key(storage_path)
207 if (
208 storage_path
209 and self.cache is not None
210 and await self.cache.exists(cache_key)
211 ):
212 return path, storage_path, cache_key
213 return None
215 tasks = [check_cache_path(searchpath) for searchpath in self.searchpath]
216 results = await asyncio.gather(*tasks, return_exceptions=True)
218 for result in results:
219 if isinstance(result, tuple):
220 return result
221 return None
224class LoaderProtocol(t.Protocol):
225 cache: t.Any
226 config: t.Any
227 storage: t.Any
229 async def get_source_async(
230 self,
231 environment_or_template: t.Any,
232 template: str | AsyncPath | None = None,
233 ) -> tuple[
234 str,
235 str | None,
236 t.Callable[[], bool] | t.Callable[[], t.Awaitable[bool]],
237 ]: ...
239 async def list_templates_async(self) -> list[str]: ...
242class FileSystemLoader(BaseTemplateLoader):
243 async def _check_storage_exists(self, storage_path: AsyncPath) -> bool:
244 if self.storage is not None:
245 return t.cast(bool, await self.storage.templates.exists(storage_path))
246 return False
248 async def _sync_template_file(
249 self,
250 path: AsyncPath,
251 storage_path: AsyncPath,
252 ) -> tuple[bytes, int]:
253 if sync_templates is None or SyncDirection is None or SyncStrategy is None:
254 return await self._sync_from_storage_fallback(path, storage_path)
256 try:
257 strategy = SyncStrategy(
258 backup_on_conflict=False,
259 )
261 template_paths = [path]
262 result = await sync_templates(
263 template_paths=template_paths,
264 strategy=strategy,
265 )
267 resp = await path.read_bytes()
268 local_stat = await path.stat()
269 local_mtime = int(local_stat.st_mtime)
271 debug(f"Template sync result: {result.sync_status} for {path}")
272 return resp, local_mtime
274 except Exception as e:
275 debug(f"Sync action failed for {path}: {e}, falling back to primitive sync")
276 return await self._sync_from_storage_fallback(path, storage_path)
278 async def _sync_from_storage_fallback(
279 self,
280 path: AsyncPath,
281 storage_path: AsyncPath,
282 ) -> tuple[bytes, int]:
283 local_stat = await path.stat()
284 local_mtime = int(local_stat.st_mtime)
285 local_size = local_stat.st_size
286 storage_stat = await self.storage.templates.stat(storage_path)
287 storage_mtime = round(storage_stat.get("mtime"))
288 storage_size = storage_stat.get("size")
290 if local_mtime < storage_mtime and local_size != storage_size:
291 resp = await self.storage.templates.open(storage_path)
292 await path.write_bytes(resp)
293 else:
294 resp = await path.read_bytes()
295 if local_size != storage_size:
296 await self.storage.templates.write(storage_path, resp)
297 return resp, local_mtime
299 async def _read_and_store_template(
300 self,
301 path: AsyncPath,
302 storage_path: AsyncPath,
303 ) -> bytes:
304 try:
305 resp = await path.read_bytes()
306 if self.storage is not None:
307 try:
308 import asyncio
310 await asyncio.wait_for(
311 self.storage.templates.write(storage_path, resp), timeout=5.0
312 )
313 except (TimeoutError, Exception) as e:
314 debug(
315 f"Storage write failed for {storage_path}: {e}, continuing with local file"
316 )
317 return resp
318 except FileNotFoundError:
319 raise TemplateNotFound(path.name)
321 async def _cache_template(self, storage_path: AsyncPath, resp: bytes) -> None:
322 if self.cache is not None:
323 await self.cache.set(Templates.get_cache_key(storage_path), resp)
325 async def get_source_async(
326 self,
327 environment_or_template: t.Any,
328 template: str | AsyncPath | None = None,
329 ) -> SourceType:
330 template = self._normalize_template(environment_or_template, template)
331 path = await self._find_template_path_parallel(template)
332 if path is None:
333 raise TemplateNotFound(str(template))
334 storage_path = Templates.get_storage_path(path)
335 debug(path)
337 fs_exists = await path.exists()
338 storage_exists = await self._check_storage_exists(storage_path)
339 local_mtime = 0
341 if storage_exists and fs_exists and (not self.config.deployed):
342 resp, local_mtime = await self._sync_template_file(path, storage_path)
343 else:
344 resp = await self._read_and_store_template(path, storage_path)
346 await self._cache_template(storage_path, resp)
348 async def uptodate() -> bool:
349 return int((await path.stat()).st_mtime) == local_mtime
351 return (resp.decode(), str(storage_path), uptodate)
353 async def list_templates_async(self) -> list[str]:
354 return await self._list_templates_for_extensions(
355 self.get_supported_extensions(),
356 )
359class StorageLoader(BaseTemplateLoader):
360 async def _check_filesystem_sync_opportunity(
361 self, template: str | AsyncPath, storage_path: AsyncPath
362 ) -> AsyncPath | None:
363 if not self.config or self.config.deployed:
364 return None
366 fs_result = await self._find_template_path_parallel(template)
367 if fs_result and await fs_result.exists():
368 return fs_result
369 return None
371 async def _sync_storage_with_filesystem(
372 self,
373 fs_path: AsyncPath,
374 storage_path: AsyncPath,
375 ) -> tuple[bytes, int]:
376 if sync_templates is None or SyncDirection is None or SyncStrategy is None:
377 resp = await self.storage.templates.open(storage_path)
378 stat = await self.storage.templates.stat(storage_path)
379 return resp, round(stat.get("mtime").timestamp())
381 try:
382 strategy = SyncStrategy(
383 direction=SyncDirection.PULL,
384 backup_on_conflict=False,
385 )
387 result = await sync_templates(
388 template_paths=[fs_path],
389 strategy=strategy,
390 )
392 resp = await fs_path.read_bytes()
393 local_stat = await fs_path.stat()
394 local_mtime = int(local_stat.st_mtime)
396 debug(
397 f"Storage-filesystem sync result: {result.sync_status} for {storage_path}"
398 )
399 return resp, local_mtime
401 except Exception as e:
402 debug(
403 f"Storage sync failed for {storage_path}: {e}, reading from storage only"
404 )
405 resp = await self.storage.templates.open(storage_path)
406 stat = await self.storage.templates.stat(storage_path)
407 return resp, round(stat.get("mtime").timestamp())
409 async def get_source_async(
410 self,
411 environment_or_template: t.Any,
412 template: str | AsyncPath | None = None,
413 ) -> tuple[str, str, t.Callable[[], t.Awaitable[bool]]]:
414 template = self._normalize_template(environment_or_template, template)
415 result = await self._find_storage_path_parallel(template)
416 if result is None:
417 raise TemplateNotFound(str(template))
418 _, storage_path = result
419 debug(storage_path)
421 try:
422 fs_path = await self._check_filesystem_sync_opportunity(
423 template, storage_path
424 )
426 if fs_path:
427 resp, local_mtime = await self._sync_storage_with_filesystem(
428 fs_path, storage_path
429 )
430 else:
431 resp = await self.storage.templates.open(storage_path)
432 local_stat = await self.storage.templates.stat(storage_path)
433 local_mtime = round(local_stat.get("mtime").timestamp())
435 if self.cache is not None:
436 await self.cache.set(Templates.get_cache_key(storage_path), resp)
438 async def uptodate() -> bool:
439 if fs_path and await fs_path.exists():
440 fs_stat = await fs_path.stat()
441 return int(fs_stat.st_mtime) == local_mtime
442 else:
443 storage_stat = await self.storage.templates.stat(storage_path)
444 mtime = storage_stat.get("mtime")
445 if hasattr(mtime, "timestamp"):
446 timestamp = t.cast(float, mtime.timestamp())
447 return round(timestamp) == local_mtime
448 return False
450 return (resp.decode(), str(storage_path), uptodate)
451 except (FileNotFoundError, AttributeError):
452 raise TemplateNotFound(str(template))
454 async def list_templates_async(self) -> list[str]:
455 found: list[str] = []
456 for searchpath in self.searchpath:
457 with suppress(FileNotFoundError):
458 paths = await self.storage.templates.list(
459 Templates.get_storage_path(searchpath),
460 )
461 found.extend(p for p in paths if p.endswith((".html", ".css", ".js")))
462 found.sort()
463 return found
466class RedisLoader(BaseTemplateLoader):
467 async def get_source_async(
468 self,
469 environment_or_template: t.Any,
470 template: str | AsyncPath | None = None,
471 ) -> tuple[str, str | None, t.Callable[[], t.Awaitable[bool]]]:
472 template = self._normalize_template(environment_or_template, template)
473 result = await self._find_cache_path_parallel(template)
474 if result is None:
475 raise TemplateNotFound(str(template))
476 path, _, cache_key = result
477 debug(cache_key)
478 resp = await self.cache.get(cache_key) if self.cache is not None else None
479 if not resp:
480 raise TemplateNotFound(path.name)
482 async def uptodate() -> bool:
483 return True
485 return (resp.decode(), None, uptodate)
487 async def list_templates_async(self) -> list[str]:
488 found: list[str] = []
489 for ext in ("html", "css", "js"):
490 scan_result: t.Any = (
491 await self.cache.scan(f"*.{ext}") if self.cache is not None else []
492 )
493 if hasattr(scan_result, "__aiter__"):
494 async for k in scan_result:
495 found.append(k)
496 else:
497 found.extend(scan_result)
498 found.sort()
499 return found
502class PackageLoader(BaseTemplateLoader):
503 _template_root: AsyncPath
504 _adapter: str
505 package_name: str
506 _loader: t.Any
508 def __init__(
509 self,
510 package_name: str,
511 path: str = "templates",
512 adapter: str = "admin",
513 ) -> None:
514 self.package_path = Path(package_name)
515 self.path = self.package_path / path
516 super().__init__(AsyncPath(self.path))
517 self.package_name = package_name
518 self._adapter = adapter
519 self._template_root = AsyncPath(".")
520 try:
521 if package_name.startswith("/"):
522 spec = None
523 self._loader = None
524 return
525 import_module(package_name)
526 spec = find_spec(package_name)
527 if spec is None:
528 msg = f"Could not find package {package_name}"
529 raise ImportError(msg)
530 except ModuleNotFoundError:
531 spec = None
532 self._loader = None
533 return
534 roots: list[Path] = []
535 template_root = None
536 loader = spec.loader
537 self._loader = loader
538 if spec.submodule_search_locations:
539 roots.extend(Path(s) for s in spec.submodule_search_locations)
540 elif spec.origin is not None:
541 roots.append(Path(spec.origin))
542 for root in roots:
543 root = root / path
544 if root.is_dir():
545 template_root = root
546 break
547 if template_root is None:
548 msg = f"The {package_name!r} package was not installed in a way that PackageLoader understands."
549 raise ValueError(
550 msg,
551 )
552 self._template_root = AsyncPath(template_root)
554 async def get_source_async(
555 self,
556 environment_or_template: t.Any,
557 template: str | AsyncPath | None = None,
558 ) -> tuple[str, str, t.Callable[[], t.Awaitable[bool]]]:
559 if template is None:
560 template = environment_or_template
561 assert template is not None
562 template_path: AsyncPath = AsyncPath(template)
563 path = self._template_root / template_path
564 debug(path)
565 if not await path.is_file():
566 raise TemplateNotFound(template_path.name)
567 source = await path.read_bytes()
568 mtime = (await path.stat()).st_mtime
570 async def uptodate() -> bool:
571 return await path.is_file() and (await path.stat()).st_mtime == mtime
573 source = _apply_template_replacements(source, self.config.deployed)
574 storage_path = Templates.get_storage_path(path)
575 _storage_path: list[str] = list(storage_path.parts)
576 _storage_path[0] = "_templates"
577 _storage_path.insert(1, self._adapter)
578 _storage_path.insert(2, getattr(self.config, self._adapter).style)
579 storage_path = AsyncPath("/".join(_storage_path))
580 cache_key = Templates.get_cache_key(storage_path)
581 if self.cache is not None:
582 await self.cache.set(cache_key, source)
583 return (source.decode(), path.name, uptodate)
585 async def list_templates_async(self) -> list[str]:
586 found: set[str] = set()
587 for ext in ("html", "css", "js"):
588 found.update([str(p) async for p in self._template_root.rglob(f"*.{ext}")])
589 return sorted(found)
592class ChoiceLoader(AsyncBaseLoader):
593 loaders: list[AsyncBaseLoader | LoaderProtocol]
595 def __init__(
596 self,
597 loaders: list[AsyncBaseLoader | LoaderProtocol],
598 searchpath: AsyncPath | t.Sequence[AsyncPath] | None = None,
599 ) -> None:
600 super().__init__(searchpath or AsyncPath("templates"))
601 self.loaders = loaders
603 async def get_source_async(
604 self,
605 environment_or_template: t.Any,
606 template: str | AsyncPath | None = None,
607 ) -> SourceType:
608 if template is None:
609 template = environment_or_template
610 assert template is not None
611 for loader in self.loaders:
612 try:
613 result = await loader.get_source_async(
614 environment_or_template, template
615 )
616 return result
617 except TemplateNotFound:
618 continue
619 except Exception: # nosec B112
620 continue
621 raise TemplateNotFound(str(template))
623 async def list_templates_async(self) -> list[str]:
624 found: set[str] = set()
625 for loader in self.loaders:
626 templates = await loader.list_templates_async()
627 found.update(templates)
628 return sorted(found)
631class TemplatesSettings(TemplatesBaseSettings):
632 loader: str | None = None
633 extensions: list[str] = []
634 delimiters: dict[str, str] = {
635 "block_start_string": "[%",
636 "block_end_string": "%]",
637 "variable_start_string": "[[",
638 "variable_end_string": "]]",
639 "comment_start_string": "[#",
640 "comment_end_string": "#]",
641 }
642 globals: dict[str, t.Any] = {}
643 context_processors: list[str] = []
645 def __init__(self, **data: t.Any) -> None:
646 from pydantic import BaseModel
648 BaseModel.__init__(self, **data) # type: ignore[arg-type]
649 if not hasattr(self, "cache_timeout"):
650 self.cache_timeout = 300
651 try:
652 models = depends.get("models")
653 self.globals["models"] = models
654 except Exception:
655 self.globals["models"] = None
658class Templates(TemplatesBase):
659 app: AsyncJinja2Templates | None = None
661 def __init__(self, **kwargs: t.Any) -> None:
662 super().__init__(**kwargs)
663 self.filters: dict[str, t.Callable[..., t.Any]] = {}
664 self.enabled_admin = get_adapter("admin")
665 self.enabled_app = self._get_app_adapter()
666 self._admin = None
667 self._admin_initialized = False
669 def _get_app_adapter(self) -> t.Any:
670 app_adapter = get_adapter("app")
671 if app_adapter is not None:
672 return app_adapter
673 with suppress(Exception):
674 app_adapter = depends.get_sync("app")
675 if app_adapter is not None:
676 return app_adapter
678 return None
680 @property
681 def admin(self) -> AsyncJinja2Templates | None:
682 if not self._admin_initialized and self.enabled_admin:
683 import asyncio
685 loop = asyncio.get_event_loop()
686 if (
687 hasattr(self, "_admin_cache")
688 and hasattr(self, "admin_searchpaths")
689 and self.admin_searchpaths is not None
690 ):
691 debug("Initializing admin templates environment")
692 self._admin = loop.run_until_complete(
693 self.init_envs(
694 self.admin_searchpaths,
695 admin=True,
696 cache=self._admin_cache,
697 ),
698 )
699 else:
700 debug(
701 "Skipping admin templates initialization - missing cache or searchpaths"
702 )
703 self._admin_initialized = True
704 return self._admin
706 @admin.setter
707 def admin(self, value: AsyncJinja2Templates | None) -> None:
708 self._admin = value
709 self._admin_initialized = True
711 def get_loader(self, template_paths: list[AsyncPath]) -> ChoiceLoader:
712 searchpaths: list[AsyncPath] = []
713 for path in template_paths:
714 searchpaths.extend([path, path / "blocks"])
715 loaders: list[AsyncBaseLoader] = [
716 RedisLoader(searchpaths),
717 StorageLoader(searchpaths),
718 ]
719 file_loaders: list[AsyncBaseLoader | LoaderProtocol] = [
720 FileSystemLoader(searchpaths),
721 ]
722 jinja_loaders: list[AsyncBaseLoader | LoaderProtocol] = loaders + file_loaders
723 if not self.config.deployed and (not self.config.debug.production):
724 jinja_loaders = file_loaders + loaders
725 if self.enabled_admin and template_paths == self.admin_searchpaths:
726 jinja_loaders.append(
727 PackageLoader(self.enabled_admin.name, "templates", "admin"),
728 )
729 debug(jinja_loaders)
730 return ChoiceLoader(jinja_loaders)
732 async def init_envs(
733 self,
734 template_paths: list[AsyncPath],
735 admin: bool = False,
736 cache: t.Any | None = None,
737 ) -> AsyncJinja2Templates:
738 _extensions: list[t.Any] = [loopcontrols, i18n, jinja_debug]
739 _imported_extensions = [
740 import_module(e) for e in self.config.templates.extensions
741 ]
742 for e in _imported_extensions:
743 _extensions.extend(
744 [
745 v
746 for v in vars(e).values()
747 if isclass(v)
748 and v.__name__ != "Extension"
749 and issubclass(v, Extension)
750 ],
751 )
752 bytecode_cache = AsyncRedisBytecodeCache(prefix="bccache", client=cache)
753 context_processors: list[t.Callable[..., t.Any]] = []
754 for processor_path in self.config.templates.context_processors:
755 module_path, func_name = processor_path.rsplit(".", 1)
756 module = import_module(module_path)
757 processor = getattr(module, func_name)
758 context_processors.append(processor)
759 templates = AsyncJinja2Templates(
760 directory=AsyncPath("templates"),
761 context_processors=context_processors,
762 extensions=_extensions,
763 bytecode_cache=bytecode_cache,
764 enable_async=True,
765 )
766 loader = self.get_loader(template_paths)
767 if loader:
768 templates.env.loader = loader
769 elif self.config.templates.loader:
770 templates.env.loader = literal_eval(self.config.templates.loader)
771 for delimiter, value in self.config.templates.delimiters.items():
772 setattr(templates.env, delimiter, value)
773 # Type cast globals dict to avoid assignment type errors
774 globals_dict: dict[str, t.Any] = templates.env.globals # type: ignore[assignment]
775 globals_dict["config"] = self.config
776 globals_dict["render_block"] = templates.render_block
777 globals_dict["render_component"] = self._get_htmy_component_renderer()
778 if admin:
779 try:
780 from sqladmin.helpers import ( # type: ignore[import-not-found,import-untyped]
781 get_object_identifier,
782 )
783 except ImportError:
784 get_object_identifier = str
785 globals_dict["min"] = min
786 globals_dict["zip"] = zip
787 globals_dict["admin"] = self
788 globals_dict["is_list"] = lambda x: isinstance(x, list)
789 globals_dict["get_object_identifier"] = get_object_identifier
790 for k, v in self.config.templates.globals.items():
791 globals_dict[k] = v
792 return templates
794 def _resolve_cache(self, cache: t.Any | None) -> t.Any | None:
795 if cache is None:
796 try:
797 cache = depends.get("cache")
798 except Exception:
799 cache = None
800 return cache
802 async def _setup_admin_templates(self, cache: t.Any | None) -> None:
803 if self.enabled_admin:
804 self.admin_searchpaths = await self.get_searchpaths(self.enabled_admin)
805 self._admin_cache = cache
807 def _log_loader_info(self) -> None:
808 if self.app and self.app.env.loader and hasattr(self.app.env.loader, "loaders"):
809 for loader in self.app.env.loader.loaders:
810 self.logger.debug(f"{loader.__class__.__name__} initialized")
812 def _log_extension_info(self) -> None:
813 if self.app and hasattr(self.app.env, "extensions"):
814 for ext in self.app.env.extensions:
815 self.logger.debug(f"{ext.split('.')[-1]} loaded")
817 async def _clear_debug_cache(self, cache: t.Any | None) -> None:
818 if getattr(self.config.debug, "templates", False):
819 try:
820 for namespace in (
821 "templates",
822 "_templates",
823 "bccache",
824 "template",
825 "test",
826 ):
827 if cache is not None:
828 await cache.clear(namespace)
829 self.logger.debug("Template caches cleared")
830 with suppress(Exception):
831 htmy_adapter = await depends.get("htmy")
832 if htmy_adapter:
833 await htmy_adapter.clear_component_cache()
834 self.logger.debug("HTMY component caches cleared via adapter")
835 except (NotImplementedError, AttributeError) as e:
836 self.logger.debug(f"Cache clear not supported: {e}")
838 def _get_htmy_component_renderer(self) -> t.Callable[..., t.Any]:
839 async def render_component(
840 component_name: str,
841 context: dict[str, t.Any] | None = None,
842 **kwargs: t.Any,
843 ) -> str:
844 try:
845 htmy_adapter = await depends.get("htmy")
846 if htmy_adapter:
847 htmy_adapter.jinja_templates = self
849 response = await htmy_adapter.render_component(
850 request=None,
851 component=component_name,
852 context=context,
853 **kwargs,
854 )
855 return (
856 response.body.decode()
857 if hasattr(response.body, "decode")
858 else str(response.body)
859 )
860 else:
861 debug(
862 f"HTMY adapter not available for component '{component_name}'"
863 )
864 return f"<!-- HTMY adapter not available for '{component_name}' -->"
865 except Exception as e:
866 debug(
867 f"Failed to render component '{component_name}' via HTMY adapter: {e}"
868 )
869 return f"<!-- Error rendering component '{component_name}': {e} -->"
871 return render_component
873 async def init(self, cache: t.Any | None = None) -> None:
874 cache = self._resolve_cache(cache)
875 app_adapter = self.enabled_app
876 if app_adapter is None:
877 try:
878 app_adapter = depends.get("app")
879 debug("Retrieved app adapter from dependency injection")
880 except Exception:
881 try:
882 from ..app.default import App
884 app_adapter = depends.get("app") or App()
885 debug("Created app adapter by direct import")
886 depends.set("app", app_adapter)
887 except Exception:
888 from types import SimpleNamespace
890 app_adapter = SimpleNamespace(name="app", category="app")
891 debug(
892 "Created fallback app adapter - ACB discovery failed, direct import failed"
893 )
894 self.app_searchpaths = await self.get_searchpaths(app_adapter)
895 self.app = await self.init_envs(self.app_searchpaths, cache=cache)
896 depends.set("templates", self)
897 self._admin = None
898 self._admin_initialized = False
899 await self._setup_admin_templates(cache)
900 self._log_loader_info()
901 self._log_extension_info()
902 await self._clear_debug_cache(cache)
904 @staticmethod
905 def get_attr(html: str, attr: str) -> str | None:
906 parser = HTMLParser()
907 parser.feed(html)
908 soup = parser.get_starttag_text()
909 if soup is None:
910 return None
911 attr_pattern = _get_attr_pattern(attr)
912 _attr = f"{attr}="
913 for s in soup.split():
914 if attr_pattern.search(s):
915 return s.replace(_attr, "").strip('"')
916 return None
918 def _add_filters(self, env: t.Any) -> None:
919 if hasattr(self, "filters") and self.filters:
920 for name, filter_func in self.filters.items():
921 if hasattr(env, "add_filter"):
922 env.add_filter(filter_func, name)
923 else:
924 env.filters[name] = filter_func
926 @track_template_render
927 async def render_template(
928 self,
929 request: t.Any,
930 template: str,
931 context: dict[str, t.Any] | None = None,
932 status_code: int = 200,
933 headers: dict[str, str] | None = None,
934 ) -> t.Any:
935 if context is None:
936 context = {}
937 if headers is None:
938 headers = {}
940 templates_env = self.app
941 if templates_env:
942 return await templates_env.TemplateResponse(
943 request=request,
944 name=template,
945 context=context,
946 status_code=status_code,
947 headers=headers,
948 )
949 from starlette.responses import HTMLResponse
951 return HTMLResponse(
952 content=f"<html><body>Template {template} not found</body></html>",
953 status_code=404,
954 headers=headers,
955 )
957 @track_template_render
958 async def render_component(
959 self,
960 request: t.Any,
961 component: str,
962 context: dict[str, t.Any] | None = None,
963 status_code: int = 200,
964 headers: dict[str, str] | None = None,
965 **kwargs: t.Any,
966 ) -> t.Any:
967 try:
968 htmy_adapter = await depends.get("htmy")
969 if htmy_adapter:
970 htmy_adapter.jinja_templates = self
971 return await htmy_adapter.render_component(
972 request=request,
973 component=component,
974 context=context,
975 status_code=status_code,
976 headers=headers,
977 **kwargs,
978 )
979 else:
980 from starlette.responses import HTMLResponse
982 return HTMLResponse(
983 content=f"<html><body>HTMY adapter not available for component '{component}'</body></html>",
984 status_code=500,
985 headers=headers,
986 )
987 except Exception as e:
988 from starlette.responses import HTMLResponse
990 return HTMLResponse(
991 content=f"<html><body>Component error: {e}</body></html>",
992 status_code=500,
993 headers=headers,
994 )
996 def filter(
997 self,
998 name: str | None = None,
999 ) -> t.Callable[[t.Callable[..., t.Any]], t.Callable[..., t.Any]]:
1000 def decorator(f: t.Callable[..., t.Any]) -> t.Callable[..., t.Any]:
1001 if self.app and hasattr(self.app.env, "filters"):
1002 self.app.env.filters[name or f.__name__] = f
1003 if self.admin and hasattr(self.admin.env, "filters"):
1004 self.admin.env.filters[name or f.__name__] = f
1005 return f
1007 return decorator
1009 def _load_extensions(self) -> list[t.Any]:
1010 _extensions: list[t.Any] = [loopcontrols, i18n, jinja_debug]
1011 extensions_list = getattr(
1012 getattr(self, "settings", None),
1013 "extensions",
1014 self.config.templates.extensions,
1015 )
1016 _imported_extensions = [import_module(e) for e in extensions_list]
1017 for e in _imported_extensions:
1018 _extensions.extend(
1019 [
1020 v
1021 for v in vars(e).values()
1022 if isclass(v)
1023 and v.__name__ != "Extension"
1024 and issubclass(v, Extension)
1025 ],
1026 )
1027 return _extensions
1030MODULE_ID = UUID("01937d86-4f2a-7b3c-8d9e-1234567890ab")
1031MODULE_STATUS = AdapterStatus.STABLE
1033with suppress(Exception):
1034 depends.set(Templates)
1036__all__ = ["Templates", "TemplatesSettings"]