Coverage for fastblocks / caching.py: 69%

432 statements  

« prev     ^ index     » next       coverage.py v7.12.0, created at 2025-11-26 03:58 -0800

1import asyncio 

2import base64 

3import email.utils 

4import re 

5import sys 

6import time 

7import typing as t 

8from collections.abc import Iterable, Sequence 

9from contextlib import suppress 

10from dataclasses import dataclass 

11from functools import partial 

12from urllib.request import parse_http_list 

13 

14from acb.actions.hash import hash 

15from starlette.datastructures import URL, Headers, MutableHeaders 

16from starlette.requests import Request 

17from starlette.responses import Response 

18 

19HashFunc = t.Callable[[t.Any], str] 

20GetAdapterFunc = t.Callable[[str], t.Any] 

21ImportAdapterFunc = t.Callable[[str | list[str] | None], t.Any] 

22from acb.adapters import get_adapter 

23from acb.depends import depends 

24from starlette.types import ASGIApp, Message, Receive, Scope, Send 

25 

26from .exceptions import RequestNotCachable, ResponseNotCachable 

27 

28 

29def _safe_log(logger: t.Any, level: str, message: str) -> None: 

30 return CacheUtils.safe_log(logger, level, message) 

31 

32 

33_CacheClass = None 

34 

35_str_encode = str.encode 

36_base64_encodebytes = base64.encodebytes 

37_base64_decodebytes = base64.decodebytes 

38 

39 

40def get_cache() -> t.Any: 

41 global _CacheClass 

42 if _CacheClass is None: 

43 _CacheClass = get_adapter("cache") 

44 return _CacheClass 

45 

46 

47class CacheUtils: 

48 GET = sys.intern("GET") 

49 HEAD = sys.intern("HEAD") 

50 POST = sys.intern("POST") 

51 PUT = sys.intern("PUT") 

52 PATCH = sys.intern("PATCH") 

53 DELETE = sys.intern("DELETE") 

54 CACHE_CONTROL = sys.intern("Cache-Control") 

55 ETAG = sys.intern("ETag") 

56 LAST_MODIFIED = sys.intern("Last-Modified") 

57 VARY = sys.intern("Vary") 

58 

59 CACHEABLE_METHODS = frozenset((GET, HEAD)) 

60 CACHEABLE_STATUS_CODES = frozenset( 

61 (200, 203, 204, 206, 300, 301, 404, 405, 410, 414, 501), 

62 ) 

63 ONE_YEAR = 60 * 60 * 24 * 365 

64 INVALIDATING_METHODS = frozenset((POST, PUT, PATCH, DELETE)) 

65 

66 @staticmethod 

67 def safe_log(logger: t.Any, level: str, message: str) -> None: 

68 if logger and hasattr(logger, level): 

69 getattr(logger, level)(message) 

70 

71 

72cacheable_methods = CacheUtils.CACHEABLE_METHODS 

73cacheable_status_codes = CacheUtils.CACHEABLE_STATUS_CODES 

74one_year = CacheUtils.ONE_YEAR 

75invalidating_methods = CacheUtils.INVALIDATING_METHODS 

76 

77 

78@dataclass 

79class Rule: 

80 match: str | re.Pattern[str] | Iterable[str | re.Pattern[str]] = "*" 

81 status: int | Iterable[int] | None = None 

82 ttl: float | None = None 

83 

84 

85def _check_rule_match(match: list[str | re.Pattern[str]], path: str) -> bool: 

86 """Check if any rule matches the request path.""" 

87 for item in match: 

88 if isinstance(item, re.Pattern): 

89 if item.match(path): 

90 return True 

91 elif item in ("*", path): 

92 return True 

93 return False 

94 

95 

96def _check_response_status_match(rule: Rule, response: Response) -> bool: 

97 """Check if response status code matches the rule.""" 

98 if rule.status is not None: 

99 statuses = [rule.status] if isinstance(rule.status, int) else rule.status 

100 if response.status_code not in statuses: 

101 return False 

102 return True 

103 

104 

105class CacheRules: 

106 @staticmethod 

