emailsec.arc

  1from dataclasses import dataclass
  2import typing
  3import enum
  4import re
  5
  6import emailsec.utils
  7from emailsec.dkim.checker import (
  8    _verify_sig,
  9    _verify_dkim_signature,
 10)
 11from emailsec.dkim.parser import (
 12    _algorithm,
 13    headers_hash,
 14    tag_lists,
 15    _DKIMStyleSig,
 16    _SigVerifier,
 17    _CanonicalizationAlg,
 18)
 19from emailsec.utils import body_and_headers_for_canonicalization
 20
 21arc_message_signature = tag_lists
 22arc_seal = tag_lists
 23
 24
 25class ARCMessageSignature(typing.TypedDict):
 26    i: int
 27    a: str
 28    b: str
 29    bh: str
 30    c: typing.NotRequired[str]
 31    d: str
 32    h: str
 33    l: typing.NotRequired[int]  # noqa: E741
 34    q: typing.NotRequired[str]
 35    s: str
 36    t: typing.NotRequired[int]
 37    x: typing.NotRequired[int]
 38    z: typing.NotRequired[str]
 39
 40
 41class ARCSeal(typing.TypedDict):
 42    i: int
 43    a: str
 44    b: str
 45    d: str
 46    s: str
 47    cv: str
 48    t: typing.NotRequired[int]
 49
 50
 51_ARC_SEAL_REQUIRED_FIELDS = {"i", "a", "b", "d", "s", "cv"}
 52_ARC_MSG_SIG_REQUIRED_FIELDS = {"i", "a", "b", "bh", "d", "h", "s"}
 53
 54
 55class ARCChainStatus(enum.StrEnum):
 56    NONE = "none"
 57    FAIL = "fail"
 58    PASS = "pass"
 59
 60
 61@dataclass
 62class ARCCheck:
 63    result: ARCChainStatus
 64    exp: str
 65    signer: str | None = None
 66    aar_header: bytes | None = None
 67
 68
 69def parse_arc_seal(data: str) -> ARCSeal:
 70    sig: ARCSeal = {}  # type: ignore
 71    for result in arc_seal.parse_string(data, parse_all=True).as_list():
 72        field = result[0]
 73        match field:
 74            case "a" | "b" | "d" | "s" | "cv":
 75                sig[field] = "".join(re.split(r"\s+", result[1]))
 76            case "t" | "i":
 77                try:
 78                    sig[field] = int(result[1])
 79                except ValueError as ve:
 80                    raise ValueError(f"Invalid field value {result=}") from ve
 81            case "h":
 82                # https://datatracker.ietf.org/doc/html/rfc8617#section-4.1.3
 83                # must fail if h tag is found in seal
 84                raise ValueError("h tag not allowed")
 85            case _:
 86                continue
 87    if (
 88        missing_fields := set(sig.keys()) & _ARC_SEAL_REQUIRED_FIELDS
 89    ) != _ARC_SEAL_REQUIRED_FIELDS:
 90        raise ValueError(f"Missing required fields {missing_fields=}")
 91
 92    return sig
 93
 94
 95def parse_arc_message_signature(data: str) -> ARCMessageSignature:
 96    sig: ARCMessageSignature = {}  # type: ignore
 97    for result in arc_message_signature.parse_string(data, parse_all=True).as_list():
 98        field = result[0]
 99        match field:
