emailsec.arc
1from dataclasses import dataclass 2import typing 3import enum 4import re 5 6import emailsec.utils 7from emailsec.dkim.checker import ( 8 _verify_sig, 9 _verify_dkim_signature, 10) 11from emailsec.dkim.parser import ( 12 _algorithm, 13 headers_hash, 14 tag_lists, 15 _DKIMStyleSig, 16 _SigVerifier, 17 _CanonicalizationAlg, 18) 19from emailsec.utils import body_and_headers_for_canonicalization 20 21arc_message_signature = tag_lists 22arc_seal = tag_lists 23 24 25class ARCMessageSignature(typing.TypedDict): 26 i: int 27 a: str 28 b: str 29 bh: str 30 c: typing.NotRequired[str] 31 d: str 32 h: str 33 l: typing.NotRequired[int] # noqa: E741 34 q: typing.NotRequired[str] 35 s: str 36 t: typing.NotRequired[int] 37 x: typing.NotRequired[int] 38 z: typing.NotRequired[str] 39 40 41class ARCSeal(typing.TypedDict): 42 i: int 43 a: str 44 b: str 45 d: str 46 s: str 47 cv: str 48 t: typing.NotRequired[int] 49 50 51_ARC_SEAL_REQUIRED_FIELDS = {"i", "a", "b", "d", "s", "cv"} 52_ARC_MSG_SIG_REQUIRED_FIELDS = {"i", "a", "b", "bh", "d", "h", "s"} 53 54 55class ARCChainStatus(enum.StrEnum): 56 NONE = "none" 57 FAIL = "fail" 58 PASS = "pass" 59 60 61@dataclass 62class ARCCheck: 63 result: ARCChainStatus 64 exp: str 65 signer: str | None = None 66 aar_header: bytes | None = None 67 68 69def parse_arc_seal(data: str) -> ARCSeal: 70 sig: ARCSeal = {} # type: ignore 71 for result in arc_seal.parse_string(data, parse_all=True).as_list(): 72 field = result[0] 73 match field: 74 case "a" | "b" | "d" | "s" | "cv": 75 sig[field] = "".join(re.split(r"\s+", result[1])) 76 case "t" | "i": 77 try: 78 sig[field] = int(result[1]) 79 except ValueError as ve: 80 raise ValueError(f"Invalid field value {result=}") from ve 81 case "h": 82 # https://datatracker.ietf.org/doc/html/rfc8617#section-4.1.3 83 # must fail if h tag is found in seal 84 raise ValueError("h tag not allowed") 85 case _: 86 continue 87 if ( 88 missing_fields := set(sig.keys()) & _ARC_SEAL_REQUIRED_FIELDS 89 ) != 
_ARC_SEAL_REQUIRED_FIELDS: 90 raise ValueError(f"Missing required fields {missing_fields=}") 91 92 return sig 93 94 95def parse_arc_message_signature(data: str) -> ARCMessageSignature: 96 sig: ARCMessageSignature = {} # type: ignore 97 for result in arc_message_signature.parse_string(data, parse_all=True).as_list(): 98 field = result[0] 99 match field: 100 case "a" | "b" | "bh" | "c" | "d" | "h" | "q" | "s" | "z": 101 sig[field] = "".join(re.split(r"\s+", result[1])) 102 case "l" | "t" | "x" | "i": 103 try: 104 sig[field] = int(result[1]) 105 except ValueError as ve: 106 raise ValueError(f"Invalid field value {result=}") from ve 107 case _: 108 continue 109 110 if missing_fields := _ARC_MSG_SIG_REQUIRED_FIELDS - set(sig.keys()): 111 raise ValueError(f"Missing required fields {missing_fields=}") 112 113 return sig 114 115 116async def arc_seal_verify( 117 arc_set_headers: tuple[ 118 emailsec.utils.Header, emailsec.utils.Header, emailsec.utils.Header 119 ], 120 sig: ARCSeal, 121) -> bool: 122 header_canonicalization: _CanonicalizationAlg = "relaxed" 123 dkim_alg = _algorithm(sig["a"]) 124 125 # headers ordering: aar_header, ams_header, seal_header 126 headers_to_sign = list(arc_set_headers[:2]) 127 # the ARC-Seal is treated differently as the body hash needs to be stripped 128 sig_header = arc_set_headers[-1] 129 canonicalized_message = headers_hash( 130 headers_to_sign, 131 header_canonicalization, 132 sig_header, 133 ) 134 return await _verify_sig( 135 dkim_alg, typing.cast(_SigVerifier, sig), canonicalized_message 136 ) 137 138 139_ARC_INSTANCE = re.compile(rb"\s?i\s*=\s*(\d+)", re.MULTILINE | re.IGNORECASE) 140 141 142def _aar_instance(header_value: bytes) -> int: 143 if (match := re.search(_ARC_INSTANCE, header_value)) is not None: 144 return int(match.group(1)) 145 146 raise ValueError(f"Instance not found in {header_value=}") 147 148 149async def check_arc( 150 message: bytes, body_and_headers: emailsec.utils.BodyAndHeaders | None = None 151) -> ARCCheck: 152 
if body_and_headers: 153 body, headers = body_and_headers 154 else: 155 body, headers = body_and_headers_for_canonicalization(message) 156 157 arc_message_signatures = headers.get("arc-message-signature") 158 if not arc_message_signatures: 159 return ARCCheck(ARCChainStatus.NONE, "No ARC Sets") 160 arc_authentication_results = headers.get("arc-authentication-results", []) 161 arc_seals = headers.get("arc-seal", []) 162 163 if not ( 164 len(arc_message_signatures) == len(arc_authentication_results) == len(arc_seals) 165 ): 166 return ARCCheck(ARCChainStatus.FAIL, "Uneven ARC Sets") 167 168 if len(arc_authentication_results) > 50: 169 return ARCCheck(ARCChainStatus.FAIL, "Too many ARC Sets") 170 171 parsed_ams = sorted( 172 ( 173 ( 174 parse_arc_message_signature(value.decode()), 175 (header_name, value), 176 ) 177 for header_name, value in headers["arc-message-signature"] 178 ), 179 key=lambda x: x[0]["i"], 180 ) 181 parsed_as = sorted( 182 ( 183 ( 184 parse_arc_seal(value.decode()), 185 (header_name, value), 186 ) 187 for header_name, value in headers["arc-seal"] 188 ), 189 key=lambda x: x[0]["i"], 190 ) 191 aars = sorted( 192 ( 193 ( 194 _aar_instance(value), 195 (header_name, value), 196 ) 197 for header_name, value in headers["arc-authentication-results"] 198 ), 199 key=lambda x: x[0], 200 ) 201 202 highest_validated_aar = None 203 highest_validated_signer = None 204 205 for instance in range(len(arc_message_signatures), 0, -1): 206 ams, ams_header = parsed_ams.pop() 207 if ams["i"] != instance: 208 return ARCCheck(ARCChainStatus.FAIL, f"Cannot find AMS for {instance=}") 209 210 seal, seal_header = parsed_as.pop() 211 if seal["i"] != instance: 212 return ARCCheck(ARCChainStatus.FAIL, f"Cannot find AS for {instance=}") 213 214 aar_instance, aar_header = aars.pop() 215 if aar_instance != instance: 216 return ARCCheck(ARCChainStatus.FAIL, f"Cannot find AAR for {instance=}") 217 218 if instance == 1 and seal["cv"] != "none": 219 return ARCCheck(ARCChainStatus.FAIL, 
f"AMS cv must be none for {instance=}") 220 elif instance > 1 and seal["cv"] != "pass": 221 return ARCCheck(ARCChainStatus.FAIL, f"AMS cv fail for {instance=}") 222 223 is_ams_valid = await _verify_dkim_signature( 224 body, headers, ams_header, typing.cast(_DKIMStyleSig, ams) 225 ) 226 if not is_ams_valid: 227 return ARCCheck(ARCChainStatus.FAIL, f"Cannot verify AMS for {instance=}") 228 229 arc_set_headers = (aar_header, ams_header, seal_header) 230 231 is_seal_valid = await arc_seal_verify(arc_set_headers, seal) 232 if not is_seal_valid: 233 return ARCCheck(ARCChainStatus.FAIL, f"Cannot verify AS for {instance=}") 234 235 if highest_validated_aar is None: 236 highest_validated_aar = aar_header[1] 237 highest_validated_signer = seal["d"] 238 239 return ARCCheck( 240 ARCChainStatus.PASS, 241 "", 242 signer=highest_validated_signer, 243 aar_header=highest_validated_aar, 244 )
arc_message_signature =
{{Group:({{{{{{Suppress:([[[<SP><TAB>]... {<CR> <LF>}] {<SP><TAB>}...]) W:(A-Za-z, 0-9A-Z_a-z)} Suppress:([[[<SP><TAB>]... {<CR> <LF>}] {<SP><TAB>}...])} Suppress:('=')} Suppress:([[[<SP><TAB>]... {<CR> <LF>}] {<SP><TAB>}...])} [Combine:({{W:(
-:<-~) | {[[<SP><TAB>]... {<CR> <LF>}] {<SP><TAB>}...}} | <SP><TAB>}) | {W:(+/-9A-Za-z) [{[[[<SP><TAB>]... {<CR> <LF>}] {<SP><TAB>}...] W:(+/-9A-Za-z)}]...}]} Suppress:([[[<SP><TAB>]... {<CR> <LF>}] {<SP><TAB>}...])}) [{{Suppress:(';') Group:({{{{{{Suppress:([[[<SP><TAB>]... {<CR> <LF>}] {<SP><TAB>}...]) W:(A-Za-z, 0-9A-Z_a-z)} Suppress:([[[<SP><TAB>]... {<CR> <LF>}] {<SP><TAB>}...])} Suppress:('=')} Suppress:([[[<SP><TAB>]... {<CR> <LF>}] {<SP><TAB>}...])} [Combine:({{W:(
-:<-~) | {[[<SP><TAB>]... {<CR> <LF>}] {<SP><TAB>}...}} | <SP><TAB>}) | {W:(+/-9A-Za-z) [{[[[<SP><TAB>]... {<CR> <LF>}] {<SP><TAB>}...] W:(+/-9A-Za-z)}]...}]} Suppress:([[[<SP><TAB>]... {<CR> <LF>}] {<SP><TAB>}...])})}}...]} Suppress:([';'])}
arc_seal =
{{Group:({{{{{{Suppress:([[[<SP><TAB>]... {<CR> <LF>}] {<SP><TAB>}...]) W:(A-Za-z, 0-9A-Z_a-z)} Suppress:([[[<SP><TAB>]... {<CR> <LF>}] {<SP><TAB>}...])} Suppress:('=')} Suppress:([[[<SP><TAB>]... {<CR> <LF>}] {<SP><TAB>}...])} [Combine:({{W:(
-:<-~) | {[[<SP><TAB>]... {<CR> <LF>}] {<SP><TAB>}...}} | <SP><TAB>}) | {W:(+/-9A-Za-z) [{[[[<SP><TAB>]... {<CR> <LF>}] {<SP><TAB>}...] W:(+/-9A-Za-z)}]...}]} Suppress:([[[<SP><TAB>]... {<CR> <LF>}] {<SP><TAB>}...])}) [{{Suppress:(';') Group:({{{{{{Suppress:([[[<SP><TAB>]... {<CR> <LF>}] {<SP><TAB>}...]) W:(A-Za-z, 0-9A-Z_a-z)} Suppress:([[[<SP><TAB>]... {<CR> <LF>}] {<SP><TAB>}...])} Suppress:('=')} Suppress:([[[<SP><TAB>]... {<CR> <LF>}] {<SP><TAB>}...])} [Combine:({{W:(
-:<-~) | {[[<SP><TAB>]... {<CR> <LF>}] {<SP><TAB>}...}} | <SP><TAB>}) | {W:(+/-9A-Za-z) [{[[[<SP><TAB>]... {<CR> <LF>}] {<SP><TAB>}...] W:(+/-9A-Za-z)}]...}]} Suppress:([[[<SP><TAB>]... {<CR> <LF>}] {<SP><TAB>}...])})}}...]} Suppress:([';'])}
class
ARCMessageSignature(typing.TypedDict):
26class ARCMessageSignature(typing.TypedDict): 27 i: int 28 a: str 29 b: str 30 bh: str 31 c: typing.NotRequired[str] 32 d: str 33 h: str 34 l: typing.NotRequired[int] # noqa: E741 35 q: typing.NotRequired[str] 36 s: str 37 t: typing.NotRequired[int] 38 x: typing.NotRequired[int] 39 z: typing.NotRequired[str]
class
ARCSeal(typing.TypedDict):
class
ARCChainStatus(enum.StrEnum):
NONE =
<ARCChainStatus.NONE: 'none'>
FAIL =
<ARCChainStatus.FAIL: 'fail'>
PASS =
<ARCChainStatus.PASS: 'pass'>
@dataclass
class
ARCCheck:
@dataclass
class ARCCheck:
    """Outcome of an ARC chain validation, as returned by ``check_arc``."""

    # Overall chain status.
    result: ARCChainStatus
    # Explanation of the outcome; ``check_arc`` passes "" on PASS.
    exp: str
    # ``d`` tag of the highest-instance ARC-Seal; ``check_arc`` fills it on
    # PASS only.
    signer: str | None = None
    # Raw value of the highest-instance ARC-Authentication-Results header;
    # ``check_arc`` fills it on PASS only.
    aar_header: bytes | None = None