107 def request_matches_rule(rule: Rule, *, request: Request) -> bool: 

108 match = ( 

109 [rule.match] 

110 if isinstance(rule.match, str | re.Pattern) 

111 else list(rule.match) 

112 ) 

113 return _check_rule_match(match, request.url.path) 

114 

115 @staticmethod 

116 def response_matches_rule( 

117 rule: Rule, 

118 *, 

119 request: Request, 

120 response: Response, 

121 ) -> bool: 

122 # First check if request matches the rule 

123 if not CacheRules.request_matches_rule(rule, request=request): 

124 return False 

125 # Then check if response status matches 

126 return _check_response_status_match(rule, response) 

127 

128 @staticmethod 

129 def get_rule_matching_request( 

130 rules: Sequence[Rule], 

131 *, 

132 request: Request, 

133 ) -> Rule | None: 

134 return next( 

135 ( 

136 rule 

137 for rule in rules 

138 if CacheRules.request_matches_rule(rule, request=request) 

139 ), 

140 None, 

141 ) 

142 

143 @staticmethod 

144 def get_rule_matching_response( 

145 rules: Sequence[Rule], 

146 *, 

147 request: Request, 

148 response: Response, 

149 ) -> Rule | None: 

150 return next( 

151 ( 

152 rule 

153 for rule in rules 

154 if CacheRules.response_matches_rule( 

155 rule, 

156 request=request, 

157 response=response, 

158 ) 

159 ), 

160 None, 

161 ) 

162 

163 

164def get_rule_matching_request( 

165 rules: Sequence[Rule], 

166 *, 

167 request: Request, 

168) -> Rule | None: 

169 method = getattr(CacheRules, "get_rule_matching_request") 

170 result = method(rules, request=request) 

171 return t.cast(Rule | None, result) 

172 

173 

174def get_rule_matching_response( 

175 rules: Sequence[Rule], 

176 *, 

177 request: Request, 

178 response: Response, 

179) -> Rule | None: 

180 method = getattr(CacheRules, "get_rule_matching_response") 

181 result = method(rules, request=request, response=response) 

182 return t.cast(Rule | None, result) 

183 

184 

185def request_matches_rule(rule: Rule, *, request: Request) -> bool: 

186 method = getattr(CacheRules, "request_matches_rule") 

187 result = method(rule, request=request) 

188 return t.cast(bool, result) 

189 

190 

191def response_matches_rule(rule: Rule, *, request: Request, response: Response) -> bool: 

192 method = getattr(CacheRules, "response_matches_rule") 

193 result = method(rule, request=request, response=response) 

194 return t.cast(bool, result) 

195 

196 

197class CacheDirectives(t.TypedDict, total=False): 

198 max_age: int 

199 s_maxage: int 

200 no_cache: bool 

201 no_store: bool 

202 no_transform: bool 

203 must_revalidate: bool 

204 proxy_revalidate: bool 

205 must_understand: bool 

206 private: bool 

207 public: bool 

208 immutable: bool 

209 stale_while_revalidate: int 

210 stale_if_error: int 

211 

212 

213async def set_in_cache( 

214 response: Response, 

215 *, 

216 request: Request, 

217 rules: Sequence[Rule], 

218 cache: t.Any = None, 

219 logger: t.Any = None, 

220) -> None: 

221 # Initialize dependencies if not provided 

222 cache, logger = _init_cache_dependencies(cache, logger) 

223 

224 # Validate response can be cached 

225 _validate_response_cacheable(response, request, logger) 

226 

227 # Find matching rule for caching 

228 rule = get_rule_matching_response(rules, request=request, response=response) 

229 if not rule: 

230 _safe_log(logger, "debug", "response_not_cacheable reason=rule") 

231 raise ResponseNotCachable(response) 

232 

233 # Calculate TTL and max age 

234 ttl, max_age = _calculate_cache_ttl(rule, cache, logger) 

235 

236 # Set cache headers 

237 _set_cache_headers(response, max_age, logger) 

238 

239 # Generate cache key and serialize response 

240 cache_key = await learn_cache_key(request, response, cache=cache) 