100            case "a" | "b" | "bh" | "c" | "d" | "h" | "q" | "s" | "z":
101                sig[field] = "".join(re.split(r"\s+", result[1]))
102            case "l" | "t" | "x" | "i":
103                try:
104                    sig[field] = int(result[1])
105                except ValueError as ve:
106                    raise ValueError(f"Invalid field value {result=}") from ve
107            case _:
108                continue
109
110    if missing_fields := _ARC_MSG_SIG_REQUIRED_FIELDS - set(sig.keys()):
111        raise ValueError(f"Missing required fields {missing_fields=}")
112
113    return sig
114
115
async def arc_seal_verify(
    arc_set_headers: typing.Sequence[emailsec.utils.Header],
    sig: ARCSeal,
) -> bool:
    """Verify an ARC-Seal signature over the header fields it covers.

    Args:
        arc_set_headers: The covered header fields in signing order, with the
            ARC-Seal being verified as the LAST element. The three-element
            form ``(aar_header, ams_header, seal_header)`` is still accepted
            and behaves as before; a longer sequence lets the caller put the
            header fields of earlier ARC Sets in front.
        sig: Parsed tags of the ARC-Seal being verified.

    Returns:
        Whatever ``_verify_sig`` reports for the canonicalized header data.
    """
    header_canonicalization: _CanonicalizationAlg = "relaxed"
    dkim_alg = _algorithm(sig["a"])

    # The seal itself is passed separately to ``headers_hash``; everything
    # before it is signed as-is.
    *headers_to_sign, sig_header = arc_set_headers
    canonicalized_message = headers_hash(
        headers_to_sign,
        header_canonicalization,
        sig_header,
    )
    return await _verify_sig(
        dkim_alg, typing.cast(_SigVerifier, sig), canonicalized_message
    )


# First ``i=<digits>`` occurrence in a header value.
_ARC_INSTANCE = re.compile(rb"\s?i\s*=\s*(\d+)", re.MULTILINE | re.IGNORECASE)


def _aar_instance(header_value: bytes) -> int:
    """Return the instance number found in an ARC-Authentication-Results value.

    Raises:
        ValueError: If no ``i=<digits>`` tag is found.
    """
    if (match := re.search(_ARC_INSTANCE, header_value)) is not None:
        return int(match.group(1))

    raise ValueError(f"Instance not found in {header_value=}")


async def check_arc(
    message: bytes, body_and_headers: emailsec.utils.BodyAndHeaders | None = None
) -> ARCCheck:
    """Validate the ARC chain of a message.

    The steps follow the validator algorithm of RFC 8617 section 5.2:

    1. Collect the ARC Sets (``NONE`` when there are none, ``FAIL`` when the
       three header kinds are uneven or there are more than 50 sets).
    2. Check the chain structure: instances form 1..N and the seal ``cv``
       tag is ``none`` for instance 1 and ``pass`` for every later one.
    3. Verify the newest ARC-Message-Signature only; older ones may no
       longer verify after intermediaries changed the message.
    4. Verify every ARC-Seal from N down to 1, each over the header fields of
       ARC Sets 1 up to its own instance.

    Args:
        message: The raw message; used only when ``body_and_headers`` is not
            supplied.
        body_and_headers: Optional pre-split body and headers.

    Returns:
        An ``ARCCheck``. On ``PASS`` it carries the ``d`` tag of the newest
        seal and the raw newest ARC-Authentication-Results value.
    """
    if body_and_headers:
        body, headers = body_and_headers
    else:
        body, headers = body_and_headers_for_canonicalization(message)

    arc_message_signatures = headers.get("arc-message-signature")
    if not arc_message_signatures:
        return ARCCheck(ARCChainStatus.NONE, "No ARC Sets")
    arc_authentication_results = headers.get("arc-authentication-results", [])
    arc_seals = headers.get("arc-seal", [])

    if not (
        len(arc_message_signatures) == len(arc_authentication_results) == len(arc_seals)
    ):
        return ARCCheck(ARCChainStatus.FAIL, "Uneven ARC Sets")

    if len(arc_authentication_results) > 50:
        return ARCCheck(ARCChainStatus.FAIL, "Too many ARC Sets")

    # A malformed ARC header makes the chain invalid; it must not crash the
    # caller. ValueError covers bad tag values, missing tags, a missing AAR
    # instance and undecodable bytes (UnicodeDecodeError is a ValueError).
    # NOTE(review): errors raised by the tag-list grammar itself are probably
    # not ValueError subclasses and would still propagate; confirm.
    try:
        parsed_ams = sorted(
            (
                (parse_arc_message_signature(value.decode()), (header_name, value))
                for header_name, value in arc_message_signatures
            ),
            key=lambda x: x[0]["i"],
        )
        parsed_as = sorted(
            (
                (parse_arc_seal(value.decode()), (header_name, value))
                for header_name, value in arc_seals
            ),
            key=lambda x: x[0]["i"],
        )
        aars = sorted(
            (
                (_aar_instance(value), (header_name, value))
                for header_name, value in arc_authentication_results
            ),
            key=lambda x: x[0],
        )
    except ValueError as err:
        return ARCCheck(ARCChainStatus.FAIL, f"Malformed ARC header: {err}")

    set_count = len(parsed_ams)

    # Structure: after sorting, position k-1 must hold instance k for all
    # three header kinds, and the seal cv tags must be consistent.
    for instance in range(set_count, 0, -1):
        if parsed_ams[instance - 1][0]["i"] != instance:
            return ARCCheck(ARCChainStatus.FAIL, f"Cannot find AMS for {instance=}")

        seal = parsed_as[instance - 1][0]
        if seal["i"] != instance:
            return ARCCheck(ARCChainStatus.FAIL, f"Cannot find AS for {instance=}")

        if aars[instance - 1][0] != instance:
            return ARCCheck(ARCChainStatus.FAIL, f"Cannot find AAR for {instance=}")

        # The cv tag lives on the ARC-Seal, so the messages name the AS.
        if instance == 1 and seal["cv"] != "none":
            return ARCCheck(ARCChainStatus.FAIL, f"AS cv must be none for {instance=}")
        elif instance > 1 and seal["cv"] != "pass":
            return ARCCheck(ARCChainStatus.FAIL, f"AS cv fail for {instance=}")

    # Only the most recent AMS has to verify.
    instance = set_count
    newest_ams, newest_ams_header = parsed_ams[-1]
    is_ams_valid = await _verify_dkim_signature(
        body, headers, newest_ams_header, typing.cast(_DKIMStyleSig, newest_ams)
    )
    if not is_ams_valid:
        return ARCCheck(ARCChainStatus.FAIL, f"Cannot verify AMS for {instance=}")

    # Header fields of every ARC Set in increasing instance order, each set
    # as AAR, AMS, AS. The seal of instance k covers the first 3*k entries
    # and is itself the last of them.
    chain_headers: list[emailsec.utils.Header] = []
    for (_, aar_header), (_, ams_header), (_, seal_header) in zip(
        aars, parsed_ams, parsed_as
    ):
        chain_headers.extend((aar_header, ams_header, seal_header))

    for instance in range(set_count, 0, -1):
        seal = parsed_as[instance - 1][0]
        is_seal_valid = await arc_seal_verify(chain_headers[: 3 * instance], seal)
        if not is_seal_valid:
            return ARCCheck(ARCChainStatus.FAIL, f"Cannot verify AS for {instance=}")

    return ARCCheck(
        ARCChainStatus.PASS,
        "",
        signer=parsed_as[-1][0]["d"],
        aar_header=aars[-1][1][1],
    )
