emailsec.dmarc

  1from enum import StrEnum
  2import typing
  3from dataclasses import dataclass
  4
  5import publicsuffixlist
  6from emailsec.dns_resolver import DNSResolver
  7from emailsec import errors
  8
  9if typing.TYPE_CHECKING:
 10    from emailsec.spf.checker import SPFCheck
 11    from emailsec.dkim.checker import DKIMCheck
 12    from emailsec.arc import ARCCheck
 13
 14AlignmentMode = typing.Literal["relaxed", "strict"]
 15
 16
 17class DMARCPolicy(StrEnum):
 18    NONE = "none"
 19    QUARANTINE = "quarantine"
 20    REJECT = "reject"
 21
 22
 23@dataclass
 24class DMARCRecord:
 25    policy: DMARCPolicy
 26    spf_mode: AlignmentMode
 27    dkim_mode: AlignmentMode
 28    # percentage: int
 29
 30
 31@dataclass
 32class DMARCResult:
 33    result: str  # pass, fail, none
 34    policy: DMARCPolicy
 35    spf_aligned: bool | None = None
 36    dkim_aligned: bool | None = None
 37    arc_override_applied: bool = False
 38
 39
 40async def get_dmarc_policy(domain: str) -> DMARCRecord | None:
 41    """Fetch DMARC policy according to RFC 7489 Section 6.1.
 42
 43    Note that the spec does not include a temp error result, it will instead `errors.TempErrror`.
 44    """
 45    resolver = DNSResolver()
 46    try:
 47        txt_records = await resolver.txt(f"_dmarc.{domain}")
 48    except errors.Permerror:
 49        pass
 50    except errors.Temperror:
 51        raise
 52    else:
 53        if txt_records:
 54            try:
 55                return parse_dmarc_record(txt_records[0].text)
 56            except Exception:
 57                return None
 58
 59    # RFC 7419 Section 6.6.3: "If the set is now empty, the Mail Receiver MUST query the DNS for
 60    # a DMARC TXT record at the DNS domain matching the Organizational
 61    # Domain in place of the RFC5322.From domain in the message (if
 62    # different).
 63    psl = publicsuffixlist.PublicSuffixList()
 64    organizational_domain = psl.privatesuffix(domain.lower()) or domain
 65    if organizational_domain != domain:
 66        try:
 67            txt_records = await resolver.txt(f"_dmarc.{organizational_domain}")
 68        except errors.Permerror:
 69            pass
 70        except errors.Temperror:
 71            raise
 72        else:
 73            if txt_records:
 74                try:
 75                    return parse_dmarc_record(txt_records[0].text)
 76                except Exception:
 77                    return None
 78
 79    return None
 80
 81
 82def parse_dmarc_record(record: str) -> DMARCRecord:
 83    tags = {}
 84    for part in record.split(";"):
 85        part = part.strip()
 86        if "=" in part:
 87            key, value = part.split("=", 1)
 88            tags[key.strip()] = value.strip()
 89
 90    if "v" not in tags:
 91        raise ValueError("Missing mandatory v=DMARC1 tag")
 92
 93    if tags["v"] != "DMARC1":
 94        raise ValueError(f"Invalid DMARC version: {tags['v']}, expected DMARC1")
 95
 96    if "p" not in tags:
 97        raise ValueError("Missing mandatory p= tag")
 98
 99    return DMARCRecord(
100        policy=DMARCPolicy(tags.get("p", "none")),
101        spf_mode="strict" if tags.get("aspf") == "s" else "relaxed",
102        dkim_mode="strict" if tags.get("adkim") == "s" else "relaxed",
103        # percentage=int(tags.get('pct', '100'))
104    )
105
106
107def is_spf_aligned(
108    rfc5312_mail_from: str,
109    rfc5322_from: str,
110    mode: AlignmentMode = "relaxed",
111) -> bool:
112    match mode:
113        case "strict":
114            return rfc5312_mail_from.lower() == rfc5322_from.lower()
115        case "relaxed":
116            psl = publicsuffixlist.PublicSuffixList()
117            return psl.privatesuffix(rfc5312_mail_from) == psl.privatesuffix(
118                rfc5322_from
119            )
120
121
122def is_dkim_aligned(
123    dkim_domain: str,
124    rfc5322_from: str,
125    mode: AlignmentMode = "relaxed",
126) -> bool:
127    match mode:
128        case "strict":
129            return dkim_domain.lower() == rfc5322_from.lower()
130        case "relaxed":
131            psl = publicsuffixlist.PublicSuffixList()
132            return psl.privatesuffix(dkim_domain) == psl.privatesuffix(rfc5322_from)
133
134
async def check_dmarc(
    header_from: str,
    envelope_from: str,
    spf_check: "SPFCheck",
    dkim_check: "DKIMCheck",
    arc_check: "ARCCheck",
    configuration: typing.Any = None,
) -> DMARCResult:
    """
    DMARC evaluation per RFC 7489 Section 3 (Identifier Alignment).

    RFC 7489: "A message satisfies the DMARC checks if at least one of the supported
    authentication mechanisms: 1. produces a 'pass' result, and 2. produces that
    result based on an identifier that is in alignment"

    Args:
        header_from: RFC5322.From domain; used for the policy lookup and as the
            reference identifier for alignment.
        envelope_from: Envelope sender domain compared against ``header_from``
            for SPF alignment. (Presumably a bare domain, not a full address;
            verify against callers.)
        spf_check: Result of the SPF check (``result`` is read).
        dkim_check: Result of the DKIM check (``result`` and ``domain`` are read).
        arc_check: Result of the ARC check (``signer``, ``result`` and
            ``aar_header`` are read).
        configuration: Optional object; when it has a non-empty
            ``trusted_signers`` attribute, ARC results from those signers may
            override a DMARC failure.

    Returns:
        A ``DMARCResult`` with result ``"none"`` when no policy is published,
        otherwise ``"pass"`` or ``"fail"`` along with the alignment details.

    Raises:
        errors.Temperror: Propagated from the policy lookup on a transient
            DNS failure.
    """
    # Imported here to avoid circular imports.
    from emailsec.spf.checker import SPFResult
    from emailsec.dkim.checker import DKIMResult
    from emailsec.arc import ARCChainStatus
    from emailsec.authentication_results import extract_original_auth_results

    # Get DMARC policy (RFC 7489 Section 6.1)
    dmarc_policy = await get_dmarc_policy(header_from)
    if not dmarc_policy:
        return DMARCResult(result="none", policy=DMARCPolicy.NONE)

    # Check identifier alignment (RFC 7489 Section 3.1)
    # SPF alignment: envelope sender domain vs header from domain
    spf_aligned = bool(
        spf_check.result == SPFResult.PASS
        and is_spf_aligned(envelope_from, header_from, dmarc_policy.spf_mode)
    )

    # DKIM alignment: signing domain (d=) vs header from domain
    dkim_aligned = bool(
        dkim_check.result == DKIMResult.SUCCESS
        and dkim_check.domain
        and is_dkim_aligned(dkim_check.domain, header_from, dmarc_policy.dkim_mode)
    )

    # RFC 7489: DMARC passes if either SPF or DKIM is aligned and passes
    dmarc_pass = spf_aligned or dkim_aligned

    # ARC override logic (RFC 8617 Section 7.2.1)
    # RFC 8617: "a DMARC processor MAY choose to accept the authentication
    # assessments provided by an Authenticated Received Chain"
    arc_override_applied = False
    if not dmarc_pass:
        trusted_signers = (
            getattr(configuration, "trusted_signers", None) if configuration else None
        )
        if (
            trusted_signers
            and arc_check.signer in trusted_signers
            and arc_check.result == ARCChainStatus.PASS
            and arc_check.aar_header
        ):
            parsed_aar = extract_original_auth_results(
                arc_check.result, arc_check.aar_header
            )
            # Only a DMARC pass recorded by the trusted signer overrides the
            # local failure.
            if parsed_aar and "dmarc" in parsed_aar and parsed_aar["dmarc"] == "pass":
                dmarc_pass = True
                arc_override_applied = True

    return DMARCResult(
        result="pass" if dmarc_pass else "fail",
        policy=dmarc_policy.policy,
        spf_aligned=spf_aligned,
        dkim_aligned=dkim_aligned,
        arc_override_applied=arc_override_applied,
    )