241 serialized_response = serialize_response(response) 

242 

243 # Store in cache 

244 await _store_in_cache(cache, cache_key, serialized_response, ttl, logger) 

245 

246 # Update response header 

247 response.headers["X-Cache"] = "miss" 

248 

249 

250def _init_cache_dependencies(cache: t.Any, logger: t.Any) -> tuple[t.Any, t.Any]: 

251 """Initialize cache and logger dependencies.""" 

252 if cache is None: 

253 cache = depends.get("cache") 

254 if logger is None: 

255 logger = depends.get("logger") 

256 return cache, logger 

257 

258 

259def _validate_response_cacheable( 

260 response: Response, request: Request, logger: t.Any 

261) -> None: 

262 """Validate that a response can be cached.""" 

263 if response.status_code not in cacheable_status_codes: 

264 _safe_log(logger, "debug", "response_not_cacheable reason=status_code") 

265 raise ResponseNotCachable(response) 

266 if not request.cookies and "Set-Cookie" in response.headers: 

267 _safe_log( 

268 logger, 

269 "debug", 

270 "response_not_cacheable reason=cookies_for_cookieless_request", 

271 ) 

272 raise ResponseNotCachable(response) 

273 

274 

275def _calculate_cache_ttl(rule: Rule, cache: t.Any, logger: t.Any) -> tuple[t.Any, int]: 

276 """Calculate TTL and max age for caching.""" 

277 ttl = rule.ttl if rule.ttl is not None else cache.ttl 

278 if ttl == 0: 

279 _safe_log(logger, "debug", "response_not_cacheable reason=zero_ttl") 

280 # Create a minimal response for the exception 

281 raise ResponseNotCachable(Response(content=b"", status_code=200)) 

282 

283 if ttl is None: 

284 max_age = one_year 

285 _safe_log(logger, "debug", f"max_out_ttl value={max_age!r}") 

286 else: 

287 max_age = int(ttl) 

288 _safe_log(logger, "debug", f"set_in_cache max_age={max_age!r}") 

289 return ttl, max_age 

290 

291 

292def _set_cache_headers(response: Response, max_age: int, logger: t.Any) -> None: 

293 """Set cache headers on the response.""" 

294 response.headers["X-Cache"] = "hit" 

295 cache_headers = get_cache_response_headers(response, max_age=max_age) 

296 _safe_log(logger, "debug", f"patch_response_headers headers={cache_headers!r}") 

297 response.headers.update(cache_headers) 

298 

299 

300async def _store_in_cache( 

301 cache: t.Any, 

302 cache_key: str, 

303 serialized_response: dict[str, t.Any], 

304 ttl: t.Any, 

305 logger: t.Any, 

306) -> None: 

307 """Store serialized response in cache.""" 

308 _safe_log( 

309 logger, 

310 "debug", 

311 f"set_response_in_cache key={cache_key!r} value={serialized_response!r}", 

312 ) 

313 kwargs = {} 

314 if ttl is not None: 

315 kwargs["ttl"] = ttl 

316 await cache.set(key=cache_key, value=serialized_response, **kwargs) 

317 

318 

319async def get_from_cache( 

320 request: Request, 

321 *, 

322 rules: Sequence[Rule], 

323 cache: t.Any = None, 

324 logger: t.Any = None, 

325) -> Response | None: 

326 # Initialize dependencies if not provided 

327 cache, logger = _init_cache_dependencies(cache, logger) 

328 

329 # Log request details 

330 _safe_log( 

331 logger, 

332 "debug", 

333 f"get_from_cache request.url={str(request.url)!r} request.method={request.method!r}", 

334 ) 

335 

336 # Validate request can use cache 

337 _validate_request_cacheable(request, logger) 

338 

339 # Find matching rule 

340 rule = getattr(CacheRules, "get_rule_matching_request")(rules, request=request) 

341 if rule is None: 

342 _safe_log(logger, "debug", "request_not_cacheable reason=rule") 

343 raise RequestNotCachable(request) 

344 

345 # Try to get cached response 

346 return await _try_get_cached_response(request, cache, logger) 

347 