arc_message_signature = {{Group:({{{{{{Suppress:([[[<SP><TAB>]... {<CR> <LF>}] {<SP><TAB>}...]) W:(A-Za-z, 0-9A-Z_a-z)} Suppress:([[[<SP><TAB>]... {<CR> <LF>}] {<SP><TAB>}...])} Suppress:('=')} Suppress:([[[<SP><TAB>]... {<CR> <LF>}] {<SP><TAB>}...])} [Combine:({{W:( -:<-~) | {[[<SP><TAB>]... {<CR> <LF>}] {<SP><TAB>}...}} | <SP><TAB>}) | {W:(+/-9A-Za-z) [{[[[<SP><TAB>]... {<CR> <LF>}] {<SP><TAB>}...] W:(+/-9A-Za-z)}]...}]} Suppress:([[[<SP><TAB>]... {<CR> <LF>}] {<SP><TAB>}...])}) [{{Suppress:(';') Group:({{{{{{Suppress:([[[<SP><TAB>]... {<CR> <LF>}] {<SP><TAB>}...]) W:(A-Za-z, 0-9A-Z_a-z)} Suppress:([[[<SP><TAB>]... {<CR> <LF>}] {<SP><TAB>}...])} Suppress:('=')} Suppress:([[[<SP><TAB>]... {<CR> <LF>}] {<SP><TAB>}...])} [Combine:({{W:( -:<-~) | {[[<SP><TAB>]... {<CR> <LF>}] {<SP><TAB>}...}} | <SP><TAB>}) | {W:(+/-9A-Za-z) [{[[[<SP><TAB>]... {<CR> <LF>}] {<SP><TAB>}...] W:(+/-9A-Za-z)}]...}]} Suppress:([[[<SP><TAB>]... {<CR> <LF>}] {<SP><TAB>}...])})}}...]} Suppress:([';'])}
arc_seal = {{Group:({{{{{{Suppress:([[[<SP><TAB>]... {<CR> <LF>}] {<SP><TAB>}...]) W:(A-Za-z, 0-9A-Z_a-z)} Suppress:([[[<SP><TAB>]... {<CR> <LF>}] {<SP><TAB>}...])} Suppress:('=')} Suppress:([[[<SP><TAB>]... {<CR> <LF>}] {<SP><TAB>}...])} [Combine:({{W:( -:<-~) | {[[<SP><TAB>]... {<CR> <LF>}] {<SP><TAB>}...}} | <SP><TAB>}) | {W:(+/-9A-Za-z) [{[[[<SP><TAB>]... {<CR> <LF>}] {<SP><TAB>}...] W:(+/-9A-Za-z)}]...}]} Suppress:([[[<SP><TAB>]... {<CR> <LF>}] {<SP><TAB>}...])}) [{{Suppress:(';') Group:({{{{{{Suppress:([[[<SP><TAB>]... {<CR> <LF>}] {<SP><TAB>}...]) W:(A-Za-z, 0-9A-Z_a-z)} Suppress:([[[<SP><TAB>]... {<CR> <LF>}] {<SP><TAB>}...])} Suppress:('=')} Suppress:([[[<SP><TAB>]... {<CR> <LF>}] {<SP><TAB>}...])} [Combine:({{W:( -:<-~) | {[[<SP><TAB>]... {<CR> <LF>}] {<SP><TAB>}...}} | <SP><TAB>}) | {W:(+/-9A-Za-z) [{[[[<SP><TAB>]... {<CR> <LF>}] {<SP><TAB>}...] W:(+/-9A-Za-z)}]...}]} Suppress:([[[<SP><TAB>]... {<CR> <LF>}] {<SP><TAB>}...])})}}...]} Suppress:([';'])}
class ARCMessageSignature(typing.TypedDict):
26class ARCMessageSignature(typing.TypedDict):
27    i: int
28    a: str
29    b: str
30    bh: str
31    c: typing.NotRequired[str]
32    d: str
33    h: str
34    l: typing.NotRequired[int]  # noqa: E741
35    q: typing.NotRequired[str]
36    s: str
37    t: typing.NotRequired[int]
38    x: typing.NotRequired[int]
39    z: typing.NotRequired[str]
i: int
a: str
b: str
bh: str
c: NotRequired[str]
d: str
h: str
l: NotRequired[int]
q: NotRequired[str]
s: str
t: NotRequired[int]
x: NotRequired[int]
z: NotRequired[str]
class ARCSeal(typing.TypedDict):
42class ARCSeal(typing.TypedDict):
43    i: int
44    a: str
45    b: str
46    d: str
47    s: str
48    cv: str
49    t: typing.NotRequired[int]
i: int
a: str
b: str
d: str
s: str
cv: str
t: NotRequired[int]
class ARCChainStatus(enum.StrEnum):
56class ARCChainStatus(enum.StrEnum):
57    NONE = "none"
58    FAIL = "fail"
59    PASS = "pass"
NONE = <ARCChainStatus.NONE: 'none'>
FAIL = <ARCChainStatus.FAIL: 'fail'>
PASS = <ARCChainStatus.PASS: 'pass'>
@dataclass
class ARCCheck:
    """Result of an ARC chain check."""

    # Overall chain status.
    result: ARCChainStatus
    # Human-readable explanation; empty when the chain passes.
    exp: str
    # ``d`` tag of the highest-instance ARC-Seal; set only on a passing chain.
    signer: str | None = None
    # Raw value of the highest-instance ARC-Authentication-Results header;
    # set only on a passing chain.
    aar_header: bytes | None = None
