emailsec.dmarc
1from enum import StrEnum 2import typing 3from dataclasses import dataclass 4 5import publicsuffixlist 6from emailsec.dns_resolver import DNSResolver 7from emailsec import errors 8 9if typing.TYPE_CHECKING: 10 from emailsec.spf.checker import SPFCheck 11 from emailsec.dkim.checker import DKIMCheck 12 from emailsec.arc import ARCCheck 13 14AlignmentMode = typing.Literal["relaxed", "strict"] 15 16 17class DMARCPolicy(StrEnum): 18 NONE = "none" 19 QUARANTINE = "quarantine" 20 REJECT = "reject" 21 22 23@dataclass 24class DMARCRecord: 25 policy: DMARCPolicy 26 spf_mode: AlignmentMode 27 dkim_mode: AlignmentMode 28 # percentage: int 29 30 31@dataclass 32class DMARCResult: 33 result: str # pass, fail, none 34 policy: DMARCPolicy 35 spf_aligned: bool | None = None 36 dkim_aligned: bool | None = None 37 arc_override_applied: bool = False 38 39 40async def get_dmarc_policy(domain: str) -> DMARCRecord | None: 41 """Fetch DMARC policy according to RFC 7489 Section 6.1. 42 43 Note that the spec does not include a temp error result, it will instead `errors.TempErrror`. 44 """ 45 resolver = DNSResolver() 46 try: 47 txt_records = await resolver.txt(f"_dmarc.{domain}") 48 except errors.Permerror: 49 pass 50 except errors.Temperror: 51 raise 52 else: 53 if txt_records: 54 try: 55 return parse_dmarc_record(txt_records[0].text) 56 except Exception: 57 return None 58 59 # RFC 7419 Section 6.6.3: "If the set is now empty, the Mail Receiver MUST query the DNS for 60 # a DMARC TXT record at the DNS domain matching the Organizational 61 # Domain in place of the RFC5322.From domain in the message (if 62 # different). 63 psl = publicsuffixlist.PublicSuffixList() 64 organizational_domain = psl.privatesuffix(domain.lower()) or domain 65 if organizational_domain != domain: 66 try: 67 txt_records = await resolver.txt(f"_dmarc.{organizational_domain}") 68 except errors.Permerror: 69 pass 70 except errors.Temperror: 71 raise 72 else: 73 if txt_records: 74 try: 75 return parse_dmarc_record(txt_records[0].text) 76 except Exception: 77 return None 78 79 return None 80 81 82def parse_dmarc_record(record: str) -> DMARCRecord: 83 tags = {} 84 for part in record.split(";"): 85 part = part.strip() 86 if "=" in part: 87 key, value = part.split("=", 1) 88 tags[key.strip()] = value.strip() 89 90 if "v" not in tags: 91 raise ValueError("Missing mandatory v=DMARC1 tag") 92 93 if tags["v"] != "DMARC1": 94 raise ValueError(f"Invalid DMARC version: {tags['v']}, expected DMARC1") 95 96 if "p" not in tags: 97 raise ValueError("Missing mandatory p= tag") 98 99 return DMARCRecord( 100 policy=DMARCPolicy(tags.get("p", "none")), 101 spf_mode="strict" if tags.get("aspf") == "s" else "relaxed", 102 dkim_mode="strict" if tags.get("adkim") == "s" else "relaxed", 103 # percentage=int(tags.get('pct', '100')) 104 ) 105 106 107def is_spf_aligned( 108 rfc5312_mail_from: str, 109 rfc5322_from: str, 110 mode: AlignmentMode = "relaxed", 111) -> bool: 112 match mode: 113 case "strict": 114 return rfc5312_mail_from.lower() == rfc5322_from.lower() 115 case "relaxed": 116 psl = publicsuffixlist.PublicSuffixList() 117 return psl.privatesuffix(rfc5312_mail_from) == psl.privatesuffix( 118 rfc5322_from 119 ) 120 121 122def is_dkim_aligned( 123 dkim_domain: str, 124 rfc5322_from: str, 125 mode: AlignmentMode = "relaxed", 126) -> bool: 127 match mode: 128 case "strict": 129 return dkim_domain.lower() == rfc5322_from.lower() 130 case "relaxed": 131 psl = publicsuffixlist.PublicSuffixList() 132 return psl.privatesuffix(dkim_domain) == psl.privatesuffix(rfc5322_from) 133 134 135async def check_dmarc( 136 header_from: str, 137 envelope_from: str, 138 spf_check: "SPFCheck", 139 dkim_check: "DKIMCheck", 140 arc_check: "ARCCheck", 141 configuration: typing.Any = None, 142) -> DMARCResult: 143 """ 144 DMARC evaluation per RFC 7489 Section 3 (Identifier Alignment). 145 146 RFC 7489: "A message satisfies the DMARC checks if at least one of the supported 147 authentication mechanisms: 1. produces a 'pass' result, and 2. produces that 148 result based on an identifier that is in alignment" 149 """ 150 # Import here to avoid circular imports 151 from emailsec.spf.checker import SPFResult 152 from emailsec.dkim.checker import DKIMResult 153 from emailsec.arc import ARCChainStatus 154 from emailsec.authentication_results import extract_original_auth_results 155 156 # Get DMARC policy (RFC 7489 Section 6.1) 157 dmarc_policy = await get_dmarc_policy(header_from) 158 if not dmarc_policy: 159 return DMARCResult(result="none", policy=DMARCPolicy.NONE) 160 161 # Check identifier alignment (RFC 7489 Section 3.1) 162 # SPF alignment: envelope sender domain vs header from domain 163 spf_aligned = spf_check.result == SPFResult.PASS and is_spf_aligned( 164 envelope_from, header_from, dmarc_policy.spf_mode 165 ) 166 167 # DKIM alignment: signing domain (d=) vs header from domain 168 dkim_aligned = bool( 169 dkim_check.result == DKIMResult.SUCCESS 170 and dkim_check.domain 171 and is_dkim_aligned(dkim_check.domain, header_from, dmarc_policy.dkim_mode) 172 ) 173 174 # RFC 7489: DMARC passes if either SPF or DKIM is aligned and passes 175 dmarc_pass = spf_aligned or dkim_aligned 176 177 # ARC override logic (RFC 8617 Section 7.2.1) 178 # RFC 8617: "a DMARC processor MAY choose to accept the authentication 179 # assessments provided by an Authenticated Received Chain" 180 arc_override_applied = False 181 if ( 182 not dmarc_pass 183 and configuration 184 and hasattr(configuration, "trusted_signers") 185 and configuration.trusted_signers 186 and arc_check.signer in configuration.trusted_signers 187 and arc_check.result == ARCChainStatus.PASS 188 and arc_check.aar_header 189 ): 190 parsed_aar = extract_original_auth_results( 191 arc_check.result, arc_check.aar_header 192 ) 193 if parsed_aar and "dmarc" in parsed_aar and parsed_aar["dmarc"] == "pass": 194 dmarc_pass = True 195 arc_override_applied = True 196 197 return DMARCResult( 198 result="pass" if dmarc_pass else "fail", 199 policy=dmarc_policy.policy, 200 spf_aligned=spf_aligned, 201 dkim_aligned=dkim_aligned, 202 arc_override_applied=arc_override_applied, 203 )
AlignmentMode =
typing.Literal['relaxed', 'strict']
class
DMARCPolicy(enum.StrEnum):
NONE =
<DMARCPolicy.NONE: 'none'>
QUARANTINE =
<DMARCPolicy.QUARANTINE: 'quarantine'>
REJECT =
<DMARCPolicy.REJECT: 'reject'>
@dataclass
class
DMARCRecord:
24@dataclass 25class DMARCRecord: 26 policy: DMARCPolicy 27 spf_mode: AlignmentMode 28 dkim_mode: AlignmentMode 29 # percentage: int
DMARCRecord( policy: DMARCPolicy, spf_mode: Literal['relaxed', 'strict'], dkim_mode: Literal['relaxed', 'strict'])
policy: DMARCPolicy
@dataclass
class
DMARCResult:
32@dataclass 33class DMARCResult: 34 result: str # pass, fail, none 35 policy: DMARCPolicy 36 spf_aligned: bool | None = None 37 dkim_aligned: bool | None = None 38 arc_override_applied: bool = False
DMARCResult( result: str, policy: DMARCPolicy, spf_aligned: bool | None = None, dkim_aligned: bool | None = None, arc_override_applied: bool = False)
policy: DMARCPolicy
41async def get_dmarc_policy(domain: str) -> DMARCRecord | None: 42 """Fetch DMARC policy according to RFC 7489 Section 6.1. 43 44 Note that the spec does not include a temp error result, it will instead `errors.TempErrror`. 45 """ 46 resolver = DNSResolver() 47 try: 48 txt_records = await resolver.txt(f"_dmarc.{domain}") 49 except errors.Permerror: 50 pass 51 except errors.Temperror: 52 raise 53 else: 54 if txt_records: 55 try: 56 return parse_dmarc_record(txt_records[0].text) 57 except Exception: 58 return None 59 60 # RFC 7419 Section 6.6.3: "If the set is now empty, the Mail Receiver MUST query the DNS for 61 # a DMARC TXT record at the DNS domain matching the Organizational 62 # Domain in place of the RFC5322.From domain in the message (if 63 # different). 64 psl = publicsuffixlist.PublicSuffixList() 65 organizational_domain = psl.privatesuffix(domain.lower()) or domain 66 if organizational_domain != domain: 67 try: 68 txt_records = await resolver.txt(f"_dmarc.{organizational_domain}") 69 except errors.Permerror: 70 pass 71 except errors.Temperror: 72 raise 73 else: 74 if txt_records: 75 try: 76 return parse_dmarc_record(txt_records[0].text) 77 except Exception: 78 return None 79 80 return None
Fetch DMARC policy according to RFC 7489 Section 6.1.
Note that the spec does not include a temp error result, it will instead errors.TempErrror.
83def parse_dmarc_record(record: str) -> DMARCRecord: 84 tags = {} 85 for part in record.split(";"): 86 part = part.strip() 87 if "=" in part: 88 key, value = part.split("=", 1) 89 tags[key.strip()] = value.strip() 90 91 if "v" not in tags: 92 raise ValueError("Missing mandatory v=DMARC1 tag") 93 94 if tags["v"] != "DMARC1": 95 raise ValueError(f"Invalid DMARC version: {tags['v']}, expected DMARC1") 96 97 if "p" not in tags: 98 raise ValueError("Missing mandatory p= tag") 99 100 return DMARCRecord( 101 policy=DMARCPolicy(tags.get("p", "none")), 102 spf_mode="strict" if tags.get("aspf") == "s" else "relaxed", 103 dkim_mode="strict" if tags.get("adkim") == "s" else "relaxed", 104 # percentage=int(tags.get('pct', '100')) 105 )
def
is_spf_aligned( rfc5312_mail_from: str, rfc5322_from: str, mode: Literal['relaxed', 'strict'] = 'relaxed') -> bool:
108def is_spf_aligned( 109 rfc5312_mail_from: str, 110 rfc5322_from: str, 111 mode: AlignmentMode = "relaxed", 112) -> bool: 113 match mode: 114 case "strict": 115 return rfc5312_mail_from.lower() == rfc5322_from.lower() 116 case "relaxed": 117 psl = publicsuffixlist.PublicSuffixList() 118 return psl.privatesuffix(rfc5312_mail_from) == psl.privatesuffix( 119 rfc5322_from 120 )
def
is_dkim_aligned( dkim_domain: str, rfc5322_from: str, mode: Literal['relaxed', 'strict'] = 'relaxed') -> bool:
123def is_dkim_aligned( 124 dkim_domain: str, 125 rfc5322_from: str, 126 mode: AlignmentMode = "relaxed", 127) -> bool: 128 match mode: 129 case "strict": 130 return dkim_domain.lower() == rfc5322_from.lower() 131 case "relaxed": 132 psl = publicsuffixlist.PublicSuffixList() 133 return psl.privatesuffix(dkim_domain) == psl.privatesuffix(rfc5322_from)
async def
check_dmarc( header_from: str, envelope_from: str, spf_check: emailsec.spf.SPFCheck, dkim_check: emailsec.dkim.DKIMCheck, arc_check: emailsec.arc.ARCCheck, configuration: Any = None) -> DMARCResult:
136async def check_dmarc( 137 header_from: str, 138 envelope_from: str, 139 spf_check: "SPFCheck", 140 dkim_check: "DKIMCheck", 141 arc_check: "ARCCheck", 142 configuration: typing.Any = None, 143) -> DMARCResult: 144 """ 145 DMARC evaluation per RFC 7489 Section 3 (Identifier Alignment). 146 147 RFC 7489: "A message satisfies the DMARC checks if at least one of the supported 148 authentication mechanisms: 1. produces a 'pass' result, and 2. produces that 149 result based on an identifier that is in alignment" 150 """ 151 # Import here to avoid circular imports 152 from emailsec.spf.checker import SPFResult 153 from emailsec.dkim.checker import DKIMResult 154 from emailsec.arc import ARCChainStatus 155 from emailsec.authentication_results import extract_original_auth_results 156 157 # Get DMARC policy (RFC 7489 Section 6.1) 158 dmarc_policy = await get_dmarc_policy(header_from) 159 if not dmarc_policy: 160 return DMARCResult(result="none", policy=DMARCPolicy.NONE) 161 162 # Check identifier alignment (RFC 7489 Section 3.1) 163 # SPF alignment: envelope sender domain vs header from domain 164 spf_aligned = spf_check.result == SPFResult.PASS and is_spf_aligned( 165 envelope_from, header_from, dmarc_policy.spf_mode 166 ) 167 168 # DKIM alignment: signing domain (d=) vs header from domain 169 dkim_aligned = bool( 170 dkim_check.result == DKIMResult.SUCCESS 171 and dkim_check.domain 172 and is_dkim_aligned(dkim_check.domain, header_from, dmarc_policy.dkim_mode) 173 ) 174 175 # RFC 7489: DMARC passes if either SPF or DKIM is aligned and passes 176 dmarc_pass = spf_aligned or dkim_aligned 177 178 # ARC override logic (RFC 8617 Section 7.2.1) 179 # RFC 8617: "a DMARC processor MAY choose to accept the authentication 180 # assessments provided by an Authenticated Received Chain" 181 arc_override_applied = False 182 if ( 183 not dmarc_pass 184 and configuration 185 and hasattr(configuration, "trusted_signers") 186 and configuration.trusted_signers 187 and arc_check.signer in configuration.trusted_signers 188 and arc_check.result == ARCChainStatus.PASS 189 and arc_check.aar_header 190 ): 191 parsed_aar = extract_original_auth_results( 192 arc_check.result, arc_check.aar_header 193 ) 194 if parsed_aar and "dmarc" in parsed_aar and parsed_aar["dmarc"] == "pass": 195 dmarc_pass = True 196 arc_override_applied = True 197 198 return DMARCResult( 199 result="pass" if dmarc_pass else "fail", 200 policy=dmarc_policy.policy, 201 spf_aligned=spf_aligned, 202 dkim_aligned=dkim_aligned, 203 arc_override_applied=arc_override_applied, 204 )
DMARC evaluation per RFC 7489 Section 3 (Identifier Alignment).
RFC 7489: "A message satisfies the DMARC checks if at least one of the supported authentication mechanisms: 1. produces a 'pass' result, and 2. produces that result based on an identifier that is in alignment"