348 

349def _validate_request_cacheable(request: Request, logger: t.Any) -> None: 

350 """Validate that a request can use the cache.""" 

351 if request.method not in cacheable_methods: 

352 _safe_log(logger, "debug", "request_not_cacheable reason=method") 

353 raise RequestNotCachable(request) 

354 

355 

356async def _try_get_cached_response( 

357 request: Request, cache: t.Any, logger: t.Any 

358) -> Response | None: 

359 """Try to get a cached response for the request.""" 

360 # Try GET method first 

361 _safe_log(logger, "debug", "lookup_cached_response method='GET'") 

362 cache_key = await get_cache_key(request, method="GET", cache=cache) 

363 if cache_key is not None: 

364 serialized_response = await cache.get(cache_key) 

365 if serialized_response is not None: 

366 return _return_cached_response(cache_key, serialized_response, logger) 

367 

368 # Try HEAD method 

369 _safe_log(logger, "debug", "lookup_cached_response method='HEAD'") 

370 cache_key = await get_cache_key(request, method="HEAD", cache=cache) 

371 if cache_key is not None: 

372 serialized_response = await cache.get(cache_key) 

373 if serialized_response is not None: 

374 return _return_cached_response(cache_key, serialized_response, logger) 

375 

376 # No cached response found 

377 _safe_log(logger, "debug", "cached_response found=False") 

378 return None 

379 

380 

381def _return_cached_response( 

382 cache_key: str, serialized_response: t.Any, logger: t.Any 

383) -> Response: 

384 """Return a cached response after logging.""" 

385 _safe_log( 

386 logger, 

387 "debug", 

388 f"cached_response found=True key={cache_key!r} value={serialized_response!r}", 

389 ) 

390 return deserialize_response(serialized_response) 

391 

392 

393async def delete_from_cache( 

394 url: URL, 

395 *, 

396 vary: Headers, 

397 cache: t.Any = None, 

398 logger: t.Any = None, 

399) -> None: 

400 if cache is None or logger is None: 

401 if cache is None: 

402 cache = depends.get("cache") 

403 if logger is None: 

404 logger = depends.get("logger") 

405 

406 varying_headers_cache_key = await generate_varying_headers_cache_key(url) 

407 varying_headers = await cache.get(varying_headers_cache_key) 

408 if varying_headers is None: 

409 return 

410 

411 await _delete_cache_entries(url, vary, cache, logger, varying_headers) 

412 await cache.delete(varying_headers_cache_key) 

413 

414 

415async def _delete_cache_entries( 

416 url: URL, 

417 vary: Headers, 

418 cache: t.Any, 

419 logger: t.Any, 

420 varying_headers: t.Any, 

421) -> None: 

422 """Delete cache entries for GET and HEAD methods.""" 

423 for method in ("GET", "HEAD"): 

424 cache_key = await generate_cache_key( 

425 url, 

426 method=method, 

427 headers=vary, 

428 varying_headers=varying_headers, 

429 ) 

430 if cache_key is None: 

431 continue 

432 

433 logger.debug(f"clear_cache key={cache_key!r}") 

434 await cache.delete(cache_key) 

435 

436 # Publish cache invalidation event (async, don't block) 

437 with suppress(Exception): 

438 

439 async def _publish_event() -> None: 

440 from .adapters.templates._events_wrapper import ( 

441 publish_cache_invalidation, 

442 ) 

443 

444 await publish_cache_invalidation( 

445 cache_key=cache_key, 

446 reason="url_invalidation", 

447 invalidated_by="cache_middleware", 

448 affected_templates=None, 

449 ) 

450 

451 asyncio.create_task(_publish_event()) 

452 

453 

454def serialize_response(response: Response) -> dict[str, t.Any]: 

455 """Serialize a response for caching.""" 

456 return { 

457 "content": _base64_encodebytes(response.body).decode("ascii"), 

458 "status_code": response.status_code, 

459 "headers": dict(response.headers), 

460 } 

461 

462 

463def deserialize_response(serialized_response: t.Any) -> Response: 

464 """Deserialize a cached response.""" 