ARCCheck( result: ARCChainStatus, exp: str, signer: str | None = None, aar_header: bytes | None = None)
result: ARCChainStatus
70def parse_arc_seal(data: str) -> ARCSeal: 71 sig: ARCSeal = {} # type: ignore 72 for result in arc_seal.parse_string(data, parse_all=True).as_list(): 73 field = result[0] 74 match field: 75 case "a" | "b" | "d" | "s" | "cv": 76 sig[field] = "".join(re.split(r"\s+", result[1])) 77 case "t" | "i": 78 try: 79 sig[field] = int(result[1]) 80 except ValueError as ve: 81 raise ValueError(f"Invalid field value {result=}") from ve 82 case "h": 83 # https://datatracker.ietf.org/doc/html/rfc8617#section-4.1.3 84 # must fail if h tag is found in seal 85 raise ValueError("h tag not allowed") 86 case _: 87 continue 88 if ( 89 missing_fields := set(sig.keys()) & _ARC_SEAL_REQUIRED_FIELDS 90 ) != _ARC_SEAL_REQUIRED_FIELDS: 91 raise ValueError(f"Missing required fields {missing_fields=}") 92 93 return sig
def parse_arc_message_signature(data: str) -> ARCMessageSignature:
    """Parse an ``ARC-Message-Signature`` header value.

    String tags have all whitespace removed, numeric tags are converted to
    ``int``, and tags outside both groups are skipped.

    Args:
        data: the decoded header value (a DKIM-style tag list).

    Returns:
        The parsed signature, guaranteed to contain every required tag.

    Raises:
        ValueError: if a numeric tag is not an integer or a required tag is
            missing.
    """
    text_tags = ("a", "b", "bh", "c", "d", "h", "q", "s", "z")
    numeric_tags = ("l", "t", "x", "i")

    sig: ARCMessageSignature = {}  # type: ignore
    tag_pairs = arc_message_signature.parse_string(data, parse_all=True).as_list()

    # The loop variable must stay named ``result``: it appears in the
    # ``{result=}`` error message.
    for result in tag_pairs:
        field = result[0]
        if field in text_tags:
            sig[field] = re.sub(r"\s+", "", result[1])
        elif field in numeric_tags:
            try:
                sig[field] = int(result[1])
            except ValueError as ve:
                raise ValueError(f"Invalid field value {result=}") from ve

    missing_fields = _ARC_MSG_SIG_REQUIRED_FIELDS - set(sig.keys())
    if missing_fields:
        raise ValueError(f"Missing required fields {missing_fields=}")

    return sig
