Coverage for fastblocks / adapters / templates / jinja2.py: 53%

599 statements  

« prev     ^ index     » next       coverage.py v7.12.0, created at 2025-11-26 03:58 -0800

1"""Jinja2 Templates Adapter for FastBlocks. 

2 

3Provides asynchronous Jinja2 template rendering with advanced features including: 

4- Multi-layer loader system (Redis cache, cloud storage, filesystem) 

5- Bidirectional integration with HTMY components via dedicated HTMY adapter 

6- Fragment and partial template support 

7- Bytecode caching with Redis backend 

8- Template synchronization across cache/storage/filesystem layers 

9- Custom delimiters ([[/]] instead of {{/}}) 

10- Extensive template debugging and error handling 

11 

12Requirements: 

13- jinja2-async-environment>=0.14.3 

14- starlette-async-jinja>=1.12.4 

15- jinja2>=3.1.6 

16- redis>=3.5.3 (for caching) 

17 

18Usage: 

19```python 

20from acb.depends import Inject, depends 

21from acb.adapters import import_adapter 

22 

23templates = depends.get("templates") 

24 

25Templates = import_adapter("templates") 

26 

27response = await templates.render_template( 

28 request, "index.html", {"title": "FastBlocks"} 

29) 

30``` 

31 

32Author: lesleslie <les@wedgwoodwebworks.com> 

33Created: 2025-01-12 

34""" 

35 

36import asyncio 

37import re 

38import typing as t 

39from ast import literal_eval 

40from contextlib import suppress 

41from html.parser import HTMLParser 

42from importlib import import_module 

43from importlib.util import find_spec 

44from inspect import isclass 

45from pathlib import Path 

46from uuid import UUID 

47 

48from acb.adapters import AdapterStatus, get_adapter 

49from acb.config import Config 

50from acb.debug import debug 

51 

# Event tracking is optional: use the real decorator when the events wrapper
# can be imported, otherwise fall back to a pass-through with the same name.
try:
    from ._events_wrapper import track_template_render
except ImportError:

    def track_template_render(func: t.Callable[..., t.Any]) -> t.Callable[..., t.Any]:
        """Return *func* unchanged (no-op stand-in for the events decorator)."""
        return func

59 

60 

61from acb.depends import depends 

62from anyio import Path as AsyncPath 

63from jinja2 import TemplateNotFound 

64from jinja2.ext import Extension, i18n, loopcontrols 

65from jinja2.ext import debug as jinja_debug 

66from jinja2_async_environment.bccache import AsyncRedisBytecodeCache 

67from jinja2_async_environment.loaders import AsyncBaseLoader, SourceType 

68from starlette_async_jinja import AsyncJinja2Templates 

69from fastblocks.actions.sync.strategies import SyncDirection, SyncStrategy 

70from fastblocks.actions.sync.templates import sync_templates 

71 

72from ._base import TemplatesBase, TemplatesBaseSettings 

73 

# Module-level placeholders, all initialised to ``None``.
# NOTE(review): not referenced elsewhere in the visible module — presumably
# kept for backwards compatibility; confirm before removing.
Cache = Storage = Models = None

# (standard Jinja2 delimiter, FastBlocks delimiter) pairs, applied in order.
_TEMPLATE_REPLACEMENTS = [
    (b"{{", b"[["),
    (b"}}", b"]]"),
    (b"{%", b"[%"),
    (b"%}", b"%]"),
]

# (old, new) arguments for ``bytes.replace`` used to upgrade plain-HTTP URLs.
_HTTP_TO_HTTPS = (b"http://", b"https://")

83 

# Compiled ``<attr>=`` patterns, keyed by attribute name.
_ATTR_PATTERN_CACHE: dict[str, re.Pattern[str]] = {}


def _get_attr_pattern(attr: str) -> re.Pattern[str]:
    """Return the compiled pattern matching the literal text ``"<attr>="``.

    Patterns are compiled once per attribute name and memoised in
    ``_ATTR_PATTERN_CACHE``; the attribute name is regex-escaped first.
    """
    try:
        return _ATTR_PATTERN_CACHE[attr]
    except KeyError:
        # REGEX OK: Template attribute pattern compilation for Jinja2
        compiled = re.compile(re.escape(f"{attr}="))
        _ATTR_PATTERN_CACHE[attr] = compiled
        return compiled

94 

95 

def _apply_template_replacements(source: bytes, deployed: bool = False) -> bytes:
    """Rewrite standard Jinja2 delimiters in *source* to the FastBlocks ones.

    Every ``(old, new)`` pair in ``_TEMPLATE_REPLACEMENTS`` is applied in
    order. When *deployed* is true, ``http://`` is additionally replaced by
    ``https://`` throughout the result.
    """
    rewritten = source
    for standard, custom in _TEMPLATE_REPLACEMENTS:
        rewritten = rewritten.replace(standard, custom)
    if not deployed:
        return rewritten
    insecure, secure = _HTTP_TO_HTTPS
    return rewritten.replace(insecure, secure)

103 

104 