465 _validate_serialized_response(serialized_response) 

466 

467 content = serialized_response["content"] 

468 status_code = serialized_response["status_code"] 

469 headers = serialized_response["headers"] 

470 

471 return Response( 

472 content=_base64_decodebytes(_str_encode(content, "ascii")), 

473 status_code=status_code, 

474 headers=headers, 

475 ) 

476 

477 

478def _validate_serialized_response(serialized_response: t.Any) -> None: 

479 """Validate the structure of a serialized response.""" 

480 if not isinstance(serialized_response, dict): 

481 msg = f"Expected dict, got {type(serialized_response)}" 

482 raise TypeError(msg) 

483 content = serialized_response.get("content") 

484 if not isinstance(content, str): 

485 msg = f"Expected content to be str, got {type(content)}" 

486 raise TypeError(msg) 

487 status_code = serialized_response.get("status_code") 

488 if not isinstance(status_code, int): 

489 msg = f"Expected status_code to be int, got {type(status_code)}" 

490 raise TypeError(msg) 

491 headers = serialized_response.get("headers") 

492 if not isinstance(headers, dict): 

493 msg = f"Expected headers to be dict, got {type(headers)}" 

494 raise TypeError(msg) 

495 

496 

497async def learn_cache_key( 

498 request: Request, 

499 response: Response, 

500 *, 

501 cache: t.Any = None, 

502 logger: t.Any = None, 

503) -> str: 

504 if cache is None or logger is None: 

505 if cache is None: 

506 cache = depends.get("cache") 

507 if logger is None: 

508 logger = depends.get("logger") 

509 logger.debug( 

510 f"learn_cache_key request.method={request.method!r} response.headers.Vary={response.headers.get('Vary')!r}", 

511 ) 

512 url = request.url 

513 varying_headers_cache_key = await generate_varying_headers_cache_key(url) 

514 cached_vary_headers = set(await cache.get(key=varying_headers_cache_key) or ()) 

515 response_vary_headers = { 

516 header.lower() for header in parse_http_list(response.headers.get("Vary", "")) 

517 } 

518 varying_headers = sorted(response_vary_headers | cached_vary_headers) 

519 if varying_headers: 

520 response.headers["Vary"] = ", ".join(varying_headers) 

521 logger.debug( 

522 f"store_varying_headers cache_key={varying_headers_cache_key!r} headers={varying_headers!r}", 

523 ) 

524 await cache.set(key=varying_headers_cache_key, value=varying_headers) 

525 cache_key = await generate_cache_key( 

526 url, 

527 method=request.method, 

528 headers=request.headers, 

529 varying_headers=varying_headers, 

530 ) 

531 if cache_key is None: 

532 msg = f"Unable to generate cache key for method {request.method}" 

533 raise ValueError(msg) 

534 return cache_key 

535 

536 

537async def get_cache_key( 

538 request: Request, 

539 method: str, 

540 cache: t.Any = None, 

541 logger: t.Any = None, 

542) -> str | None: 

543 if cache is None or logger is None: 

544 if cache is None: 

545 cache = depends.get("cache") 

546 if logger is None: 

547 logger = depends.get("logger") 

548 url = request.url 

549 _safe_log( 

550 logger, 

551 "debug", 

552 f"get_cache_key request.url={str(url)!r} method={method!r}", 

553 ) 

554 varying_headers_cache_key = await generate_varying_headers_cache_key(url) 

555 varying_headers = await cache.get(varying_headers_cache_key) 

556 if varying_headers is None: 

557 _safe_log(logger, "debug", "varying_headers found=False") 

558 return None 

559 _safe_log( 

560 logger, 

561 "debug", 

562 f"varying_headers found=True headers={varying_headers!r}", 

563 ) 

564 return await generate_cache_key( 

565 request.url, 

566 method=method, 

567 headers=request.headers, 

568 varying_headers=varying_headers, 

569 ) 

570 

571 

572async def generate_cache_key( 

573 url: URL, 

574 method: str, 

575 headers: Headers, 

576 varying_headers: list[str], 

577 config: t.Any = None, 

578) -> str | None: 