def parse_arc_seal(data: str) -> ARCSeal:
70def parse_arc_seal(data: str) -> ARCSeal:
71    sig: ARCSeal = {}  # type: ignore
72    for result in arc_seal.parse_string(data, parse_all=True).as_list():
73        field = result[0]
74        match field:
75            case "a" | "b" | "d" | "s" | "cv":
76                sig[field] = "".join(re.split(r"\s+", result[1]))
77            case "t" | "i":
78                try:
79                    sig[field] = int(result[1])
80                except ValueError as ve:
81                    raise ValueError(f"Invalid field value {result=}") from ve
82            case "h":
83                # https://datatracker.ietf.org/doc/html/rfc8617#section-4.1.3
84                # must fail if h tag is found in seal
85                raise ValueError("h tag not allowed")
86            case _:
87                continue
88    if (
89        missing_fields := set(sig.keys()) & _ARC_SEAL_REQUIRED_FIELDS
90    ) != _ARC_SEAL_REQUIRED_FIELDS:
91        raise ValueError(f"Missing required fields {missing_fields=}")
92
93    return sig
def parse_arc_message_signature(data: str) -> ARCMessageSignature:
 96def parse_arc_message_signature(data: str) -> ARCMessageSignature:
 97    sig: ARCMessageSignature = {}  # type: ignore
 98    for result in arc_message_signature.parse_string(data, parse_all=True).as_list():
 99        field = result[0]