AlignmentMode = typing.Literal['relaxed', 'strict']
class DMARCPolicy(enum.StrEnum):
18class DMARCPolicy(StrEnum):
19    NONE = "none"
20    QUARANTINE = "quarantine"
21    REJECT = "reject"
NONE = <DMARCPolicy.NONE: 'none'>
QUARANTINE = <DMARCPolicy.QUARANTINE: 'quarantine'>
REJECT = <DMARCPolicy.REJECT: 'reject'>
@dataclass
class DMARCRecord:
@dataclass
class DMARCRecord:
    """The subset of a published DMARC record that this module evaluates."""

    # Requested handling of failing mail (``p=`` tag).
    policy: DMARCPolicy
    # SPF identifier alignment mode (``aspf=`` tag).
    spf_mode: AlignmentMode
    # DKIM identifier alignment mode (``adkim=`` tag).
    dkim_mode: AlignmentMode
    # NOTE: the ``pct=`` (percentage) tag is not modelled yet.
DMARCRecord( policy: DMARCPolicy, spf_mode: Literal['relaxed', 'strict'], dkim_mode: Literal['relaxed', 'strict'])
policy: DMARCPolicy
spf_mode: Literal['relaxed', 'strict']
dkim_mode: Literal['relaxed', 'strict']
@dataclass
class DMARCResult:
@dataclass
class DMARCResult:
    """Outcome of a DMARC evaluation."""

    # One of "pass", "fail" or "none" ("none" when no DMARC policy was found).
    result: str
    # Policy published by the domain owner (NONE when no policy was found).
    policy: DMARCPolicy
    # Whether SPF passed with an aligned identifier; None when not evaluated.
    spf_aligned: bool | None = None
    # Whether DKIM passed with an aligned identifier; None when not evaluated.
    dkim_aligned: bool | None = None
    # True when a trusted ARC chain turned a failing evaluation into a pass.
    arc_override_applied: bool = False