class BaseTemplateLoader(AsyncBaseLoader):
    """Shared base for the FastBlocks async template loaders.

    Resolves the ``storage``, ``cache`` and ``config`` collaborators at
    construction time and offers helpers that probe every search path
    concurrently for a template on the filesystem, in storage, or in cache.
    """

    # Collaborators default to ``None`` at class level; ``__init__`` resolves
    # whichever ones are still unset on the instance.
    config: t.Any = None
    cache: t.Any = None
    storage: t.Any = None

    def __init__(
        self,
        searchpath: AsyncPath | t.Sequence[AsyncPath] | None = None,
    ) -> None:
        """Initialise the loader and resolve any collaborator still ``None``.

        Each collaborator is looked up through ``depends.get`` first, with
        ``get_adapter`` as the fallback when that lookup raises.
        """
        super().__init__(searchpath or [])
        if self.storage is None:
            try:
                self.storage = depends.get("storage")
            except Exception:
                self.storage = get_adapter("storage")
        if self.cache is None:
            try:
                self.cache = depends.get("cache")
            except Exception:
                self.cache = get_adapter("cache")
        # ``config`` is declared as a class attribute, so ``hasattr`` is
        # always true; test for ``None`` like the other collaborators so the
        # configuration is actually resolved.
        if self.config is None:
            try:
                self.config = depends.get("config")
            except Exception:
                config_adapter = get_adapter("config")
                self.config = (
                    config_adapter if isinstance(config_adapter, Config) else Config()
                )

    def get_supported_extensions(self) -> tuple[str, ...]:
        """Return the file extensions (without dot) treated as templates."""
        return ("html", "css", "js")

    async def _list_templates_for_extensions(
        self,
        extensions: tuple[str, ...],
    ) -> list[str]:
        """Return the sorted, de-duplicated paths under every search path
        whose suffix is one of *extensions* (recursive glob)."""
        found: set[str] = set()
        for searchpath in self.searchpath:
            for ext in extensions:
                async for p in searchpath.rglob(f"*.{ext}"):
                    found.add(str(p))
        return sorted(found)

    def _normalize_template(
        self,
        environment_or_template: t.Any,
        template: str | AsyncPath | None = None,
    ) -> str | AsyncPath:
        """Return the template name for either calling convention.

        Callers may pass ``(environment, template)`` or just ``(template,)``;
        in the latter case the first positional argument is the template.
        """
        if template is None:
            template = environment_or_template
        assert template is not None
        return template

    async def _find_template_path_parallel(
        self,
        template: str | AsyncPath,
    ) -> AsyncPath | None:
        """Return the first search-path candidate that is an existing file.

        All search paths are checked concurrently; candidates whose check
        raised are ignored. Returns ``None`` when nothing matches.
        """

        async def check_path(searchpath: AsyncPath) -> AsyncPath | None:
            path = searchpath / template
            if await path.is_file():
                return path
            return None

        tasks = [check_path(searchpath) for searchpath in self.searchpath]
        # return_exceptions=True: one failing path must not abort the others.
        results = await asyncio.gather(*tasks, return_exceptions=True)

        for result in results:
            if isinstance(result, AsyncPath):
                return result
        return None

    async def _find_storage_path_parallel(
        self,
        template: str | AsyncPath,
    ) -> tuple[AsyncPath, AsyncPath] | None:
        """Return ``(path, storage_path)`` for the first search path whose
        storage counterpart exists, or ``None`` when none does."""

        async def check_storage_path(
            searchpath: AsyncPath,
        ) -> tuple[AsyncPath, AsyncPath] | None:
            path = searchpath / template
            storage_path = Templates.get_storage_path(path)
            if storage_path and await self.storage.templates.exists(storage_path):
                return path, storage_path
            return None

        tasks = [check_storage_path(searchpath) for searchpath in self.searchpath]
        results = await asyncio.gather(*tasks, return_exceptions=True)

        # Exceptions come back as values; only real hits are tuples.
        for result in results:
            if isinstance(result, tuple):
                return result
        return None

    async def _find_cache_path_parallel(
        self,
        template: str | AsyncPath,
    ) -> tuple[AsyncPath, AsyncPath, str] | None:
        """Return ``(path, storage_path, cache_key)`` for the first search
        path whose cache key exists, or ``None`` when none does."""

        async def check_cache_path(
            searchpath: AsyncPath,
        ) -> tuple[AsyncPath, AsyncPath, str] | None:
            path = searchpath / template if searchpath else AsyncPath(template)
            storage_path = Templates.get_storage_path(path)
            cache_key = Templates.get_cache_key(storage_path)
            if (
                storage_path
                and self.cache is not None
                and await self.cache.exists(cache_key)
            ):
                return path, storage_path, cache_key
            return None

        tasks = [check_cache_path(searchpath) for searchpath in self.searchpath]
        results = await asyncio.gather(*tasks, return_exceptions=True)

        for result in results:
            if isinstance(result, tuple):
                return result
        return None

222 

223 

class LoaderProtocol(t.Protocol):
    """Structural type for objects usable as a loader inside ``ChoiceLoader``."""

    cache: t.Any
    config: t.Any
    storage: t.Any

    async def get_source_async(
        self,
        environment_or_template: t.Any,
        template: str | AsyncPath | None = None,
    ) -> tuple[
        str,
        str | None,
        t.Callable[[], bool] | t.Callable[[], t.Awaitable[bool]],
    ]:
        """Return ``(source, name-or-None, uptodate callable)`` for a template."""

    async def list_templates_async(self) -> list[str]:
        """Return the names of the templates this loader can provide."""

240 

241 