100        match field:
101            case "a" | "b" | "bh" | "c" | "d" | "h" | "q" | "s" | "z":
102                sig[field] = "".join(re.split(r"\s+", result[1]))
103            case "l" | "t" | "x" | "i":
104                try:
105                    sig[field] = int(result[1])
106                except ValueError as ve:
107                    raise ValueError(f"Invalid field value {result=}") from ve
108            case _:
109                continue
110
111    if missing_fields := _ARC_MSG_SIG_REQUIRED_FIELDS - set(sig.keys()):
112        raise ValueError(f"Missing required fields {missing_fields=}")
113
114    return sig
async def arc_seal_verify(
    arc_set_headers: typing.Sequence[emailsec.utils.Header],
    sig: ARCSeal,
) -> bool:
    """Verify an ARC-Seal signature over the header fields it covers.

    Args:
        arc_set_headers: The covered header fields in signing order, with the
            ARC-Seal being verified as the LAST element. The three-element
            form ``(aar_header, ams_header, seal_header)`` is still accepted
            and behaves as before; a longer sequence lets the caller put the
            header fields of earlier ARC Sets in front.
        sig: Parsed tags of the ARC-Seal being verified.

    Returns:
        Whatever ``_verify_sig`` reports for the canonicalized header data.
    """
    header_canonicalization: _CanonicalizationAlg = "relaxed"
    dkim_alg = _algorithm(sig["a"])

    # The seal itself is passed separately to ``headers_hash``; everything
    # before it is signed as-is.
    *headers_to_sign, sig_header = arc_set_headers
    canonicalized_message = headers_hash(
        headers_to_sign,
        header_canonicalization,
        sig_header,
    )
    return await _verify_sig(
        dkim_alg, typing.cast(_SigVerifier, sig), canonicalized_message
    )