DMARCResult( result: str, policy: DMARCPolicy, spf_aligned: bool | None = None, dkim_aligned: bool | None = None, arc_override_applied: bool = False)
result: str
policy: DMARCPolicy
spf_aligned: bool | None = None
dkim_aligned: bool | None = None
arc_override_applied: bool = False
async def get_dmarc_policy(domain: str) -> DMARCRecord | None:
async def _query_dmarc_records(resolver: DNSResolver, name: str) -> list[str]:
    """Return the TXT strings published at *name* that identify as DMARC records.

    A permanent DNS error is treated as "nothing published"; a temporary error
    (``errors.Temperror``) is deliberately left to propagate to the caller.
    """
    try:
        txt_records = await resolver.txt(name)
    except errors.Permerror:
        return []

    dmarc_records = []
    for txt_record in txt_records or ():
        text = txt_record.text
        # RFC 7489 Section 6.6.3 step 2: records that do not start with a
        # "v=DMARC1" tag are discarded (unrelated TXT data may share the name).
        tag_name, sep, tag_value = text.split(";", 1)[0].partition("=")
        if sep and tag_name.strip().lower() == "v" and tag_value.strip() == "DMARC1":
            dmarc_records.append(text)
    return dmarc_records


async def get_dmarc_policy(domain: str) -> DMARCRecord | None:
    """Discover the DMARC policy for *domain* per RFC 7489 Section 6.6.3.

    The ``_dmarc.<domain>`` TXT records are queried first. When no DMARC record
    is published there, the lookup is repeated at the Organizational Domain
    (derived from the Public Suffix List) if that differs from *domain*.

    Args:
        domain: The RFC5322.From domain of the message.

    Returns:
        The parsed record, or ``None`` when no policy applies: nothing is
        published, several DMARC records are published, or the single record
        found cannot be parsed.

    Raises:
        errors.Temperror: On a transient DNS failure. The spec does not define
            a temp-error result, so the error is propagated instead.
    """
    resolver = DNSResolver()
    records = await _query_dmarc_records(resolver, f"_dmarc.{domain}")

    if not records:
        # RFC 7489 Section 6.6.3 step 3: "If the set is now empty, the Mail
        # Receiver MUST query the DNS for a DMARC TXT record at the DNS domain
        # matching the Organizational Domain in place of the RFC5322.From
        # domain in the message (if different)."
        psl = publicsuffixlist.PublicSuffixList()
        normalized = domain.lower()
        organizational_domain = psl.privatesuffix(normalized) or normalized
        # Compare normalized names so mixed-case input does not trigger a
        # redundant second query for the very same domain.
        if organizational_domain != normalized:
            records = await _query_dmarc_records(
                resolver, f"_dmarc.{organizational_domain}"
            )

    # RFC 7489 Section 6.6.3 step 5: multiple records or no records means
    # DMARC processing is not applied to the message.
    if len(records) != 1:
        return None

    try:
        return parse_dmarc_record(records[0])
    except ValueError:
        # A syntactically invalid record is treated as "no policy".
        return None