class FileSystemLoader(BaseTemplateLoader):
    """Loads templates from the local filesystem.

    When the template also exists in storage (and the app is not deployed)
    the two copies are synchronised first; the loaded bytes are always pushed
    to the cache afterwards.
    """

    @staticmethod
    def _normalize_storage_mtime(stat: t.Any) -> int:
        """Return the ``mtime`` entry of a storage stat mapping as whole seconds.

        Accepts either a number or an object exposing ``timestamp()`` (other
        code in this module reads the same field via ``.timestamp()``).
        A missing mtime is reported as ``0`` so the local file is treated as
        the newer copy.
        """
        mtime = stat.get("mtime")
        if mtime is None:
            return 0
        if hasattr(mtime, "timestamp"):
            mtime = mtime.timestamp()
        return round(mtime)

    async def _check_storage_exists(self, storage_path: AsyncPath) -> bool:
        """Return whether *storage_path* exists in storage (``False`` without storage)."""
        if self.storage is not None:
            return t.cast(bool, await self.storage.templates.exists(storage_path))
        return False

    async def _sync_template_file(
        self,
        path: AsyncPath,
        storage_path: AsyncPath,
    ) -> tuple[bytes, int]:
        """Synchronise *path* with storage and return ``(bytes, local mtime)``.

        Uses the ``sync_templates`` action; if that is unavailable or raises,
        falls back to ``_sync_from_storage_fallback``.
        """
        if sync_templates is None or SyncDirection is None or SyncStrategy is None:
            return await self._sync_from_storage_fallback(path, storage_path)

        try:
            strategy = SyncStrategy(
                backup_on_conflict=False,
            )

            template_paths = [path]
            result = await sync_templates(
                template_paths=template_paths,
                strategy=strategy,
            )

            # Read after syncing so the returned bytes/mtime reflect the
            # post-sync state of the local file.
            resp = await path.read_bytes()
            local_stat = await path.stat()
            local_mtime = int(local_stat.st_mtime)

            debug(f"Template sync result: {result.sync_status} for {path}")
            return resp, local_mtime

        except Exception as e:
            debug(f"Sync action failed for {path}: {e}, falling back to primitive sync")
            return await self._sync_from_storage_fallback(path, storage_path)

    async def _sync_from_storage_fallback(
        self,
        path: AsyncPath,
        storage_path: AsyncPath,
    ) -> tuple[bytes, int]:
        """Primitive mtime/size based sync between *path* and *storage_path*.

        If storage is newer and the sizes differ, the storage copy overwrites
        the local file. Otherwise the local file wins and is pushed to
        storage when the sizes differ. Returns ``(bytes, local mtime)``.
        """
        local_stat = await path.stat()
        local_mtime = int(local_stat.st_mtime)
        local_size = local_stat.st_size
        storage_stat = await self.storage.templates.stat(storage_path)
        storage_mtime = self._normalize_storage_mtime(storage_stat)
        storage_size = storage_stat.get("size")

        if local_mtime < storage_mtime and local_size != storage_size:
            resp = await self.storage.templates.open(storage_path)
            await path.write_bytes(resp)
            # The write changed the file; report its new mtime so the
            # ``uptodate`` check does not immediately see a stale template.
            local_mtime = int((await path.stat()).st_mtime)
        else:
            resp = await path.read_bytes()
            if local_size != storage_size:
                await self.storage.templates.write(storage_path, resp)
        return resp, local_mtime

    async def _read_and_store_template(
        self,
        path: AsyncPath,
        storage_path: AsyncPath,
    ) -> bytes:
        """Read *path* and mirror it to storage on a best-effort basis.

        Storage write failures (including the 5 second timeout) are logged
        and ignored.

        Raises:
            TemplateNotFound: if the local file does not exist.
        """
        try:
            resp = await path.read_bytes()
        except FileNotFoundError as exc:
            raise TemplateNotFound(path.name) from exc
        if self.storage is not None:
            try:
                await asyncio.wait_for(
                    self.storage.templates.write(storage_path, resp), timeout=5.0
                )
            except Exception as e:
                debug(
                    f"Storage write failed for {storage_path}: {e}, continuing with local file"
                )
        return resp

    async def _cache_template(self, storage_path: AsyncPath, resp: bytes) -> None:
        """Store *resp* in the cache under the key derived from *storage_path*."""
        if self.cache is not None:
            await self.cache.set(Templates.get_cache_key(storage_path), resp)

    async def get_source_async(
        self,
        environment_or_template: t.Any,
        template: str | AsyncPath | None = None,
    ) -> SourceType:
        """Return ``(source, storage path, uptodate)`` for *template*.

        Raises:
            TemplateNotFound: if no search path contains the template.
        """
        template = self._normalize_template(environment_or_template, template)
        path = await self._find_template_path_parallel(template)
        if path is None:
            raise TemplateNotFound(str(template))
        storage_path = Templates.get_storage_path(path)
        debug(path)

        fs_exists = await path.exists()
        storage_exists = await self._check_storage_exists(storage_path)
        local_mtime = 0

        if storage_exists and fs_exists and (not self.config.deployed):
            resp, local_mtime = await self._sync_template_file(path, storage_path)
        else:
            resp = await self._read_and_store_template(path, storage_path)
            # Record the mtime of what was just read; leaving it at 0 would
            # make ``uptodate`` report the template as stale on every check.
            with suppress(OSError):
                local_mtime = int((await path.stat()).st_mtime)

        await self._cache_template(storage_path, resp)

        async def uptodate() -> bool:
            return int((await path.stat()).st_mtime) == local_mtime

        return (resp.decode(), str(storage_path), uptodate)

    async def list_templates_async(self) -> list[str]:
        """Return every template file found under the search paths."""
        return await self._list_templates_for_extensions(
            self.get_supported_extensions(),
        )

357 

358 