async def check_arc(
    message: bytes, body_and_headers: emailsec.utils.BodyAndHeaders | None = None
) -> ARCCheck:
    """Validate the ARC chain of a message.

    The steps follow the validator algorithm of RFC 8617 section 5.2:

    1. Collect the ARC Sets (``NONE`` when there are none, ``FAIL`` when the
       three header kinds are uneven or there are more than 50 sets).
    2. Check the chain structure: instances form 1..N and the seal ``cv``
       tag is ``none`` for instance 1 and ``pass`` for every later one.
    3. Verify the newest ARC-Message-Signature only; older ones may no
       longer verify after intermediaries changed the message.
    4. Verify every ARC-Seal from N down to 1, each over the header fields of
       ARC Sets 1 up to its own instance.

    Args:
        message: The raw message; used only when ``body_and_headers`` is not
            supplied.
        body_and_headers: Optional pre-split body and headers.

    Returns:
        An ``ARCCheck``. On ``PASS`` it carries the ``d`` tag of the newest
        seal and the raw newest ARC-Authentication-Results value.
    """
    if body_and_headers:
        body, headers = body_and_headers
    else:
        body, headers = body_and_headers_for_canonicalization(message)

    arc_message_signatures = headers.get("arc-message-signature")
    if not arc_message_signatures:
        return ARCCheck(ARCChainStatus.NONE, "No ARC Sets")
    arc_authentication_results = headers.get("arc-authentication-results", [])
    arc_seals = headers.get("arc-seal", [])

    if not (
        len(arc_message_signatures) == len(arc_authentication_results) == len(arc_seals)
    ):
        return ARCCheck(ARCChainStatus.FAIL, "Uneven ARC Sets")

    if len(arc_authentication_results) > 50:
        return ARCCheck(ARCChainStatus.FAIL, "Too many ARC Sets")

    # A malformed ARC header makes the chain invalid; it must not crash the
    # caller. ValueError covers bad tag values, missing tags, a missing AAR
    # instance and undecodable bytes (UnicodeDecodeError is a ValueError).
    # NOTE(review): errors raised by the tag-list grammar itself are probably
    # not ValueError subclasses and would still propagate; confirm.
    try:
        parsed_ams = sorted(
            (
                (parse_arc_message_signature(value.decode()), (header_name, value))
                for header_name, value in arc_message_signatures
            ),
            key=lambda x: x[0]["i"],
        )
        parsed_as = sorted(
            (
                (parse_arc_seal(value.decode()), (header_name, value))
                for header_name, value in arc_seals
            ),
            key=lambda x: x[0]["i"],
        )
        aars = sorted(
            (
                (_aar_instance(value), (header_name, value))
                for header_name, value in arc_authentication_results
            ),
            key=lambda x: x[0],
        )
    except ValueError as err:
        return ARCCheck(ARCChainStatus.FAIL, f"Malformed ARC header: {err}")

    set_count = len(parsed_ams)

    # Structure: after sorting, position k-1 must hold instance k for all
    # three header kinds, and the seal cv tags must be consistent.
    for instance in range(set_count, 0, -1):
        if parsed_ams[instance - 1][0]["i"] != instance:
            return ARCCheck(ARCChainStatus.FAIL, f"Cannot find AMS for {instance=}")

        seal = parsed_as[instance - 1][0]
        if seal["i"] != instance:
            return ARCCheck(ARCChainStatus.FAIL, f"Cannot find AS for {instance=}")

        if aars[instance - 1][0] != instance:
            return ARCCheck(ARCChainStatus.FAIL, f"Cannot find AAR for {instance=}")

        # The cv tag lives on the ARC-Seal, so the messages name the AS.
        if instance == 1 and seal["cv"] != "none":
            return ARCCheck(ARCChainStatus.FAIL, f"AS cv must be none for {instance=}")
        elif instance > 1 and seal["cv"] != "pass":
            return ARCCheck(ARCChainStatus.FAIL, f"AS cv fail for {instance=}")

    # Only the most recent AMS has to verify.
    instance = set_count
    newest_ams, newest_ams_header = parsed_ams[-1]
    is_ams_valid = await _verify_dkim_signature(
        body, headers, newest_ams_header, typing.cast(_DKIMStyleSig, newest_ams)
    )
    if not is_ams_valid:
        return ARCCheck(ARCChainStatus.FAIL, f"Cannot verify AMS for {instance=}")

    # Header fields of every ARC Set in increasing instance order, each set
    # as AAR, AMS, AS. The seal of instance k covers the first 3*k entries
    # and is itself the last of them.
    chain_headers: list[emailsec.utils.Header] = []
    for (_, aar_header), (_, ams_header), (_, seal_header) in zip(
        aars, parsed_ams, parsed_as
    ):
        chain_headers.extend((aar_header, ams_header, seal_header))

    for instance in range(set_count, 0, -1):
        seal = parsed_as[instance - 1][0]
        is_seal_valid = await arc_seal_verify(chain_headers[: 3 * instance], seal)
        if not is_seal_valid:
            return ARCCheck(ARCChainStatus.FAIL, f"Cannot verify AS for {instance=}")

    return ARCCheck(
        ARCChainStatus.PASS,
        "",
        signer=parsed_as[-1][0]["d"],
        aar_header=aars[-1][1][1],
    )