Coverage for fastblocks / caching.py: 69%
432 statements
« prev ^ index » next coverage.py v7.12.0, created at 2025-11-26 03:58 -0800
« prev ^ index » next coverage.py v7.12.0, created at 2025-11-26 03:58 -0800
1import asyncio
2import base64
3import email.utils
4import re
5import sys
6import time
7import typing as t
8from collections.abc import Iterable, Sequence
9from contextlib import suppress
10from dataclasses import dataclass
11from functools import partial
12from urllib.request import parse_http_list
14from acb.actions.hash import hash
15from starlette.datastructures import URL, Headers, MutableHeaders
16from starlette.requests import Request
17from starlette.responses import Response
19HashFunc = t.Callable[[t.Any], str]
20GetAdapterFunc = t.Callable[[str], t.Any]
21ImportAdapterFunc = t.Callable[[str | list[str] | None], t.Any]
22from acb.adapters import get_adapter
23from acb.depends import depends
24from starlette.types import ASGIApp, Message, Receive, Scope, Send
26from .exceptions import RequestNotCachable, ResponseNotCachable
29def _safe_log(logger: t.Any, level: str, message: str) -> None:
30 return CacheUtils.safe_log(logger, level, message)
33_CacheClass = None
35_str_encode = str.encode
36_base64_encodebytes = base64.encodebytes
37_base64_decodebytes = base64.decodebytes
40def get_cache() -> t.Any:
41 global _CacheClass
42 if _CacheClass is None:
43 _CacheClass = get_adapter("cache")
44 return _CacheClass
47class CacheUtils:
48 GET = sys.intern("GET")
49 HEAD = sys.intern("HEAD")
50 POST = sys.intern("POST")
51 PUT = sys.intern("PUT")
52 PATCH = sys.intern("PATCH")
53 DELETE = sys.intern("DELETE")
54 CACHE_CONTROL = sys.intern("Cache-Control")
55 ETAG = sys.intern("ETag")
56 LAST_MODIFIED = sys.intern("Last-Modified")
57 VARY = sys.intern("Vary")
59 CACHEABLE_METHODS = frozenset((GET, HEAD))
60 CACHEABLE_STATUS_CODES = frozenset(
61 (200, 203, 204, 206, 300, 301, 404, 405, 410, 414, 501),
62 )
63 ONE_YEAR = 60 * 60 * 24 * 365
64 INVALIDATING_METHODS = frozenset((POST, PUT, PATCH, DELETE))
66 @staticmethod
67 def safe_log(logger: t.Any, level: str, message: str) -> None:
68 if logger and hasattr(logger, level):
69 getattr(logger, level)(message)
72cacheable_methods = CacheUtils.CACHEABLE_METHODS
73cacheable_status_codes = CacheUtils.CACHEABLE_STATUS_CODES
74one_year = CacheUtils.ONE_YEAR
75invalidating_methods = CacheUtils.INVALIDATING_METHODS
78@dataclass
79class Rule:
80 match: str | re.Pattern[str] | Iterable[str | re.Pattern[str]] = "*"
81 status: int | Iterable[int] | None = None
82 ttl: float | None = None
85def _check_rule_match(match: list[str | re.Pattern[str]], path: str) -> bool:
86 """Check if any rule matches the request path."""
87 for item in match:
88 if isinstance(item, re.Pattern):
89 if item.match(path):
90 return True
91 elif item in ("*", path):
92 return True
93 return False
96def _check_response_status_match(rule: Rule, response: Response) -> bool:
97 """Check if response status code matches the rule."""
98 if rule.status is not None:
99 statuses = [rule.status] if isinstance(rule.status, int) else rule.status
100 if response.status_code not in statuses:
101 return False
102 return True
105class CacheRules:
106 @staticmethod
107 def request_matches_rule(rule: Rule, *, request: Request) -> bool:
108 match = (
109 [rule.match]
110 if isinstance(rule.match, str | re.Pattern)
111 else list(rule.match)
112 )
113 return _check_rule_match(match, request.url.path)
115 @staticmethod
116 def response_matches_rule(
117 rule: Rule,
118 *,
119 request: Request,
120 response: Response,
121 ) -> bool:
122 # First check if request matches the rule
123 if not CacheRules.request_matches_rule(rule, request=request):
124 return False
125 # Then check if response status matches
126 return _check_response_status_match(rule, response)
128 @staticmethod
129 def get_rule_matching_request(
130 rules: Sequence[Rule],
131 *,
132 request: Request,
133 ) -> Rule | None:
134 return next(
135 (
136 rule
137 for rule in rules
138 if CacheRules.request_matches_rule(rule, request=request)
139 ),
140 None,
141 )
143 @staticmethod
144 def get_rule_matching_response(
145 rules: Sequence[Rule],
146 *,
147 request: Request,
148 response: Response,
149 ) -> Rule | None:
150 return next(
151 (
152 rule
153 for rule in rules
154 if CacheRules.response_matches_rule(
155 rule,
156 request=request,
157 response=response,
158 )
159 ),
160 None,
161 )
164def get_rule_matching_request(
165 rules: Sequence[Rule],
166 *,
167 request: Request,
168) -> Rule | None:
169 method = getattr(CacheRules, "get_rule_matching_request")
170 result = method(rules, request=request)
171 return t.cast(Rule | None, result)
174def get_rule_matching_response(
175 rules: Sequence[Rule],
176 *,
177 request: Request,
178 response: Response,
179) -> Rule | None:
180 method = getattr(CacheRules, "get_rule_matching_response")
181 result = method(rules, request=request, response=response)
182 return t.cast(Rule | None, result)
185def request_matches_rule(rule: Rule, *, request: Request) -> bool:
186 method = getattr(CacheRules, "request_matches_rule")
187 result = method(rule, request=request)
188 return t.cast(bool, result)
191def response_matches_rule(rule: Rule, *, request: Request, response: Response) -> bool:
192 method = getattr(CacheRules, "response_matches_rule")
193 result = method(rule, request=request, response=response)
194 return t.cast(bool, result)
197class CacheDirectives(t.TypedDict, total=False):
198 max_age: int
199 s_maxage: int
200 no_cache: bool
201 no_store: bool
202 no_transform: bool
203 must_revalidate: bool
204 proxy_revalidate: bool
205 must_understand: bool
206 private: bool
207 public: bool
208 immutable: bool
209 stale_while_revalidate: int
210 stale_if_error: int
213async def set_in_cache(
214 response: Response,
215 *,
216 request: Request,
217 rules: Sequence[Rule],
218 cache: t.Any = None,
219 logger: t.Any = None,
220) -> None:
221 # Initialize dependencies if not provided
222 cache, logger = _init_cache_dependencies(cache, logger)
224 # Validate response can be cached
225 _validate_response_cacheable(response, request, logger)
227 # Find matching rule for caching
228 rule = get_rule_matching_response(rules, request=request, response=response)
229 if not rule:
230 _safe_log(logger, "debug", "response_not_cacheable reason=rule")
231 raise ResponseNotCachable(response)
233 # Calculate TTL and max age
234 ttl, max_age = _calculate_cache_ttl(rule, cache, logger)
236 # Set cache headers
237 _set_cache_headers(response, max_age, logger)
239 # Generate cache key and serialize response
240 cache_key = await learn_cache_key(request, response, cache=cache)
241 serialized_response = serialize_response(response)
243 # Store in cache
244 await _store_in_cache(cache, cache_key, serialized_response, ttl, logger)
246 # Update response header
247 response.headers["X-Cache"] = "miss"
250def _init_cache_dependencies(cache: t.Any, logger: t.Any) -> tuple[t.Any, t.Any]:
251 """Initialize cache and logger dependencies."""
252 if cache is None:
253 cache = depends.get("cache")
254 if logger is None:
255 logger = depends.get("logger")
256 return cache, logger
259def _validate_response_cacheable(
260 response: Response, request: Request, logger: t.Any
261) -> None:
262 """Validate that a response can be cached."""
263 if response.status_code not in cacheable_status_codes:
264 _safe_log(logger, "debug", "response_not_cacheable reason=status_code")
265 raise ResponseNotCachable(response)
266 if not request.cookies and "Set-Cookie" in response.headers:
267 _safe_log(
268 logger,
269 "debug",
270 "response_not_cacheable reason=cookies_for_cookieless_request",
271 )
272 raise ResponseNotCachable(response)
275def _calculate_cache_ttl(rule: Rule, cache: t.Any, logger: t.Any) -> tuple[t.Any, int]:
276 """Calculate TTL and max age for caching."""
277 ttl = rule.ttl if rule.ttl is not None else cache.ttl
278 if ttl == 0:
279 _safe_log(logger, "debug", "response_not_cacheable reason=zero_ttl")
280 # Create a minimal response for the exception
281 raise ResponseNotCachable(Response(content=b"", status_code=200))
283 if ttl is None:
284 max_age = one_year
285 _safe_log(logger, "debug", f"max_out_ttl value={max_age!r}")
286 else:
287 max_age = int(ttl)
288 _safe_log(logger, "debug", f"set_in_cache max_age={max_age!r}")
289 return ttl, max_age
292def _set_cache_headers(response: Response, max_age: int, logger: t.Any) -> None:
293 """Set cache headers on the response."""
294 response.headers["X-Cache"] = "hit"
295 cache_headers = get_cache_response_headers(response, max_age=max_age)
296 _safe_log(logger, "debug", f"patch_response_headers headers={cache_headers!r}")
297 response.headers.update(cache_headers)
300async def _store_in_cache(
301 cache: t.Any,
302 cache_key: str,
303 serialized_response: dict[str, t.Any],
304 ttl: t.Any,
305 logger: t.Any,
306) -> None:
307 """Store serialized response in cache."""
308 _safe_log(
309 logger,
310 "debug",
311 f"set_response_in_cache key={cache_key!r} value={serialized_response!r}",
312 )
313 kwargs = {}
314 if ttl is not None:
315 kwargs["ttl"] = ttl
316 await cache.set(key=cache_key, value=serialized_response, **kwargs)
319async def get_from_cache(
320 request: Request,
321 *,
322 rules: Sequence[Rule],
323 cache: t.Any = None,
324 logger: t.Any = None,
325) -> Response | None:
326 # Initialize dependencies if not provided
327 cache, logger = _init_cache_dependencies(cache, logger)
329 # Log request details
330 _safe_log(
331 logger,
332 "debug",
333 f"get_from_cache request.url={str(request.url)!r} request.method={request.method!r}",
334 )
336 # Validate request can use cache
337 _validate_request_cacheable(request, logger)
339 # Find matching rule
340 rule = getattr(CacheRules, "get_rule_matching_request")(rules, request=request)
341 if rule is None:
342 _safe_log(logger, "debug", "request_not_cacheable reason=rule")
343 raise RequestNotCachable(request)
345 # Try to get cached response
346 return await _try_get_cached_response(request, cache, logger)
349def _validate_request_cacheable(request: Request, logger: t.Any) -> None:
350 """Validate that a request can use the cache."""
351 if request.method not in cacheable_methods:
352 _safe_log(logger, "debug", "request_not_cacheable reason=method")
353 raise RequestNotCachable(request)
356async def _try_get_cached_response(
357 request: Request, cache: t.Any, logger: t.Any
358) -> Response | None:
359 """Try to get a cached response for the request."""
360 # Try GET method first
361 _safe_log(logger, "debug", "lookup_cached_response method='GET'")
362 cache_key = await get_cache_key(request, method="GET", cache=cache)
363 if cache_key is not None:
364 serialized_response = await cache.get(cache_key)
365 if serialized_response is not None:
366 return _return_cached_response(cache_key, serialized_response, logger)
368 # Try HEAD method
369 _safe_log(logger, "debug", "lookup_cached_response method='HEAD'")
370 cache_key = await get_cache_key(request, method="HEAD", cache=cache)
371 if cache_key is not None:
372 serialized_response = await cache.get(cache_key)
373 if serialized_response is not None:
374 return _return_cached_response(cache_key, serialized_response, logger)
376 # No cached response found
377 _safe_log(logger, "debug", "cached_response found=False")
378 return None
381def _return_cached_response(
382 cache_key: str, serialized_response: t.Any, logger: t.Any
383) -> Response:
384 """Return a cached response after logging."""
385 _safe_log(
386 logger,
387 "debug",
388 f"cached_response found=True key={cache_key!r} value={serialized_response!r}",
389 )
390 return deserialize_response(serialized_response)
393async def delete_from_cache(
394 url: URL,
395 *,
396 vary: Headers,
397 cache: t.Any = None,
398 logger: t.Any = None,
399) -> None:
400 if cache is None or logger is None:
401 if cache is None:
402 cache = depends.get("cache")
403 if logger is None:
404 logger = depends.get("logger")
406 varying_headers_cache_key = await generate_varying_headers_cache_key(url)
407 varying_headers = await cache.get(varying_headers_cache_key)
408 if varying_headers is None:
409 return
411 await _delete_cache_entries(url, vary, cache, logger, varying_headers)
412 await cache.delete(varying_headers_cache_key)
415async def _delete_cache_entries(
416 url: URL,
417 vary: Headers,
418 cache: t.Any,
419 logger: t.Any,
420 varying_headers: t.Any,
421) -> None:
422 """Delete cache entries for GET and HEAD methods."""
423 for method in ("GET", "HEAD"):
424 cache_key = await generate_cache_key(
425 url,
426 method=method,
427 headers=vary,
428 varying_headers=varying_headers,
429 )
430 if cache_key is None:
431 continue
433 logger.debug(f"clear_cache key={cache_key!r}")
434 await cache.delete(cache_key)
436 # Publish cache invalidation event (async, don't block)
437 with suppress(Exception):
439 async def _publish_event() -> None:
440 from .adapters.templates._events_wrapper import (
441 publish_cache_invalidation,
442 )
444 await publish_cache_invalidation(
445 cache_key=cache_key,
446 reason="url_invalidation",
447 invalidated_by="cache_middleware",
448 affected_templates=None,
449 )
451 asyncio.create_task(_publish_event())
454def serialize_response(response: Response) -> dict[str, t.Any]:
455 """Serialize a response for caching."""
456 return {
457 "content": _base64_encodebytes(response.body).decode("ascii"),
458 "status_code": response.status_code,
459 "headers": dict(response.headers),
460 }
463def deserialize_response(serialized_response: t.Any) -> Response:
464 """Deserialize a cached response."""
465 _validate_serialized_response(serialized_response)
467 content = serialized_response["content"]
468 status_code = serialized_response["status_code"]
469 headers = serialized_response["headers"]
471 return Response(
472 content=_base64_decodebytes(_str_encode(content, "ascii")),
473 status_code=status_code,
474 headers=headers,
475 )
478def _validate_serialized_response(serialized_response: t.Any) -> None:
479 """Validate the structure of a serialized response."""
480 if not isinstance(serialized_response, dict):
481 msg = f"Expected dict, got {type(serialized_response)}"
482 raise TypeError(msg)
483 content = serialized_response.get("content")
484 if not isinstance(content, str):
485 msg = f"Expected content to be str, got {type(content)}"
486 raise TypeError(msg)
487 status_code = serialized_response.get("status_code")
488 if not isinstance(status_code, int):
489 msg = f"Expected status_code to be int, got {type(status_code)}"
490 raise TypeError(msg)
491 headers = serialized_response.get("headers")
492 if not isinstance(headers, dict):
493 msg = f"Expected headers to be dict, got {type(headers)}"
494 raise TypeError(msg)
497async def learn_cache_key(
498 request: Request,
499 response: Response,
500 *,
501 cache: t.Any = None,
502 logger: t.Any = None,
503) -> str:
504 if cache is None or logger is None:
505 if cache is None:
506 cache = depends.get("cache")
507 if logger is None:
508 logger = depends.get("logger")
509 logger.debug(
510 f"learn_cache_key request.method={request.method!r} response.headers.Vary={response.headers.get('Vary')!r}",
511 )
512 url = request.url
513 varying_headers_cache_key = await generate_varying_headers_cache_key(url)
514 cached_vary_headers = set(await cache.get(key=varying_headers_cache_key) or ())
515 response_vary_headers = {
516 header.lower() for header in parse_http_list(response.headers.get("Vary", ""))
517 }
518 varying_headers = sorted(response_vary_headers | cached_vary_headers)
519 if varying_headers:
520 response.headers["Vary"] = ", ".join(varying_headers)
521 logger.debug(
522 f"store_varying_headers cache_key={varying_headers_cache_key!r} headers={varying_headers!r}",
523 )
524 await cache.set(key=varying_headers_cache_key, value=varying_headers)
525 cache_key = await generate_cache_key(
526 url,
527 method=request.method,
528 headers=request.headers,
529 varying_headers=varying_headers,
530 )
531 if cache_key is None:
532 msg = f"Unable to generate cache key for method {request.method}"
533 raise ValueError(msg)
534 return cache_key
537async def get_cache_key(
538 request: Request,
539 method: str,
540 cache: t.Any = None,
541 logger: t.Any = None,
542) -> str | None:
543 if cache is None or logger is None:
544 if cache is None:
545 cache = depends.get("cache")
546 if logger is None:
547 logger = depends.get("logger")
548 url = request.url
549 _safe_log(
550 logger,
551 "debug",
552 f"get_cache_key request.url={str(url)!r} method={method!r}",
553 )
554 varying_headers_cache_key = await generate_varying_headers_cache_key(url)
555 varying_headers = await cache.get(varying_headers_cache_key)
556 if varying_headers is None:
557 _safe_log(logger, "debug", "varying_headers found=False")
558 return None
559 _safe_log(
560 logger,
561 "debug",
562 f"varying_headers found=True headers={varying_headers!r}",
563 )
564 return await generate_cache_key(
565 request.url,
566 method=method,
567 headers=request.headers,
568 varying_headers=varying_headers,
569 )
572async def generate_cache_key(
573 url: URL,
574 method: str,
575 headers: Headers,
576 varying_headers: list[str],
577 config: t.Any = None,
578) -> str | None:
579 """Generate cache key using ACB's fast CRC32C hashing."""
580 if config is None:
581 config = depends.get("config")
583 if method not in cacheable_methods:
584 return None
586 # Both hash functions are now async for better performance
587 vary_hash = await _generate_vary_hash(headers, varying_headers)
588 url_hash = await _generate_url_hash(url)
590 return f"{config.app.name}:cached:{method}.{url_hash}.{vary_hash}"
593async def _generate_vary_hash(headers: Headers, varying_headers: list[str]) -> str:
594 """Generate hash for varying headers using ACB's fast CRC32C."""
595 vary_values = [
596 f"{header}:{value}"
597 for header in varying_headers
598 if (value := headers.get(header)) is not None
599 ]
601 if not vary_values:
602 return ""
604 # ACB's CRC32C is 50x faster than MD5 for cache keys (non-cryptographic)
605 result = await hash.crc32c("|".join(vary_values))
606 return str(result) # Ensure return type is str
609async def _generate_url_hash(url: URL) -> str:
610 """Generate hash for URL using ACB's fast CRC32C."""
611 # ACB's CRC32C is 50x faster than MD5 for cache keys (non-cryptographic)
612 result = await hash.crc32c(str(url))
613 return str(result) # Ensure return type is str
616async def generate_varying_headers_cache_key(url: URL) -> str:
617 """Generate cache key for varying headers using ACB's fast CRC32C."""
618 # ACB's CRC32C is 50x faster than MD5 for cache keys (non-cryptographic)
619 url_hash = await hash.crc32c(str(url.path))
620 return f"varying_headers.{url_hash}"
623def get_cache_response_headers(response: Response, *, max_age: int) -> dict[str, str]:
624 max_age = max(max_age, 0)
625 headers = {}
626 if "Expires" not in response.headers:
627 headers["Expires"] = email.utils.formatdate(time.time() + max_age, usegmt=True)
628 patch_cache_control(response.headers, max_age=max_age)
630 return headers
633def patch_cache_control(
634 headers: MutableHeaders,
635 **kwargs: t.Unpack[CacheDirectives],
636) -> None:
637 cache_control: dict[str, t.Any] = {}
638 value: t.Any
639 for field in parse_http_list(headers.get("Cache-Control", "")):
640 try:
641 key, value = field.split("=")
642 except ValueError:
643 cache_control[field] = True
644 else:
645 cache_control[key] = value
647 if "max-age" in cache_control and "max_age" in kwargs:
648 kwargs["max_age"] = min(int(cache_control["max-age"]), kwargs["max_age"])
650 # Check for unsupported directives
651 _check_unsupported_directives(kwargs)
653 for key, value in kwargs.items():
654 key = key.replace("_", "-")
655 cache_control[key] = value
657 directives: list[str] = []
658 for key, value in cache_control.items():
659 if value is False:
660 continue
661 if value is True:
662 directives.append(key)
663 else:
664 directives.append(f"{key}={value}")
666 patched_cache_control = ", ".join(directives)
667 if patched_cache_control:
668 headers["Cache-Control"] = patched_cache_control
669 else:
670 del headers["Cache-Control"]
673def _check_unsupported_directives(kwargs: t.Any) -> None:
674 """Check for unsupported cache control directives."""
675 if "public" in kwargs:
676 msg = "The 'public' cache control directive isn't supported yet."
677 raise NotImplementedError(msg)
678 if "private" in kwargs:
679 msg = "The 'private' cache control directive isn't supported yet."
680 raise NotImplementedError(msg)
683class CacheResponder:
684 def __init__(self, app: ASGIApp, *, rules: Sequence[Rule]) -> None:
685 self.app = app
686 self.rules = rules
687 try:
688 self.logger = depends.get("logger")
689 except Exception:
690 import logging
692 self.logger = logging.getLogger("fastblocks.cache")
693 try:
694 self.cache = depends.get("cache")
695 except Exception:
696 self.cache = None
697 self.initial_message: Message = {}
698 self.is_response_cacheable = True
699 self.request: Request | None = None
701 async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None:
702 if scope["type"] != "http":
703 await self.app(scope, receive, send)
704 return
705 self.request = request = Request(scope)
706 try:
707 response = await get_from_cache(request, cache=self.cache, rules=self.rules)
708 except RequestNotCachable:
709 if request.method in invalidating_methods:
710 send = partial(self.send_then_invalidate, send=send)
711 else:
712 if response is not None:
713 _safe_log(self.logger, "debug", "cache_lookup HIT")
714 await response(scope, receive, send)
715 return
716 send = partial(self.send_with_caching, send=send)
717 _safe_log(self.logger, "debug", "cache_lookup MISS")
718 await self.app(scope, receive, send)
720 async def send_with_caching(self, message: Message, *, send: Send) -> None:
721 if not self.is_response_cacheable or message["type"] not in (
722 "http.response.start",
723 "http.response.body",
724 ):
725 await send(message)
726 return
727 if message["type"] == "http.response.start":
728 self.initial_message = message
729 return
730 if message["type"] != "http.response.body":
731 return
732 if message.get("more_body", False):
733 _safe_log(
734 self.logger,
735 "debug",
736 "response_not_cacheable reason=is_streaming",
737 )
738 self.is_response_cacheable = False
739 await send(self.initial_message)
740 await send(message)
741 return
742 if self.request is None:
743 return
744 body = message["body"]
745 response = Response(content=body, status_code=self.initial_message["status"])
746 response.raw_headers = list(self.initial_message["headers"])
747 try:
748 await set_in_cache(
749 response,
750 request=self.request,
751 cache=self.cache,
752 rules=self.rules,
753 )
754 except ResponseNotCachable:
755 self.is_response_cacheable = False
756 else:
757 self.initial_message["headers"] = response.raw_headers.copy()
758 await send(self.initial_message)
759 await send(message)
761 async def send_then_invalidate(self, message: Message, *, send: Send) -> None:
762 if self.request is None:
763 return
764 if message["type"] == "http.response.start" and 200 <= message["status"] < 400:
765 await delete_from_cache(
766 self.request.url,
767 vary=self.request.headers,
768 cache=self.cache,
769 )
770 await send(message)
773class CacheControlResponder:
774 def __init__(self, app: ASGIApp, **kwargs: t.Unpack[CacheDirectives]) -> None:
775 self.app = app
776 self.kwargs = kwargs
777 try:
778 self.logger = depends.get("logger")
779 except Exception:
780 import logging
782 self.logger = logging.getLogger("fastblocks.cache")
784 async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None:
785 if scope["type"] != "http":
786 await self.app(scope, receive, send)
787 return
788 send = partial(self.send_with_caching, send=send)
789 await self.app(scope, receive, send)
791 @staticmethod
792 def kvformat(**kwargs: t.Any) -> str:
793 return " ".join((f"{key}={value}" for key, value in kwargs.items()))
795 async def send_with_caching(self, message: Message, *, send: Send) -> None:
796 if message["type"] == "http.response.start":
797 _safe_log(
798 self.logger,
799 "debug",
800 f"patch_cache_control {self.kvformat(**self.kwargs)}",
801 )
802 headers = MutableHeaders(raw=list(message["headers"]))
803 patch_cache_control(headers, **self.kwargs)
804 message["headers"] = headers.raw
805 await send(message)