class StorageLoader(BaseTemplateLoader):
    """Loads templates from the storage adapter, optionally syncing with a
    matching local file when the app is not deployed."""

    @staticmethod
    def _stat_mtime(stat: t.Any) -> int | None:
        """Return the ``mtime`` entry of a storage stat mapping as whole seconds.

        Accepts either a number or an object exposing ``timestamp()``;
        returns ``None`` when the entry is missing.
        """
        mtime = stat.get("mtime")
        if mtime is None:
            return None
        if hasattr(mtime, "timestamp"):
            mtime = mtime.timestamp()
        return round(mtime)

    async def _read_from_storage(
        self, storage_path: AsyncPath
    ) -> tuple[bytes, int | None]:
        """Return ``(bytes, storage mtime)`` read directly from storage."""
        resp = await self.storage.templates.open(storage_path)
        stat = await self.storage.templates.stat(storage_path)
        return resp, self._stat_mtime(stat)

    async def _check_filesystem_sync_opportunity(
        self, template: str | AsyncPath, storage_path: AsyncPath
    ) -> AsyncPath | None:
        """Return the local path of *template* if a sync should be attempted.

        No sync is attempted without a config or when deployed; otherwise the
        local file must exist. *storage_path* is accepted but not consulted.
        """
        if not self.config or self.config.deployed:
            return None

        fs_result = await self._find_template_path_parallel(template)
        if fs_result and await fs_result.exists():
            return fs_result
        return None

    async def _sync_storage_with_filesystem(
        self,
        fs_path: AsyncPath,
        storage_path: AsyncPath,
    ) -> tuple[bytes, int | None]:
        """Pull-sync *fs_path* from storage and return ``(bytes, mtime)``.

        On success the bytes and mtime come from the local file. If the sync
        action is unavailable or fails, the template is read from storage
        only and the storage mtime is returned.
        """
        if sync_templates is None or SyncDirection is None or SyncStrategy is None:
            return await self._read_from_storage(storage_path)

        try:
            strategy = SyncStrategy(
                direction=SyncDirection.PULL,
                backup_on_conflict=False,
            )

            result = await sync_templates(
                template_paths=[fs_path],
                strategy=strategy,
            )

            resp = await fs_path.read_bytes()
            local_stat = await fs_path.stat()
            local_mtime = int(local_stat.st_mtime)

            debug(
                f"Storage-filesystem sync result: {result.sync_status} for {storage_path}"
            )
            return resp, local_mtime

        except Exception as e:
            debug(
                f"Storage sync failed for {storage_path}: {e}, reading from storage only"
            )
            return await self._read_from_storage(storage_path)

    async def get_source_async(
        self,
        environment_or_template: t.Any,
        template: str | AsyncPath | None = None,
    ) -> tuple[str, str, t.Callable[[], t.Awaitable[bool]]]:
        """Return ``(source, storage path, uptodate)`` for *template*.

        Raises:
            TemplateNotFound: if the template is not in storage, or reading
                it raises ``FileNotFoundError``/``AttributeError``.
        """
        template = self._normalize_template(environment_or_template, template)
        result = await self._find_storage_path_parallel(template)
        if result is None:
            raise TemplateNotFound(str(template))
        _, storage_path = result
        debug(storage_path)

        try:
            fs_path = await self._check_filesystem_sync_opportunity(
                template, storage_path
            )

            if fs_path:
                resp, local_mtime = await self._sync_storage_with_filesystem(
                    fs_path, storage_path
                )
            else:
                resp, local_mtime = await self._read_from_storage(storage_path)

            if self.cache is not None:
                await self.cache.set(Templates.get_cache_key(storage_path), resp)

            async def uptodate() -> bool:
                # Compare against whichever copy the mtime was taken from.
                if fs_path and await fs_path.exists():
                    fs_stat = await fs_path.stat()
                    return int(fs_stat.st_mtime) == local_mtime
                storage_stat = await self.storage.templates.stat(storage_path)
                current = self._stat_mtime(storage_stat)
                # An unknown mtime can never be confirmed as current.
                return current is not None and current == local_mtime

            return (resp.decode(), str(storage_path), uptodate)
        except (FileNotFoundError, AttributeError) as exc:
            raise TemplateNotFound(str(template)) from exc

    async def list_templates_async(self) -> list[str]:
        """Return the sorted ``.html``/``.css``/``.js`` paths listed by storage
        for every search path; search paths missing from storage are skipped."""
        found: list[str] = []
        for searchpath in self.searchpath:
            with suppress(FileNotFoundError):
                paths = await self.storage.templates.list(
                    Templates.get_storage_path(searchpath),
                )
                found.extend(p for p in paths if p.endswith((".html", ".css", ".js")))
        found.sort()
        return found

464 

465 

class RedisLoader(BaseTemplateLoader):
    """Serves template source straight from the cache adapter."""

    async def get_source_async(
        self,
        environment_or_template: t.Any,
        template: str | AsyncPath | None = None,
    ) -> tuple[str, str | None, t.Callable[[], t.Awaitable[bool]]]:
        """Return ``(source, None, uptodate)`` for a cached template.

        Cached entries are always reported as up to date.

        Raises:
            TemplateNotFound: if no cache key exists for the template or the
                cached value is empty.
        """
        name = self._normalize_template(environment_or_template, template)
        hit = await self._find_cache_path_parallel(name)
        if hit is None:
            raise TemplateNotFound(str(name))
        path, _, cache_key = hit
        debug(cache_key)

        cached = None
        if self.cache is not None:
            cached = await self.cache.get(cache_key)
        if not cached:
            raise TemplateNotFound(path.name)

        async def uptodate() -> bool:
            return True

        return (cached.decode(), None, uptodate)

    async def list_templates_async(self) -> list[str]:
        """Return the sorted cache keys matching ``*.html``, ``*.css``, ``*.js``."""
        keys: list[str] = []
        for ext in ("html", "css", "js"):
            if self.cache is not None:
                scan_result: t.Any = await self.cache.scan(f"*.{ext}")
            else:
                scan_result = []
            # ``scan`` may hand back either an async iterator or a plain iterable.
            if not hasattr(scan_result, "__aiter__"):
                keys.extend(scan_result)
                continue
            async for key in scan_result:
                keys.append(key)
        keys.sort()
        return keys