579 """Generate cache key using ACB's fast CRC32C hashing.""" 

580 if config is None: 

581 config = depends.get("config") 

582 

583 if method not in cacheable_methods: 

584 return None 

585 

586 # Both hash functions are now async for better performance 

587 vary_hash = await _generate_vary_hash(headers, varying_headers) 

588 url_hash = await _generate_url_hash(url) 

589 

590 return f"{config.app.name}:cached:{method}.{url_hash}.{vary_hash}" 

591 

592 

593async def _generate_vary_hash(headers: Headers, varying_headers: list[str]) -> str: 

594 """Generate hash for varying headers using ACB's fast CRC32C.""" 

595 vary_values = [ 

596 f"{header}:{value}" 

597 for header in varying_headers 

598 if (value := headers.get(header)) is not None 

599 ] 

600 

601 if not vary_values: 

602 return "" 

603 

604 # ACB's CRC32C is 50x faster than MD5 for cache keys (non-cryptographic) 

605 result = await hash.crc32c("|".join(vary_values)) 

606 return str(result) # Ensure return type is str 

607 

608 

609async def _generate_url_hash(url: URL) -> str: 

610 """Generate hash for URL using ACB's fast CRC32C.""" 

611 # ACB's CRC32C is 50x faster than MD5 for cache keys (non-cryptographic) 

612 result = await hash.crc32c(str(url)) 

613 return str(result) # Ensure return type is str 

614 

615 

616async def generate_varying_headers_cache_key(url: URL) -> str: 

617 """Generate cache key for varying headers using ACB's fast CRC32C.""" 

618 # ACB's CRC32C is 50x faster than MD5 for cache keys (non-cryptographic) 

619 url_hash = await hash.crc32c(str(url.path)) 

620 return f"varying_headers.{url_hash}" 

621 

622 

623def get_cache_response_headers(response: Response, *, max_age: int) -> dict[str, str]: 

624 max_age = max(max_age, 0) 

625 headers = {} 

626 if "Expires" not in response.headers: 

627 headers["Expires"] = email.utils.formatdate(time.time() + max_age, usegmt=True) 

628 patch_cache_control(response.headers, max_age=max_age) 

629 

630 return headers 

631 

632 

633def patch_cache_control( 

634 headers: MutableHeaders, 

635 **kwargs: t.Unpack[CacheDirectives], 

636) -> None: 

637 cache_control: dict[str, t.Any] = {} 

638 value: t.Any 

639 for field in parse_http_list(headers.get("Cache-Control", "")): 

640 try: 

641 key, value = field.split("=") 

642 except ValueError: 

643 cache_control[field] = True 

644 else: 

645 cache_control[key] = value 

646 

647 if "max-age" in cache_control and "max_age" in kwargs: 

648 kwargs["max_age"] = min(int(cache_control["max-age"]), kwargs["max_age"]) 

649 

650 # Check for unsupported directives 

651 _check_unsupported_directives(kwargs) 

652 

653 for key, value in kwargs.items(): 

654 key = key.replace("_", "-") 

655 cache_control[key] = value 

656 

657 directives: list[str] = [] 

658 for key, value in cache_control.items(): 

659 if value is False: 

660 continue 

661 if value is True: 

662 directives.append(key) 

663 else: 

664 directives.append(f"{key}={value}") 

665 

666 patched_cache_control = ", ".join(directives) 

667 if patched_cache_control: 

668 headers["Cache-Control"] = patched_cache_control 

669 else: 

670 del headers["Cache-Control"] 

671 

672 

673def _check_unsupported_directives(kwargs: t.Any) -> None: 

674 """Check for unsupported cache control directives.""" 

675 if "public" in kwargs: 

676 msg = "The 'public' cache control directive isn't supported yet." 

677 raise NotImplementedError(msg) 

678 if "private" in kwargs: 

679 msg = "The 'private' cache control directive isn't supported yet." 

680 raise NotImplementedError(msg) 

681 

682 

683class CacheResponder: 

684 def __init__(self, app: ASGIApp, *, rules: Sequence[Rule]) -> None: 