Fetch DMARC policy according to RFC 7489 Section 6.1.

Note that the spec does not include a temp error result; on a transient DNS failure this function raises errors.Temperror instead.

def parse_dmarc_record(record: str) -> DMARCRecord:
 83def parse_dmarc_record(record: str) -> DMARCRecord:
 84    tags = {}
 85    for part in record.split(";"):
 86        part = part.strip()
 87        if "=" in part:
 88            key, value = part.split("=", 1)
 89            tags[key.strip()] = value.strip()
 90
 91    if "v" not in tags:
 92        raise ValueError("Missing mandatory v=DMARC1 tag")
 93
 94    if tags["v"] != "DMARC1":
 95        raise ValueError(f"Invalid DMARC version: {tags['v']}, expected DMARC1")
 96
 97    if "p" not in tags:
 98        raise ValueError("Missing mandatory p= tag")
 99
100    return DMARCRecord(
101        policy=DMARCPolicy(tags.get("p", "none")),
102        spf_mode="strict" if tags.get("aspf") == "s" else "relaxed",
103        dkim_mode="strict" if tags.get("adkim") == "s" else "relaxed",
104        # percentage=int(tags.get('pct', '100'))
105    )
def is_spf_aligned( rfc5312_mail_from: str, rfc5322_from: str, mode: Literal['relaxed', 'strict'] = 'relaxed') -> bool:
108def is_spf_aligned(
109    rfc5312_mail_from: str,
110    rfc5322_from: str,
111    mode: AlignmentMode = "relaxed",
112) -> bool:
113    match mode:
114        case "strict":
115            return rfc5312_mail_from.lower() == rfc5322_from.lower()
116        case "relaxed":
117            psl = publicsuffixlist.PublicSuffixList()
118            return psl.privatesuffix(rfc5312_mail_from) == psl.privatesuffix(
119                rfc5322_from
120            )
def is_dkim_aligned( dkim_domain: str, rfc5322_from: str, mode: Literal['relaxed', 'strict'] = 'relaxed') -> bool:
123def is_dkim_aligned(
124    dkim_domain: str,
125    rfc5322_from: str,
126    mode: AlignmentMode = "relaxed",
127) -> bool:
128    match mode:
129        case "strict":
130            return dkim_domain.lower() == rfc5322_from.lower()
131        case "relaxed":
132            psl = publicsuffixlist.PublicSuffixList()
133            return psl.privatesuffix(dkim_domain) == psl.privatesuffix(rfc5322_from)
async def check_dmarc( header_from: str, envelope_from: str, spf_check: emailsec.spf.SPFCheck, dkim_check: emailsec.dkim.DKIMCheck, arc_check: emailsec.arc.ARCCheck, configuration: Any = None) -> DMARCResult:
async def check_dmarc(
    header_from: str,
    envelope_from: str,
    spf_check: "SPFCheck",
    dkim_check: "DKIMCheck",
    arc_check: "ARCCheck",
    configuration: typing.Any = None,
) -> DMARCResult:
    """
    DMARC evaluation per RFC 7489 Section 3 (Identifier Alignment).

    RFC 7489: "A message satisfies the DMARC checks if at least one of the supported
    authentication mechanisms: 1. produces a 'pass' result, and 2. produces that
    result based on an identifier that is in alignment"

    Args:
        header_from: RFC5322.From domain; used for the policy lookup and as the
            reference identifier for alignment.
        envelope_from: Envelope sender domain compared against ``header_from``
            for SPF alignment. (Presumably a bare domain, not a full address;
            verify against callers.)
        spf_check: Result of the SPF check (``result`` is read).
        dkim_check: Result of the DKIM check (``result`` and ``domain`` are read).
        arc_check: Result of the ARC check (``signer``, ``result`` and
            ``aar_header`` are read).
        configuration: Optional object; when it has a non-empty
            ``trusted_signers`` attribute, ARC results from those signers may
            override a DMARC failure.

    Returns:
        A ``DMARCResult`` with result ``"none"`` when no policy is published,
        otherwise ``"pass"`` or ``"fail"`` along with the alignment details.

    Raises:
        errors.Temperror: Propagated from the policy lookup on a transient
            DNS failure.
    """
    # Imported here to avoid circular imports.
    from emailsec.spf.checker import SPFResult
    from emailsec.dkim.checker import DKIMResult
    from emailsec.arc import ARCChainStatus
    from emailsec.authentication_results import extract_original_auth_results

    # Get DMARC policy (RFC 7489 Section 6.1)
    dmarc_policy = await get_dmarc_policy(header_from)
    if not dmarc_policy:
        return DMARCResult(result="none", policy=DMARCPolicy.NONE)

    # Check identifier alignment (RFC 7489 Section 3.1)
    # SPF alignment: envelope sender domain vs header from domain
    spf_aligned = bool(
        spf_check.result == SPFResult.PASS
        and is_spf_aligned(envelope_from, header_from, dmarc_policy.spf_mode)
    )

    # DKIM alignment: signing domain (d=) vs header from domain
    dkim_aligned = bool(
        dkim_check.result == DKIMResult.SUCCESS
        and dkim_check.domain
        and is_dkim_aligned(dkim_check.domain, header_from, dmarc_policy.dkim_mode)
    )

    # RFC 7489: DMARC passes if either SPF or DKIM is aligned and passes
    dmarc_pass = spf_aligned or dkim_aligned

    # ARC override logic (RFC 8617 Section 7.2.1)
    # RFC 8617: "a DMARC processor MAY choose to accept the authentication
    # assessments provided by an Authenticated Received Chain"
    arc_override_applied = False
    if not dmarc_pass:
        trusted_signers = (
            getattr(configuration, "trusted_signers", None) if configuration else None
        )
        if (
            trusted_signers
            and arc_check.signer in trusted_signers
            and arc_check.result == ARCChainStatus.PASS
            and arc_check.aar_header
        ):
            parsed_aar = extract_original_auth_results(
                arc_check.result, arc_check.aar_header
            )
            # Only a DMARC pass recorded by the trusted signer overrides the
            # local failure.
            if parsed_aar and "dmarc" in parsed_aar and parsed_aar["dmarc"] == "pass":
                dmarc_pass = True
                arc_override_applied = True

    return DMARCResult(
        result="pass" if dmarc_pass else "fail",
        policy=dmarc_policy.policy,
        spf_aligned=spf_aligned,
        dkim_aligned=dkim_aligned,
        arc_override_applied=arc_override_applied,
    )

DMARC evaluation per RFC 7489 Section 3 (Identifier Alignment).

RFC 7489: "A message satisfies the DMARC checks if at least one of the supported authentication mechanisms: 1. produces a 'pass' result, and 2. produces that result based on an identifier that is in alignment"