500 

501 

class PackageLoader(BaseTemplateLoader):
    """Loads templates shipped inside an importable Python package.

    Sources are rewritten to the FastBlocks delimiters and written to the
    cache under an adapter/style specific key.
    """

    _template_root: AsyncPath
    _adapter: str
    package_name: str
    _loader: t.Any

    def __init__(
        self,
        package_name: str,
        path: str = "templates",
        adapter: str = "admin",
    ) -> None:
        """Locate the ``path`` directory inside ``package_name``.

        Absolute-path package names and packages that cannot be imported
        leave the loader with ``_loader = None`` and a template root of ``.``.

        Raises:
            ImportError: if the package imports but has no spec.
            ValueError: if no ``path`` directory exists in the package.
        """
        self.package_path = Path(package_name)
        self.path = self.package_path / path
        super().__init__(AsyncPath(self.path))
        self.package_name = package_name
        self._adapter = adapter
        self._template_root = AsyncPath(".")

        # A filesystem path is not an importable package name.
        if package_name.startswith("/"):
            self._loader = None
            return

        try:
            import_module(package_name)
            spec = find_spec(package_name)
            if spec is None:
                msg = f"Could not find package {package_name}"
                raise ImportError(msg)
        except ModuleNotFoundError:
            self._loader = None
            return

        self._loader = spec.loader

        candidate_roots: list[Path] = []
        if spec.submodule_search_locations:
            candidate_roots.extend(
                Path(location) for location in spec.submodule_search_locations
            )
        elif spec.origin is not None:
            candidate_roots.append(Path(spec.origin))

        located = None
        for base in candidate_roots:
            candidate = base / path
            if candidate.is_dir():
                located = candidate
                break

        if located is None:
            msg = f"The {package_name!r} package was not installed in a way that PackageLoader understands."
            raise ValueError(
                msg,
            )
        self._template_root = AsyncPath(located)

    async def get_source_async(
        self,
        environment_or_template: t.Any,
        template: str | AsyncPath | None = None,
    ) -> tuple[str, str, t.Callable[[], t.Awaitable[bool]]]:
        """Return ``(source, file name, uptodate)`` for a packaged template.

        Raises:
            TemplateNotFound: if the file does not exist under the template root.
        """
        if template is None:
            template = environment_or_template
        assert template is not None
        relative: AsyncPath = AsyncPath(template)
        path = self._template_root / relative
        debug(path)
        if not await path.is_file():
            raise TemplateNotFound(relative.name)
        raw = await path.read_bytes()
        mtime = (await path.stat()).st_mtime

        async def uptodate() -> bool:
            if not await path.is_file():
                return False
            return (await path.stat()).st_mtime == mtime

        source = _apply_template_replacements(raw, self.config.deployed)

        # Re-root the storage path as _templates/<adapter>/<style>/...
        segments: list[str] = list(Templates.get_storage_path(path).parts)
        segments[0] = "_templates"
        segments.insert(1, self._adapter)
        segments.insert(2, getattr(self.config, self._adapter).style)
        cache_key = Templates.get_cache_key(AsyncPath("/".join(segments)))
        if self.cache is not None:
            await self.cache.set(cache_key, source)
        return (source.decode(), path.name, uptodate)

    async def list_templates_async(self) -> list[str]:
        """Return the sorted ``html``/``css``/``js`` files under the template root."""
        names: set[str] = set()
        for ext in ("html", "css", "js"):
            async for entry in self._template_root.rglob(f"*.{ext}"):
                names.add(str(entry))
        return sorted(names)

590 

591 

class ChoiceLoader(AsyncBaseLoader):
    """Tries a list of loaders in order and returns the first hit."""

    loaders: list[AsyncBaseLoader | LoaderProtocol]

    def __init__(
        self,
        loaders: list[AsyncBaseLoader | LoaderProtocol],
        searchpath: AsyncPath | t.Sequence[AsyncPath] | None = None,
    ) -> None:
        """Store *loaders*; the search path defaults to ``templates``."""
        super().__init__(searchpath or AsyncPath("templates"))
        self.loaders = loaders

    async def get_source_async(
        self,
        environment_or_template: t.Any,
        template: str | AsyncPath | None = None,
    ) -> SourceType:
        """Return the source from the first loader that can provide *template*.

        A loader that raises is skipped (best effort); unexpected errors are
        logged through ``debug`` so a misbehaving loader is not hidden.

        Raises:
            TemplateNotFound: if no loader provides the template.
        """
        if template is None:
            template = environment_or_template
        assert template is not None
        for loader in self.loaders:
            try:
                return await loader.get_source_async(
                    environment_or_template, template
                )
            except TemplateNotFound:
                continue
            except Exception as exc:  # nosec B112
                # Deliberately non-fatal: fall through to the next loader,
                # but leave a trace of why this one was skipped.
                debug(
                    f"{loader.__class__.__name__} failed loading {template}: {exc}"
                )
                continue
        raise TemplateNotFound(str(template))

    async def list_templates_async(self) -> list[str]:
        """Return the sorted union of the templates listed by every loader."""
        found: set[str] = set()
        for loader in self.loaders:
            templates = await loader.list_templates_async()
            found.update(templates)
        return sorted(found)

629 

630 

