Coverage for src / git_commit_guard / __init__.py: 99%
216 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-18 23:09 +0300
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-18 23:09 +0300
1import re
2import subprocess
3import sys
4from argparse import ArgumentParser
5from dataclasses import dataclass, field
6from enum import StrEnum
7from pathlib import Path
9import nltk
10import tomllib
11from nltk.corpus import wordnet
13TYPES = frozenset(
14 {
15 "feat",
16 "fix",
17 "docs",
18 "style",
19 "refactor",
20 "perf",
21 "test",
22 "build",
23 "ci",
24 "chore",
25 "revert",
26 }
27)
29_NON_IMPERATIVE_SUFFIX_RE = re.compile(r"(?:ing|ed)$")
31SUBJECT_RE = re.compile(
32 r"^(?P<type>\w+)(?:\((?P<scope>[^)]+)\))?!?:\s+(?P<desc>.+)$",
33)
35SIGNED_OFF_RE = re.compile(
36 r"^Signed-off-by:\s+.+\s+<.+>",
37 re.MULTILINE,
38)
40MAX_SUBJECT_LEN = 72
41GIT_TIMEOUT = 10
44class Check(StrEnum):
45 SUBJECT = "subject"
46 IMPERATIVE = "imperative"
47 BODY = "body"
48 SIGNED_OFF = "signed-off"
49 SIGNATURE = "signature"
52ALL_CHECKS = frozenset(Check.__members__.values())
55def _load_config(start=None):
56 start = start or Path.cwd()
57 for directory in [start, *start.parents]:
58 config_path = directory / ".commit-guard.toml"
59 if config_path.exists():
60 with config_path.open("rb") as f:
61 return tomllib.load(f)
62 return {}
65def _parse_config_checks(config, key):
66 try:
67 return [Check(v) for v in config.get(key, [])]
68 except ValueError as e:
69 sys.exit(f".commit-guard.toml: {e}")
72class Level(StrEnum):
73 ERROR = "error"
74 WARN = "warn"
75 INFO = "info"
78PREFIXES = {
79 Level.ERROR: "\033[31m✗\033[0m",
80 Level.WARN: "\033[33m⚠\033[0m",
81 Level.INFO: "\033[34mi\033[0m",
82}
85@dataclass
86class Result:
87 errors: list = field(default_factory=list)
89 def error(self, msg):
90 self.errors.append((Level.ERROR, msg))
92 def warn(self, msg):
93 self.errors.append((Level.WARN, msg))
95 def info(self, msg):
96 self.errors.append((Level.INFO, msg))
98 @property
99 def ok(self):
100 return not any(lvl == Level.ERROR for lvl, _ in self.errors)
103def _ensure_nltk_data():
104 _download_if_missing("taggers/averaged_perceptron_tagger_eng")
105 _download_if_missing("tokenizers/punkt_tab")
106 _download_if_missing("corpora/wordnet")
109def _download_if_missing(resource):
110 try:
111 nltk.data.find(resource)
112 except LookupError:
113 nltk.download(resource.rsplit("/", maxsplit=1)[-1], quiet=True)
116def _strip_comments(message):
117 return "\n".join(
118 line for line in message.split("\n") if not line.lstrip().startswith("#")
119 )
122def check_subject(line, result, allowed_scopes=frozenset(), *, require_scope=False):
123 m = SUBJECT_RE.match(line)
124 if not m:
125 result.error(f"subject does not match 'type(scope): description': {line}")
126 return None
128 if m.group("type") not in TYPES:
129 result.error(f"unknown type: {m.group('type')}")
131 scope = m.group("scope")
132 if require_scope and scope is None:
133 result.error("scope is required")
134 if allowed_scopes and scope is not None and scope not in allowed_scopes:
135 result.error(f"unknown scope: {scope}")
137 desc = m.group("desc")
138 if desc[0].isupper():
139 result.error("description must not start with uppercase")
140 if desc.endswith("."):
141 result.error("description must not end with period")
142 if len(line) > MAX_SUBJECT_LEN:
143 result.error(f"subject too long: {len(line)} > {MAX_SUBJECT_LEN}")
144 return desc
147def check_imperative(desc, result):
148 tokens = nltk.word_tokenize(desc.lower())
149 if not tokens:
150 return
151 first = tokens[0]
152 if _NON_IMPERATIVE_SUFFIX_RE.search(first):
153 result.error(f"expected imperative verb, got '{first}' (non-imperative suffix)")
154 return
155 base = wordnet.morphy(first, wordnet.VERB)
156 if base is not None and base != first:
157 result.error(
158 f"expected imperative verb, got '{first}' (inflected form of '{base}')"
159 )
160 return
161 tagged = nltk.pos_tag(["to", *tokens])
162 if tagged[1][1] != "VB":
163 if wordnet.morphy(first, wordnet.VERB) == first:
164 return
165 result.error(
166 f"expected imperative verb, got '{tagged[1][0]}' (POS={tagged[1][1]})",
167 )
170def check_body(lines, result):
171 if len(lines) < 3: # noqa: PLR2004
172 result.error("missing body")
173 return
174 if lines[1].strip():
175 result.error("missing blank line between subject and body")
176 if not any(ln.strip() for ln in lines[2:]):
177 result.error("missing body")
180def check_signed_off(message, result):
181 if not SIGNED_OFF_RE.search(message):
182 result.error("missing 'Signed-off-by' trailer")
185def check_signature(rev, result):
186 proc = subprocess.run( # noqa: S603
187 ["git", "verify-commit", rev], # noqa: S607
188 capture_output=True,
189 text=True,
190 check=False,
191 timeout=GIT_TIMEOUT,
192 )
193 if proc.returncode != 0:
194 result.error("commit is not signed (GPG/SSH)")
195 return
197 output = proc.stderr.lower()
198 sig_type = "SSH" if "ssh" in output else "GPG"
199 result.info(f"signature type: {sig_type}")
202def _get_message(rev):
203 try:
204 return subprocess.check_output( # noqa: S603
205 ["git", "log", "-1", "--format=%B", rev], # noqa: S607
206 text=True,
207 stderr=subprocess.PIPE,
208 timeout=GIT_TIMEOUT,
209 ).strip()
210 except subprocess.CalledProcessError as e:
211 stderr = e.stderr.strip()
212 if "unknown revision" in stderr or "ambiguous argument" in stderr:
213 sys.exit("no commits yet")
214 sys.exit(f"git error: {stderr}")
217@dataclass
218class Args:
219 rev: str | None
220 message: str
221 enabled: frozenset
222 allowed_scopes: frozenset
223 require_scope: bool
226def _resolve_enabled(args, config, parser):
227 if args.enable or args.disable:
228 enabled = (
229 frozenset(_parse_checks(parser, args.enable)) if args.enable else ALL_CHECKS
230 )
231 if args.disable:
232 enabled = enabled - frozenset(_parse_checks(parser, args.disable))
233 elif config.get("enable"):
234 enabled = frozenset(_parse_config_checks(config, "enable"))
235 elif config.get("disable"):
236 enabled = ALL_CHECKS - frozenset(_parse_config_checks(config, "disable"))
237 else:
238 enabled = ALL_CHECKS
239 return enabled
242def _resolve_scopes(args, config):
243 if args.scopes:
244 allowed_scopes = frozenset(s.strip() for s in args.scopes.split(","))
245 elif config.get("scopes"):
246 allowed_scopes = frozenset(config["scopes"])
247 else:
248 allowed_scopes = frozenset()
250 if args.require_scope:
251 require_scope = True
252 elif "require-scope" in config:
253 require_scope = config["require-scope"]
254 else:
255 require_scope = False
257 return allowed_scopes, require_scope
260def _parse_checks(parser, value):
261 try:
262 return [Check(c.strip()) for c in value.split(",")]
263 except ValueError as e:
264 parser.error(str(e))
267def _parse_args():
268 checks_list = ",".join(sorted(Check))
269 parser = ArgumentParser(description="conventional commit checker")
270 parser.add_argument("rev", nargs="?", default=None)
271 parser.add_argument("--message-file", type=Path)
272 parser.add_argument(
273 "--enable",
274 metavar="CHECK[,CHECK,...]",
275 help=f"run only these checks ({checks_list})",
276 )
277 parser.add_argument(
278 "--disable",
279 metavar="CHECK[,CHECK,...]",
280 help=f"skip these checks ({checks_list})",
281 )
282 parser.add_argument(
283 "--scopes",
284 metavar="SCOPE[,SCOPE,...]",
285 help="allowed scope values (any scope accepted if not set)",
286 )
287 parser.add_argument(
288 "--require-scope",
289 action="store_true",
290 default=False,
291 help="require a scope in the subject line",
292 )
293 args = parser.parse_args()
294 config = _load_config()
295 enabled = _resolve_enabled(args, config, parser)
296 allowed_scopes, require_scope = _resolve_scopes(args, config)
298 if args.message_file:
299 rev = None
300 message = _strip_comments(args.message_file.read_text().strip())
301 elif args.rev:
302 rev = args.rev
303 message = _strip_comments(_get_message(rev))
304 elif not sys.stdin.isatty():
305 rev = None
306 message = _strip_comments(sys.stdin.read().strip())
307 else:
308 rev = "HEAD"
309 message = _strip_comments(_get_message(rev))
311 return Args(
312 rev=rev,
313 message=message,
314 enabled=enabled,
315 allowed_scopes=allowed_scopes,
316 require_scope=require_scope,
317 )
320def _report(result):
321 for level, msg in result.errors:
322 sys.stderr.write(f" {PREFIXES[level]} {msg}\n")
324 if result.ok:
325 sys.stderr.write(" \033[32m✓\033[0m all checks passed\n")
327 return 0 if result.ok else 1
330def main():
331 args = _parse_args()
332 lines = args.message.split("\n")
334 if Check.IMPERATIVE in args.enabled:
335 _ensure_nltk_data()
337 result = Result()
339 desc = None
340 if Check.SUBJECT in args.enabled:
341 desc = check_subject(
342 lines[0], result, args.allowed_scopes, require_scope=args.require_scope
343 )
344 if Check.IMPERATIVE in args.enabled:
345 if desc is None:
346 m = SUBJECT_RE.match(lines[0])
347 desc = m.group("desc") if m else None
348 if desc:
349 check_imperative(desc, result)
350 if Check.BODY in args.enabled:
351 check_body(lines, result)
352 if Check.SIGNED_OFF in args.enabled:
353 check_signed_off(args.message, result)
354 if Check.SIGNATURE in args.enabled and args.rev:
355 check_signature(args.rev, result)
357 return _report(result)