async def
arc_seal_verify( arc_set_headers: tuple[tuple[bytes, bytes], tuple[bytes, bytes], tuple[bytes, bytes]], sig: ARCSeal) -> bool:
async def arc_seal_verify(
    arc_set_headers: tuple[
        emailsec.utils.Header, emailsec.utils.Header, emailsec.utils.Header
    ],
    sig: ARCSeal,
) -> bool:
    """Verify one ``ARC-Seal`` against the headers of its ARC set.

    Args:
        arc_set_headers: the set's headers, ordered
            ``(aar_header, ams_header, seal_header)``.
        sig: the parsed seal; its ``a`` tag selects the algorithm.

    Returns:
        The result of ``_verify_sig`` for the seal signature.
    """
    algorithm = _algorithm(sig["a"])

    # Header canonicalization is fixed to "relaxed" here.
    canonicalization: _CanonicalizationAlg = "relaxed"

    # The first two headers (AAR, AMS) are hashed as ordinary signed headers.
    # The seal is passed separately as the signature header.
    signed_headers = [*arc_set_headers[:2]]
    seal_header = arc_set_headers[-1]

    hashed_headers = headers_hash(signed_headers, canonicalization, seal_header)
    verifier = typing.cast(_SigVerifier, sig)
    return await _verify_sig(algorithm, verifier, hashed_headers)
