emailsec.dkim

 1from .checker import check_dkim, DKIMCheck, DKIMResult
 2from .parser import DKIMSignature, parse_dkim_header_field
 3
 4__all__ = [
 5    "check_dkim",
 6    "DKIMCheck",
 7    "DKIMResult",
 8    "DKIMSignature",
 9    "parse_dkim_header_field",
10]
async def check_dkim( message: bytes, body_and_headers: BodyAndHeaders | None = None) -> DKIMCheck:
40async def check_dkim(
41    message: bytes, body_and_headers: emailsec.utils.BodyAndHeaders | None = None
42) -> DKIMCheck:
43    if body_and_headers:
44        body, headers = body_and_headers
45    else:
46        body, headers = body_and_headers_for_canonicalization(message)
47
48    signatures = []
49    for header_name, raw_signature in headers.get("dkim-signature", []):
50        try:
51            sig = parse_dkim_header_field(raw_signature.decode())
52        except ValueError:
53            continue
54
55        signatures.append(((header_name, raw_signature), sig))
56
57    if not signatures:
58        return DKIMCheck(result=DKIMResult.PERMFAIL)
59
60    # Try to pick an aligned signature if multiple signatures are present
61    def _sort_sig(item: tuple[emailsec.utils.Header, DKIMSignature]) -> bool:
62        _, s = item
63        _, from_addr = email.utils.parseaddr(headers["from"][0][1].decode().strip())
64        rfc5322_from = from_addr.partition("@")[-1]
65        return is_dkim_aligned(s["d"], rfc5322_from)
66
67    # Verify the top 5 signatures and stop once one verifies successfully
68    for sig_header, parsed_sig in sorted(signatures, key=_sort_sig, reverse=True)[:5]:
69        try:
70            if await _verify_dkim_signature(
71                body, headers, sig_header, typing.cast(_DKIMStyleSig, parsed_sig)
72            ):
73                return DKIMCheck(
74                    result=DKIMResult.SUCCESS,
75                    domain=parsed_sig["d"],
76                    selector=parsed_sig["s"],
77                    signature=parsed_sig,
78                )
79        except errors.Temperror:
80            return DKIMCheck(
81                result=DKIMResult.TEMPFAIL,
82                domain=parsed_sig["d"],
83                selector=parsed_sig["s"],
84                signature=parsed_sig,
85            )
86        except errors.Permerror:
87            continue
88
89    return DKIMCheck(result=DKIMResult.PERMFAIL)
@dataclass
class DKIMCheck:
32@dataclass
33class DKIMCheck:
34    result: DKIMResult
35    domain: str | None = None
36    selector: str | None = None
37    signature: DKIMSignature | None = None
DKIMCheck( result: DKIMResult, domain: str | None = None, selector: str | None = None, signature: DKIMSignature | None = None)
result: DKIMResult
domain: str | None = None
selector: str | None = None
signature: DKIMSignature | None = None
class DKIMResult(enum.StrEnum):
26class DKIMResult(StrEnum):
27    SUCCESS = "SUCCESS"
28    PERMFAIL = "PERMFAIL"
29    TEMPFAIL = "TEMPFAIL"
SUCCESS = <DKIMResult.SUCCESS: 'SUCCESS'>
PERMFAIL = <DKIMResult.PERMFAIL: 'PERMFAIL'>
TEMPFAIL = <DKIMResult.TEMPFAIL: 'TEMPFAIL'>
class DKIMSignature(typing.TypedDict):
65class DKIMSignature(typing.TypedDict):
66    v: str
67    a: str
68    b: str
69    bh: str
70    c: typing.NotRequired[str]
71    d: str
72    h: str
73    i: typing.NotRequired[str]
74    l: typing.NotRequired[int]  # noqa: E741
75    q: typing.NotRequired[str]
76    s: str
77    t: typing.NotRequired[int]
78    x: typing.NotRequired[int]
79    z: typing.NotRequired[str]
v: str
a: str
b: str
bh: str
c: NotRequired[str]
d: str
h: str
i: NotRequired[str]
l: NotRequired[int]
q: NotRequired[str]
s: str
t: NotRequired[int]
x: NotRequired[int]
z: NotRequired[str]
def parse_dkim_header_field(data: str) -> DKIMSignature:
 95def parse_dkim_header_field(data: str) -> DKIMSignature:
 96    sig: DKIMSignature = {}  # type: ignore
 97    for result in dkim_header_field.parse_string(data, parse_all=True).as_list():
 98        field = result[0]
 99        match field:
100            case "v" | "a" | "b" | "bh" | "c" | "d" | "h" | "i" | "q" | "s" | "z":
101                sig[field] = "".join(re.split(r"\s+", result[1]))
102            case "l" | "t" | "x":
103                try:
104                    sig[field] = int(result[1])
105                except ValueError as ve:
106                    raise ValueError(f"Invalid field value {result=}") from ve
107            case _:
108                continue
109
110    if (
111        missing_fields := set(sig.keys()) & _SIG_REQUIRED_FIELDS
112    ) != _SIG_REQUIRED_FIELDS:
113        raise ValueError(f"Missing required fields {missing_fields=}")
114
115    if (v := sig["v"]) != "1":
116        raise ValueError(f"Invalid version tag {v=}")
117
118    return sig