685 self.app = app 

686 self.rules = rules 

687 try: 

688 self.logger = depends.get("logger") 

689 except Exception: 

690 import logging 

691 

692 self.logger = logging.getLogger("fastblocks.cache") 

693 try: 

694 self.cache = depends.get("cache") 

695 except Exception: 

696 self.cache = None 

697 self.initial_message: Message = {} 

698 self.is_response_cacheable = True 

699 self.request: Request | None = None 

700 

701 async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None: 

702 if scope["type"] != "http": 

703 await self.app(scope, receive, send) 

704 return 

705 self.request = request = Request(scope) 

706 try: 

707 response = await get_from_cache(request, cache=self.cache, rules=self.rules) 

708 except RequestNotCachable: 

709 if request.method in invalidating_methods: 

710 send = partial(self.send_then_invalidate, send=send) 

711 else: 

712 if response is not None: 

713 _safe_log(self.logger, "debug", "cache_lookup HIT") 

714 await response(scope, receive, send) 

715 return 

716 send = partial(self.send_with_caching, send=send) 

717 _safe_log(self.logger, "debug", "cache_lookup MISS") 

718 await self.app(scope, receive, send) 

719 

720 async def send_with_caching(self, message: Message, *, send: Send) -> None: 

721 if not self.is_response_cacheable or message["type"] not in ( 

722 "http.response.start", 

723 "http.response.body", 

724 ): 

725 await send(message) 

726 return 

727 if message["type"] == "http.response.start": 

728 self.initial_message = message 

729 return 

730 if message["type"] != "http.response.body": 

731 return 

732 if message.get("more_body", False): 

733 _safe_log( 

734 self.logger, 

735 "debug", 

736 "response_not_cacheable reason=is_streaming", 

737 ) 

738 self.is_response_cacheable = False 

739 await send(self.initial_message) 

740 await send(message) 

741 return 

742 if self.request is None: 

743 return 

744 body = message["body"] 

745 response = Response(content=body, status_code=self.initial_message["status"]) 

746 response.raw_headers = list(self.initial_message["headers"]) 

747 try: 

748 await set_in_cache( 

749 response, 

750 request=self.request, 

751 cache=self.cache, 

752 rules=self.rules, 

753 ) 

754 except ResponseNotCachable: 

755 self.is_response_cacheable = False 

756 else: 

757 self.initial_message["headers"] = response.raw_headers.copy() 

758 await send(self.initial_message) 

759 await send(message) 

760 

761 async def send_then_invalidate(self, message: Message, *, send: Send) -> None: 

762 if self.request is None: 

763 return 

764 if message["type"] == "http.response.start" and 200 <= message["status"] < 400: 

765 await delete_from_cache( 

766 self.request.url, 

767 vary=self.request.headers, 

768 cache=self.cache, 

769 ) 

770 await send(message) 

771 

772 

773class CacheControlResponder: 

774 def __init__(self, app: ASGIApp, **kwargs: t.Unpack[CacheDirectives]) -> None: 

775 self.app = app 

776 self.kwargs = kwargs 

777 try: 

778 self.logger = depends.get("logger") 

779 except Exception: 

780 import logging 

781 

782 self.logger = logging.getLogger("fastblocks.cache") 

783 

784 async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None: 

785 if scope["type"] != "http": 

786 await self.app(scope, receive, send) 

787 return 

788 send = partial(self.send_with_caching, send=send) 

789 await self.app(scope, receive, send) 

790 

791 @staticmethod 

792 def kvformat(**kwargs: t.Any) -> str: 

793 return " ".join((f"{key}={value}" for key, value in kwargs.items())) 

794 

795 async def send_with_caching(self, message: Message, *, send: Send) -> None: 

796 if message["type"] == "http.response.start": 

797 _safe_log( 

798 self.logger, 

799 "debug", 

800 f"patch_cache_control {self.kvformat(**self.kwargs)}", 

801 ) 

802 headers = MutableHeaders(raw=list(message["headers"])) 

803 patch_cache_control(headers, **self.kwargs) 

804 message["headers"] = headers.raw 

805 await send(message)