class TemplatesSettings(TemplatesBaseSettings):
    """Settings for the Jinja2 templates adapter."""

    loader: str | None = None
    # Dotted module names scanned for Jinja2 ``Extension`` subclasses.
    extensions: list[str] = []
    # Environment attribute name -> delimiter string.
    delimiters: dict[str, str] = {
        "block_start_string": "[%",
        "block_end_string": "%]",
        "variable_start_string": "[[",
        "variable_end_string": "]]",
        "comment_start_string": "[#",
        "comment_end_string": "#]",
    }
    globals: dict[str, t.Any] = {}
    # Dotted paths (``module.function``) of context processors.
    context_processors: list[str] = []

    def __init__(self, **data: t.Any) -> None:
        """Initialise via ``BaseModel.__init__`` directly, then fill defaults.

        Ensures ``cache_timeout`` exists (300 when absent) and publishes the
        ``models`` dependency as a template global (``None`` if unavailable).
        """
        from pydantic import BaseModel

        BaseModel.__init__(self, **data)  # type: ignore[arg-type]
        if not hasattr(self, "cache_timeout"):
            self.cache_timeout = 300
        try:
            self.globals["models"] = depends.get("models")
        except Exception:
            self.globals["models"] = None

656 

657 

class Templates(TemplatesBase):
    """Jinja2 templates adapter.

    Builds the application (and, lazily, the admin) ``AsyncJinja2Templates``
    environment and renders templates and HTMY components through them.
    """

    app: AsyncJinja2Templates | None = None

    def __init__(self, **kwargs: t.Any) -> None:
        """Record the enabled admin/app adapters; environments are built in ``init``."""
        super().__init__(**kwargs)
        self.filters: dict[str, t.Callable[..., t.Any]] = {}
        self.enabled_admin = get_adapter("admin")
        self.enabled_app = self._get_app_adapter()
        self._admin = None
        self._admin_initialized = False

    def _get_app_adapter(self) -> t.Any:
        """Return the app adapter from ``get_adapter`` or, failing that, from
        ``depends.get_sync``; ``None`` when neither yields one."""
        app_adapter = get_adapter("app")
        if app_adapter is not None:
            return app_adapter
        with suppress(Exception):
            app_adapter = depends.get_sync("app")
            if app_adapter is not None:
                return app_adapter

        return None

    @property
    def admin(self) -> AsyncJinja2Templates | None:
        """Admin templates environment, built on first access.

        Initialisation is attempted once, and only when an admin adapter is
        enabled and ``init`` has stored the admin cache and search paths.
        """
        if not self._admin_initialized and self.enabled_admin:
            if (
                hasattr(self, "_admin_cache")
                and hasattr(self, "admin_searchpaths")
                and self.admin_searchpaths is not None
            ):
                debug("Initializing admin templates environment")
                # NOTE(review): run_until_complete raises RuntimeError when
                # the loop is already running — confirm this property is only
                # first touched from synchronous code.
                loop = asyncio.get_event_loop()
                self._admin = loop.run_until_complete(
                    self.init_envs(
                        self.admin_searchpaths,
                        admin=True,
                        cache=self._admin_cache,
                    ),
                )
            else:
                debug(
                    "Skipping admin templates initialization - missing cache or searchpaths"
                )
            self._admin_initialized = True
        return self._admin

    @admin.setter
    def admin(self, value: AsyncJinja2Templates | None) -> None:
        """Set the admin environment explicitly and mark it initialised."""
        self._admin = value
        self._admin_initialized = True

    def get_loader(self, template_paths: list[AsyncPath]) -> ChoiceLoader:
        """Build the loader chain for *template_paths*.

        Each path is searched together with its ``blocks`` sub-directory.
        The order is cache, storage, filesystem; the filesystem loader goes
        first when neither deployed nor in production debug mode. The admin
        search paths additionally get a package loader for the admin adapter.
        """
        searchpaths: list[AsyncPath] = []
        for path in template_paths:
            searchpaths.extend([path, path / "blocks"])
        loaders: list[AsyncBaseLoader] = [
            RedisLoader(searchpaths),
            StorageLoader(searchpaths),
        ]
        file_loaders: list[AsyncBaseLoader | LoaderProtocol] = [
            FileSystemLoader(searchpaths),
        ]
        jinja_loaders: list[AsyncBaseLoader | LoaderProtocol] = loaders + file_loaders
        if not self.config.deployed and (not self.config.debug.production):
            jinja_loaders = file_loaders + loaders
        if self.enabled_admin and template_paths == self.admin_searchpaths:
            jinja_loaders.append(
                PackageLoader(self.enabled_admin.name, "templates", "admin"),
            )
        debug(jinja_loaders)
        return ChoiceLoader(jinja_loaders)

    @staticmethod
    def _collect_extensions(module_names: t.Iterable[str]) -> list[t.Any]:
        """Return the built-in extensions plus every ``Extension`` subclass
        found in the modules named by *module_names*."""
        extensions: list[t.Any] = [loopcontrols, i18n, jinja_debug]
        # Import everything up front so a bad module name fails before any
        # classes are collected.
        modules = [import_module(name) for name in module_names]
        for module in modules:
            extensions.extend(
                member
                for member in vars(module).values()
                if isclass(member)
                and member.__name__ != "Extension"
                and issubclass(member, Extension)
            )
        return extensions

    async def init_envs(
        self,
        template_paths: list[AsyncPath],
        admin: bool = False,
        cache: t.Any | None = None,
    ) -> AsyncJinja2Templates:
        """Create and configure a templates environment for *template_paths*.

        Installs extensions, context processors, the bytecode cache, the
        loader chain, the configured delimiters and the template globals.
        With ``admin=True`` the admin helper globals are added as well.
        """
        _extensions = self._collect_extensions(self.config.templates.extensions)
        bytecode_cache = AsyncRedisBytecodeCache(prefix="bccache", client=cache)
        context_processors: list[t.Callable[..., t.Any]] = []
        for processor_path in self.config.templates.context_processors:
            module_path, func_name = processor_path.rsplit(".", 1)
            module = import_module(module_path)
            processor = getattr(module, func_name)
            context_processors.append(processor)
        templates = AsyncJinja2Templates(
            directory=AsyncPath("templates"),
            context_processors=context_processors,
            extensions=_extensions,
            bytecode_cache=bytecode_cache,
            enable_async=True,
        )
        loader = self.get_loader(template_paths)
        if loader:
            templates.env.loader = loader
        elif self.config.templates.loader:
            templates.env.loader = literal_eval(self.config.templates.loader)
        for delimiter, value in self.config.templates.delimiters.items():
            setattr(templates.env, delimiter, value)
        # Type cast globals dict to avoid assignment type errors
        globals_dict: dict[str, t.Any] = templates.env.globals  # type: ignore[assignment]
        globals_dict["config"] = self.config
        globals_dict["render_block"] = templates.render_block
        globals_dict["render_component"] = self._get_htmy_component_renderer()
        if admin:
            try:
                from sqladmin.helpers import (  # type: ignore[import-not-found,import-untyped]
                    get_object_identifier,
                )
            except ImportError:
                get_object_identifier = str
            globals_dict["min"] = min
            globals_dict["zip"] = zip
            globals_dict["admin"] = self
            globals_dict["is_list"] = lambda x: isinstance(x, list)
            globals_dict["get_object_identifier"] = get_object_identifier
        # User-configured globals are applied last.
        for k, v in self.config.templates.globals.items():
            globals_dict[k] = v
        return templates

    def _resolve_cache(self, cache: t.Any | None) -> t.Any | None:
        """Return *cache*, or the ``cache`` dependency when *cache* is ``None``
        (``None`` if that lookup fails)."""
        if cache is None:
            try:
                cache = depends.get("cache")
            except Exception:
                cache = None
        return cache

    async def _setup_admin_templates(self, cache: t.Any | None) -> None:
        """Store what the lazy ``admin`` property needs to build its environment."""
        if self.enabled_admin:
            self.admin_searchpaths = await self.get_searchpaths(self.enabled_admin)
            self._admin_cache = cache

    def _log_loader_info(self) -> None:
        """Log the class name of every loader in the app environment."""
        if self.app and self.app.env.loader and hasattr(self.app.env.loader, "loaders"):
            for loader in self.app.env.loader.loaders:
                self.logger.debug(f"{loader.__class__.__name__} initialized")

    def _log_extension_info(self) -> None:
        """Log the short name of every extension in the app environment."""
        if self.app and hasattr(self.app.env, "extensions"):
            for ext in self.app.env.extensions:
                self.logger.debug(f"{ext.split('.')[-1]} loaded")

    async def _clear_debug_cache(self, cache: t.Any | None) -> None:
        """Clear template caches when ``config.debug.templates`` is truthy.

        Also asks the HTMY adapter (best effort) to clear its component cache.
        """
        if getattr(self.config.debug, "templates", False):
            try:
                for namespace in (
                    "templates",
                    "_templates",
                    "bccache",
                    "template",
                    "test",
                ):
                    if cache is not None:
                        await cache.clear(namespace)
                self.logger.debug("Template caches cleared")
                with suppress(Exception):
                    htmy_adapter = await depends.get("htmy")
                    if htmy_adapter:
                        await htmy_adapter.clear_component_cache()
                        self.logger.debug("HTMY component caches cleared via adapter")
            except (NotImplementedError, AttributeError) as e:
                self.logger.debug(f"Cache clear not supported: {e}")

    def _get_htmy_component_renderer(self) -> t.Callable[..., t.Any]:
        """Return the ``render_component`` coroutine exposed as a template global.

        The coroutine renders an HTMY component to a string and never raises:
        failures are returned as an HTML comment describing the problem.
        """

        async def render_component(
            component_name: str,
            context: dict[str, t.Any] | None = None,
            **kwargs: t.Any,
        ) -> str:
            try:
                htmy_adapter = await depends.get("htmy")
                if htmy_adapter:
                    # Give the HTMY adapter a way back to these templates.
                    htmy_adapter.jinja_templates = self

                    response = await htmy_adapter.render_component(
                        request=None,
                        component=component_name,
                        context=context,
                        **kwargs,
                    )
                    return (
                        response.body.decode()
                        if hasattr(response.body, "decode")
                        else str(response.body)
                    )
                else:
                    debug(
                        f"HTMY adapter not available for component '{component_name}'"
                    )
                    return f"<!-- HTMY adapter not available for '{component_name}' -->"
            except Exception as e:
                debug(
                    f"Failed to render component '{component_name}' via HTMY adapter: {e}"
                )
                return f"<!-- Error rendering component '{component_name}': {e} -->"

        return render_component

    async def init(self, cache: t.Any | None = None) -> None:
        """Build the app environment and prepare lazy admin initialisation.

        The app adapter is taken from ``enabled_app``, then dependency
        injection, then a direct import of the default app, and finally a
        minimal ``SimpleNamespace`` stand-in.
        """
        cache = self._resolve_cache(cache)
        app_adapter = self.enabled_app
        if app_adapter is None:
            try:
                app_adapter = depends.get("app")
                debug("Retrieved app adapter from dependency injection")
            except Exception:
                try:
                    from ..app.default import App

                    app_adapter = depends.get("app") or App()
                    debug("Created app adapter by direct import")
                    depends.set("app", app_adapter)
                except Exception:
                    from types import SimpleNamespace

                    app_adapter = SimpleNamespace(name="app", category="app")
                    debug(
                        "Created fallback app adapter - ACB discovery failed, direct import failed"
                    )
        self.app_searchpaths = await self.get_searchpaths(app_adapter)
        self.app = await self.init_envs(self.app_searchpaths, cache=cache)
        depends.set("templates", self)
        # Reset so the admin environment is rebuilt on next access.
        self._admin = None
        self._admin_initialized = False
        await self._setup_admin_templates(cache)
        self._log_loader_info()
        self._log_extension_info()
        await self._clear_debug_cache(cache)

    @staticmethod
    def get_attr(html: str, attr: str) -> str | None:
        """Return the value of *attr* on the last start tag parsed from *html*.

        The attribute name is matched exactly (case-insensitively, since the
        parser lower-cases names), so ``id`` no longer matches ``data-id``.
        Quoted values containing spaces and attributes placed last in the tag
        are returned intact. Returns ``None`` when no start tag is found or
        the attribute is absent or has no value.
        """

        class _LastStartTag(HTMLParser):
            """Remembers the attribute list of the most recent start tag."""

            def __init__(self) -> None:
                super().__init__()
                self.attrs: list[tuple[str, str | None]] | None = None

            def handle_starttag(
                self, tag: str, attrs: list[tuple[str, str | None]]
            ) -> None:
                self.attrs = attrs

        parser = _LastStartTag()
        parser.feed(html)
        if parser.attrs is None:
            return None
        wanted = attr.lower()
        for name, value in parser.attrs:
            if name == wanted and value is not None:
                return value
        return None

    def _add_filters(self, env: t.Any) -> None:
        """Register every entry of ``self.filters`` on *env*."""
        if hasattr(self, "filters") and self.filters:
            for name, filter_func in self.filters.items():
                if hasattr(env, "add_filter"):
                    env.add_filter(filter_func, name)
                else:
                    env.filters[name] = filter_func

    @track_template_render
    async def render_template(
        self,
        request: t.Any,
        template: str,
        context: dict[str, t.Any] | None = None,
        status_code: int = 200,
        headers: dict[str, str] | None = None,
    ) -> t.Any:
        """Render *template* through the app environment.

        Returns the environment's ``TemplateResponse``; when the app
        environment has not been initialised, a 404 HTML response instead.
        """
        if context is None:
            context = {}
        if headers is None:
            headers = {}

        templates_env = self.app
        if templates_env:
            return await templates_env.TemplateResponse(
                request=request,
                name=template,
                context=context,
                status_code=status_code,
                headers=headers,
            )
        from starlette.responses import HTMLResponse

        return HTMLResponse(
            content=f"<html><body>Template {template} not found</body></html>",
            status_code=404,
            headers=headers,
        )

    @track_template_render
    async def render_component(
        self,
        request: t.Any,
        component: str,
        context: dict[str, t.Any] | None = None,
        status_code: int = 200,
        headers: dict[str, str] | None = None,
        **kwargs: t.Any,
    ) -> t.Any:
        """Render an HTMY component via the HTMY adapter.

        Returns the adapter's response, or a 500 HTML response when the
        adapter is unavailable or rendering raises.
        """
        try:
            htmy_adapter = await depends.get("htmy")
            if htmy_adapter:
                htmy_adapter.jinja_templates = self
                return await htmy_adapter.render_component(
                    request=request,
                    component=component,
                    context=context,
                    status_code=status_code,
                    headers=headers,
                    **kwargs,
                )
            else:
                from starlette.responses import HTMLResponse

                return HTMLResponse(
                    content=f"<html><body>HTMY adapter not available for component '{component}'</body></html>",
                    status_code=500,
                    headers=headers,
                )
        except Exception as e:
            from starlette.responses import HTMLResponse

            return HTMLResponse(
                content=f"<html><body>Component error: {e}</body></html>",
                status_code=500,
                headers=headers,
            )

    def filter(
        self,
        name: str | None = None,
    ) -> t.Callable[[t.Callable[..., t.Any]], t.Callable[..., t.Any]]:
        """Decorator registering a function as a filter on the app and admin
        environments, under *name* or the function's own name."""

        def decorator(f: t.Callable[..., t.Any]) -> t.Callable[..., t.Any]:
            if self.app and hasattr(self.app.env, "filters"):
                self.app.env.filters[name or f.__name__] = f
            if self.admin and hasattr(self.admin.env, "filters"):
                self.admin.env.filters[name or f.__name__] = f
            return f

        return decorator

    def _load_extensions(self) -> list[t.Any]:
        """Return the extension classes to install.

        Module names come from ``self.settings.extensions`` when available,
        otherwise from ``config.templates.extensions``.
        """
        extensions_list = getattr(
            getattr(self, "settings", None),
            "extensions",
            self.config.templates.extensions,
        )
        return self._collect_extensions(extensions_list)

1028 

1029 

# Adapter metadata.
MODULE_ID = UUID("01937d86-4f2a-7b3c-8d9e-1234567890ab")
MODULE_STATUS = AdapterStatus.STABLE

# Best-effort registration of the adapter class; any failure is ignored.
try:
    depends.set(Templates)
except Exception:
    pass

__all__ = ["Templates", "TemplatesSettings"]

1036__all__ = ["Templates", "TemplatesSettings"]