emailsec.dkim
async def check_dkim(
    message: bytes, body_and_headers: emailsec.utils.BodyAndHeaders | None = None
) -> DKIMCheck:
    """Verify the DKIM signatures of a message and report the outcome.

    Every ``DKIM-Signature`` header that parses is collected; signatures whose
    ``d`` domain is aligned with the domain of the first ``From`` header are
    tried first. At most five signatures are verified, stopping at the first
    one that verifies.

    Args:
        message: Raw message bytes. Only used to extract body and headers when
            ``body_and_headers`` is not supplied.
        body_and_headers: Optional pre-computed ``(body, headers)`` pair.

    Returns:
        A ``DKIMCheck`` whose result is:

        * ``SUCCESS`` with the domain, selector and signature of the first
          signature that verifies;
        * ``TEMPFAIL`` with the details of the signature being verified when
          ``errors.Temperror`` is raised;
        * ``PERMFAIL`` when no signature header parses, or none of the
          signatures tried verifies.
    """
    if body_and_headers:
        body, headers = body_and_headers
    else:
        body, headers = body_and_headers_for_canonicalization(message)

    signatures = []
    for header_name, raw_signature in headers.get("dkim-signature", []):
        try:
            sig = parse_dkim_header_field(raw_signature.decode())
        except ValueError:
            # Covers both a header the parser rejects and undecodable bytes
            # (UnicodeDecodeError is a ValueError subclass): skip that header.
            continue

        signatures.append(((header_name, raw_signature), sig))

    if not signatures:
        return DKIMCheck(result=DKIMResult.PERMFAIL)

    # Resolve the RFC5322.From domain once, up front. A missing, empty or
    # undecodable From header must not crash the check: in that case no
    # signature is treated as aligned and the original header order is kept.
    rfc5322_from: str | None = None
    from_headers = headers.get("from", [])
    if from_headers:
        try:
            from_value = from_headers[0][1].decode().strip()
        except UnicodeDecodeError:
            from_value = None
        if from_value is not None:
            _, from_addr = email.utils.parseaddr(from_value)
            rfc5322_from = from_addr.partition("@")[-1]

    # Try to pick an aligned signature if multiple signatures are present
    def _sort_sig(item: tuple[emailsec.utils.Header, DKIMSignature]) -> bool:
        if rfc5322_from is None:
            return False
        _, s = item
        return is_dkim_aligned(s["d"], rfc5322_from)

    # Verify the top 5 signatures and stop once one verifies successfully.
    # The sort is stable, so signatures with the same alignment keep the order
    # in which their headers were listed.
    for sig_header, parsed_sig in sorted(signatures, key=_sort_sig, reverse=True)[:5]:
        try:
            if await _verify_dkim_signature(
                body, headers, sig_header, typing.cast(_DKIMStyleSig, parsed_sig)
            ):
                return DKIMCheck(
                    result=DKIMResult.SUCCESS,
                    domain=parsed_sig["d"],
                    selector=parsed_sig["s"],
                    signature=parsed_sig,
                )
        except errors.Temperror:
            # NOTE(review): a temporary error ends the whole check; remaining
            # signatures are not tried. Confirm this is the intended policy.
            return DKIMCheck(
                result=DKIMResult.TEMPFAIL,
                domain=parsed_sig["d"],
                selector=parsed_sig["s"],
                signature=parsed_sig,
            )
        except errors.Permerror:
            # This signature is definitively bad; move on to the next one.
            continue

    return DKIMCheck(result=DKIMResult.PERMFAIL)
@dataclass
class
DKIMCheck:
@dataclass
class DKIMCheck:
    """Outcome of a DKIM check.

    Only ``result`` is mandatory; the remaining fields default to ``None`` and
    are filled in when the outcome is tied to one specific signature.
    """

    # Overall verdict of the check.
    result: DKIMResult
    # Value of the "d" tag of the signature the verdict refers to, if any.
    domain: str | None = None
    # Value of the "s" tag of the signature the verdict refers to, if any.
    selector: str | None = None
    # The parsed signature the verdict refers to, if any.
    signature: DKIMSignature | None = None
DKIMCheck( result: DKIMResult, domain: str | None = None, selector: str | None = None, signature: DKIMSignature | None = None)
result: DKIMResult
class
DKIMResult(enum.StrEnum):
26class DKIMResult(StrEnum): 27 SUCCESS = "SUCCESS" 28 PERMFAIL = "PERMFAIL" 29 TEMPFAIL = "TEMPFAIL"
SUCCESS =
<DKIMResult.SUCCESS: 'SUCCESS'>
PERMFAIL =
<DKIMResult.PERMFAIL: 'PERMFAIL'>
TEMPFAIL =
<DKIMResult.TEMPFAIL: 'TEMPFAIL'>
class
DKIMSignature(typing.TypedDict):
65class DKIMSignature(typing.TypedDict): 66 v: str 67 a: str 68 b: str 69 bh: str 70 c: typing.NotRequired[str] 71 d: str 72 h: str 73 i: typing.NotRequired[str] 74 l: typing.NotRequired[int] # noqa: E741 75 q: typing.NotRequired[str] 76 s: str 77 t: typing.NotRequired[int] 78 x: typing.NotRequired[int] 79 z: typing.NotRequired[str]
95def parse_dkim_header_field(data: str) -> DKIMSignature: 96 sig: DKIMSignature = {} # type: ignore 97 for result in dkim_header_field.parse_string(data, parse_all=True).as_list(): 98 field = result[0] 99 match field: 100 case "v" | "a" | "b" | "bh" | "c" | "d" | "h" | "i" | "q" | "s" | "z": 101 sig[field] = "".join(re.split(r"\s+", result[1])) 102 case "l" | "t" | "x": 103 try: 104 sig[field] = int(result[1]) 105 except ValueError as ve: 106 raise ValueError(f"Invalid field value {result=}") from ve 107 case _: 108 continue 109 110 if ( 111 missing_fields := set(sig.keys()) & _SIG_REQUIRED_FIELDS 112 ) != _SIG_REQUIRED_FIELDS: 113 raise ValueError(f"Missing required fields {missing_fields=}") 114 115 if (v := sig["v"]) != "1": 116 raise ValueError(f"Invalid version tag {v=}") 117 118 return sig