async def check_arc(
    message: bytes, body_and_headers: emailsec.utils.BodyAndHeaders | None = None
) -> ARCCheck:
    """Validate the ARC chain of *message*.

    ARC sets are walked from the highest instance down to 1. Each set must
    have a matching AMS, AS and AAR; the seal's ``cv`` tag must be ``none``
    for instance 1 and ``pass`` otherwise; every seal must verify. Only the
    most recent ARC-Message-Signature has to verify.

    Args:
        message: raw message; only parsed when *body_and_headers* is falsy.
        body_and_headers: optional pre-parsed ``(body, headers)`` pair.

    Returns:
        An :class:`ARCCheck` with status NONE when there is no
        ARC-Message-Signature header, FAIL with an explanation in ``exp``
        when the chain is malformed or a signature does not verify, and PASS
        (with ``signer`` and ``aar_header`` taken from the highest instance)
        otherwise.

    Raises:
        ValueError: propagated from the header parsers on malformed ARC
            headers. Other parser errors may propagate too.
    """
    if body_and_headers:
        body, headers = body_and_headers
    else:
        body, headers = body_and_headers_for_canonicalization(message)

    arc_message_signatures = headers.get("arc-message-signature")
    if not arc_message_signatures:
        return ARCCheck(ARCChainStatus.NONE, "No ARC Sets")
    arc_authentication_results = headers.get("arc-authentication-results", [])
    arc_seals = headers.get("arc-seal", [])

    if not (
        len(arc_message_signatures) == len(arc_authentication_results) == len(arc_seals)
    ):
        return ARCCheck(ARCChainStatus.FAIL, "Uneven ARC Sets")

    if len(arc_authentication_results) > 50:
        return ARCCheck(ARCChainStatus.FAIL, "Too many ARC Sets")

    # Sort every header kind by instance so that pop() yields the highest one.
    parsed_ams = sorted(
        (
            (
                parse_arc_message_signature(value.decode()),
                (header_name, value),
            )
            for header_name, value in headers["arc-message-signature"]
        ),
        key=lambda x: x[0]["i"],
    )
    parsed_as = sorted(
        (
            (
                parse_arc_seal(value.decode()),
                (header_name, value),
            )
            for header_name, value in headers["arc-seal"]
        ),
        key=lambda x: x[0]["i"],
    )
    aars = sorted(
        (
            (
                _aar_instance(value),
                (header_name, value),
            )
            for header_name, value in headers["arc-authentication-results"]
        ),
        key=lambda x: x[0],
    )

    highest_validated_aar = None
    highest_validated_signer = None
    chain_length = len(arc_message_signatures)

    for instance in range(chain_length, 0, -1):
        # Popping from the sorted lists must yield exactly this instance;
        # gaps or duplicates in the numbering therefore fail the chain.
        ams, ams_header = parsed_ams.pop()
        if ams["i"] != instance:
            return ARCCheck(ARCChainStatus.FAIL, f"Cannot find AMS for {instance=}")

        seal, seal_header = parsed_as.pop()
        if seal["i"] != instance:
            return ARCCheck(ARCChainStatus.FAIL, f"Cannot find AS for {instance=}")

        aar_instance, aar_header = aars.pop()
        if aar_instance != instance:
            return ARCCheck(ARCChainStatus.FAIL, f"Cannot find AAR for {instance=}")

        if instance == 1 and seal["cv"] != "none":
            return ARCCheck(ARCChainStatus.FAIL, f"AMS cv must be none for {instance=}")
        elif instance > 1 and seal["cv"] != "pass":
            return ARCCheck(ARCChainStatus.FAIL, f"AMS cv fail for {instance=}")

        # RFC 8617 section 5.2: only the most recent AMS has to validate.
        # Older ones are expected to break once an intermediary modifies the
        # message, so requiring them would fail legitimate chains.
        if instance == chain_length:
            is_ams_valid = await _verify_dkim_signature(
                body, headers, ams_header, typing.cast(_DKIMStyleSig, ams)
            )
            if not is_ams_valid:
                return ARCCheck(
                    ARCChainStatus.FAIL, f"Cannot verify AMS for {instance=}"
                )

        arc_set_headers = (aar_header, ams_header, seal_header)

        is_seal_valid = await arc_seal_verify(arc_set_headers, seal)
        if not is_seal_valid:
            return ARCCheck(ARCChainStatus.FAIL, f"Cannot verify AS for {instance=}")

        # Only set on the first (highest-instance) iteration.
        if highest_validated_aar is None:
            highest_validated_aar = aar_header[1]
            highest_validated_signer = seal["d"]

    return ARCCheck(
        ARCChainStatus.PASS,
        "",
        signer=highest_validated_signer,
        aar_header=highest_validated_aar,
    )