Coverage for src / git_commit_guard / __init__.py: 99%

216 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-18 23:09 +0300

1import re 

2import subprocess 

3import sys 

4from argparse import ArgumentParser 

5from dataclasses import dataclass, field 

6from enum import StrEnum 

7from pathlib import Path 

8 

9import nltk 

10import tomllib 

11from nltk.corpus import wordnet 

12 

# Commit types accepted in the subject line.
TYPES = frozenset(
    (
        "feat",
        "fix",
        "docs",
        "style",
        "refactor",
        "perf",
        "test",
        "build",
        "ci",
        "chore",
        "revert",
    )
)

# A first description word ending in "ing" or "ed" is reported as
# non-imperative by check_imperative.
_NON_IMPERATIVE_SUFFIX_RE = re.compile(r"(?:ing|ed)$")

# Subject layout: type, optional "(scope)", optional "!", a colon followed by
# whitespace, then the description.
SUBJECT_RE = re.compile(
    r"^(?P<type>\w+)(?:\((?P<scope>[^)]+)\))?!?:\s+(?P<desc>.+)$",
)

# A "Signed-off-by: Name <email>" trailer at the start of any line.
SIGNED_OFF_RE = re.compile(r"^Signed-off-by:\s+.+\s+<.+>", flags=re.MULTILINE)

MAX_SUBJECT_LEN = 72  # longest accepted subject line, in characters
GIT_TIMEOUT = 10  # timeout handed to every git subprocess call

42 

43 

44class Check(StrEnum): 

45 SUBJECT = "subject" 

46 IMPERATIVE = "imperative" 

47 BODY = "body" 

48 SIGNED_OFF = "signed-off" 

49 SIGNATURE = "signature" 

50 

51 

52ALL_CHECKS = frozenset(Check.__members__.values()) 

53 

54 

55def _load_config(start=None): 

56 start = start or Path.cwd() 

57 for directory in [start, *start.parents]: 

58 config_path = directory / ".commit-guard.toml" 

59 if config_path.exists(): 

60 with config_path.open("rb") as f: 

61 return tomllib.load(f) 

62 return {} 

63 

64 

def _parse_config_checks(config, key):
    """Convert the names listed under *key* in *config* into ``Check`` members.

    A missing key yields an empty list. A name that is not a valid ``Check``
    ends the program through ``sys.exit`` with a message prefixed by the
    config file name.
    """
    try:
        return list(map(Check, config.get(key, [])))
    except ValueError as exc:
        sys.exit(f".commit-guard.toml: {exc}")

70 

71 

72class Level(StrEnum): 

73 ERROR = "error" 

74 WARN = "warn" 

75 INFO = "info" 

76 

77 

78PREFIXES = { 

79 Level.ERROR: "\033[31m✗\033[0m", 

80 Level.WARN: "\033[33m⚠\033[0m", 

81 Level.INFO: "\033[34mi\033[0m", 

82} 

83 

84 

@dataclass
class Result:
    """Accumulates the messages produced by the checks.

    Despite its name, ``errors`` holds ``(Level, message)`` tuples of every
    severity, in the order they were recorded.
    """

    errors: list = field(default_factory=list)

    def error(self, msg):
        """Record *msg* with ``Level.ERROR``."""
        entry = (Level.ERROR, msg)
        self.errors.append(entry)

    def warn(self, msg):
        """Record *msg* with ``Level.WARN``."""
        entry = (Level.WARN, msg)
        self.errors.append(entry)

    def info(self, msg):
        """Record *msg* with ``Level.INFO``."""
        entry = (Level.INFO, msg)
        self.errors.append(entry)

    @property
    def ok(self):
        """``True`` as long as no ``Level.ERROR`` entry has been recorded."""
        return all(level != Level.ERROR for level, _msg in self.errors)

101 

102 

def _ensure_nltk_data():
    """Fetch any missing NLTK resource (tagger, tokenizer, WordNet corpus).

    ``main`` calls this only when the imperative check is enabled.
    """
    for resource in (
        "taggers/averaged_perceptron_tagger_eng",
        "tokenizers/punkt_tab",
        "corpora/wordnet",
    ):
        _download_if_missing(resource)

107 

108 

def _download_if_missing(resource):
    """Download NLTK *resource* unless ``nltk.data.find`` already locates it.

    *resource* is a ``category/name`` path; only the part after the last
    slash is handed to ``nltk.download``, which is run quietly.
    """
    try:
        nltk.data.find(resource)
    except LookupError:
        package = resource.rpartition("/")[2]
        nltk.download(package, quiet=True)

114 

115 

# Marker git writes above the diff when ``git commit --verbose`` is used;
# nothing from this line onwards belongs to the commit message.
_SCISSORS_LINE = "# " + "-" * 24 + " >8 " + "-" * 24


def _strip_comments(message):
    """Return *message* without git comment lines.

    Lines whose first non-blank character is ``#`` are dropped. When the
    message contains git's scissors line, that line and everything after it
    is discarded, so a diff appended by ``git commit --verbose`` is never
    mistaken for message text (it could otherwise satisfy the body or
    sign-off checks).
    """
    kept = []
    for line in message.split("\n"):
        # rstrip() tolerates a trailing "\r" or spaces on the marker line.
        if line.rstrip() == _SCISSORS_LINE:
            break
        if not line.lstrip().startswith("#"):
            kept.append(line)
    return "\n".join(kept)

120 

121 

def check_subject(line, result, allowed_scopes=frozenset(), *, require_scope=False):
    """Validate the subject *line* and return its description part.

    Errors are recorded on *result* for: a line that does not match
    ``SUBJECT_RE``, a type outside ``TYPES``, a missing scope when
    *require_scope* is set, a scope outside a non-empty *allowed_scopes*, a
    description starting with an uppercase letter or ending with a period,
    and a line longer than ``MAX_SUBJECT_LEN``.

    Returns the description text, or ``None`` when the line does not match
    the expected layout (no further checks are made in that case).
    """
    match = SUBJECT_RE.match(line)
    if match is None:
        result.error(f"subject does not match 'type(scope): description': {line}")
        return None

    commit_type, scope, desc = match.group("type", "scope", "desc")

    if commit_type not in TYPES:
        result.error(f"unknown type: {commit_type}")

    if scope is None:
        if require_scope:
            result.error("scope is required")
    elif allowed_scopes and scope not in allowed_scopes:
        result.error(f"unknown scope: {scope}")

    if desc[0].isupper():
        result.error("description must not start with uppercase")
    if desc.endswith("."):
        result.error("description must not end with period")

    length = len(line)
    if length > MAX_SUBJECT_LEN:
        result.error(f"subject too long: {length} > {MAX_SUBJECT_LEN}")
    return desc

145 

146 

def check_imperative(desc, result):
    """Check that *desc* starts with an imperative (base-form) verb.

    The first token of the lower-cased description is examined, in order:

    1. a word WordNet returns unchanged as a verb lemma is accepted;
    2. a word ending in "ing" or "ed" is rejected as non-imperative;
    3. a word WordNet reduces to a different verb lemma is rejected as an
       inflected form;
    4. anything else is POS-tagged behind a leading "to" and rejected unless
       the tag is ``VB``.

    Errors are recorded on *result*; nothing is returned.
    """
    tokens = nltk.word_tokenize(desc.lower())
    if not tokens:
        return
    first = tokens[0]

    base = wordnet.morphy(first, wordnet.VERB)
    if base == first:
        # Already a base-form verb. This must be decided before the suffix
        # rule, otherwise verbs such as "embed", "bring" or "feed" are
        # rejected merely for ending in "ed"/"ing".
        return

    if _NON_IMPERATIVE_SUFFIX_RE.search(first):
        result.error(f"expected imperative verb, got '{first}' (non-imperative suffix)")
        return

    if base is not None:
        result.error(
            f"expected imperative verb, got '{first}' (inflected form of '{base}')"
        )
        return

    # Unknown to WordNet as a verb: fall back to the tagger. The leading "to"
    # places the word in infinitive position before tagging.
    tagged = nltk.pos_tag(["to", *tokens])
    word, tag = tagged[1]
    if tag != "VB":
        result.error(f"expected imperative verb, got '{word}' (POS={tag})")

168 

169 

def check_body(lines, result):
    """Check that the message in *lines* has a body separated from the subject.

    *lines* is the message split into lines. Records "missing body" when
    there are fewer than three lines or every line after the second is
    blank, and a separate error when the second line is not blank.
    """
    min_lines = 3  # subject, separator, at least one body line
    if len(lines) < min_lines:
        result.error("missing body")
        return

    _subject, separator, *body = lines
    if separator.strip():
        result.error("missing blank line between subject and body")
    if all(not text.strip() for text in body):
        result.error("missing body")

178 

179 

def check_signed_off(message, result):
    """Record an error on *result* unless *message* has a ``Signed-off-by`` line."""
    has_trailer = SIGNED_OFF_RE.search(message) is not None
    if has_trailer:
        return
    result.error("missing 'Signed-off-by' trailer")

183 

184 

def check_signature(rev, result):
    """Verify the signature of commit *rev* with ``git verify-commit``.

    Records an error on *result* when git exits non-zero or does not finish
    within ``GIT_TIMEOUT``. On success records an info message naming the
    signature type: "SSH" when git's stderr mentions "ssh", otherwise "GPG".
    """
    try:
        proc = subprocess.run(  # noqa: S603
            ["git", "verify-commit", rev],  # noqa: S607
            capture_output=True,
            text=True,
            # Decode explicitly instead of relying on the locale encoding,
            # and never fail on undecodable bytes in the verifier's output.
            encoding="utf-8",
            errors="replace",
            check=False,
            timeout=GIT_TIMEOUT,
        )
    except subprocess.TimeoutExpired:
        # Report a hung verifier as a failed check rather than a traceback.
        result.error(f"signature verification timed out after {GIT_TIMEOUT}s")
        return

    if proc.returncode != 0:
        result.error("commit is not signed (GPG/SSH)")
        return

    output = proc.stderr.lower()
    sig_type = "SSH" if "ssh" in output else "GPG"
    result.info(f"signature type: {sig_type}")

200 

201 

def _get_message(rev):
    """Return the commit message of *rev* with surrounding whitespace removed.

    The message is read with ``git log -1 --format=%B``. The program exits
    via ``sys.exit`` with "no commits yet" when git reports an unknown
    revision or ambiguous argument, and with a "git error: ..." message for
    any other git failure or when git exceeds ``GIT_TIMEOUT``.
    """
    try:
        output = subprocess.check_output(  # noqa: S603
            ["git", "log", "-1", "--format=%B", rev],  # noqa: S607
            text=True,
            # Git emits log output as UTF-8 by default; decoding with the
            # locale encoding garbles non-ASCII messages on other locales.
            encoding="utf-8",
            errors="replace",
            stderr=subprocess.PIPE,
            timeout=GIT_TIMEOUT,
        )
    except subprocess.TimeoutExpired:
        sys.exit(f"git error: timed out after {GIT_TIMEOUT}s")
    except subprocess.CalledProcessError as e:
        stderr = e.stderr.strip()
        if "unknown revision" in stderr or "ambiguous argument" in stderr:
            sys.exit("no commits yet")
        sys.exit(f"git error: {stderr}")
    return output.strip()

215 

216 

217@dataclass 

218class Args: 

219 rev: str | None 

220 message: str 

221 enabled: frozenset 

222 allowed_scopes: frozenset 

223 require_scope: bool 

224 

225 

def _resolve_enabled(args, config, parser):
    """Return the frozenset of checks to run.

    Command-line ``--enable``/``--disable`` take precedence: when either is
    given the config file is not consulted. Otherwise a non-empty config
    ``enable`` list wins over a config ``disable`` list, and with neither
    present every check is enabled.
    """
    if args.enable or args.disable:
        selected = ALL_CHECKS
        if args.enable:
            selected = frozenset(_parse_checks(parser, args.enable))
        if args.disable:
            selected = selected - frozenset(_parse_checks(parser, args.disable))
        return selected

    if config.get("enable"):
        return frozenset(_parse_config_checks(config, "enable"))
    if config.get("disable"):
        return ALL_CHECKS - frozenset(_parse_config_checks(config, "disable"))
    return ALL_CHECKS

240 

241 

def _split_scopes(value):
    """Split a comma-separated scope string into a frozenset of names."""
    parts = (part.strip() for part in value.split(","))
    # Drop empty entries left by stray or trailing commas.
    return frozenset(part for part in parts if part)


def _resolve_scopes(args, config):
    """Work out the allowed scopes and whether a scope is mandatory.

    Command-line values take precedence over the config file: ``--scopes``
    over the ``scopes`` key, ``--require-scope`` over the ``require-scope``
    key.

    Returns:
        ``(allowed_scopes, require_scope)``. ``allowed_scopes`` is a
        frozenset, empty when scopes are unrestricted.
    """
    if args.scopes:
        allowed_scopes = _split_scopes(args.scopes)
    elif config.get("scopes"):
        configured = config["scopes"]
        if isinstance(configured, str):
            # A bare string would otherwise be iterated character by
            # character; treat it like the comma-separated CLI form.
            allowed_scopes = _split_scopes(configured)
        else:
            allowed_scopes = frozenset(configured)
    else:
        allowed_scopes = frozenset()

    require_scope = True if args.require_scope else config.get("require-scope", False)
    return allowed_scopes, require_scope

258 

259 

def _parse_checks(parser, value):
    """Parse the comma-separated check names in *value* into ``Check`` members.

    Whitespace around each name is ignored. A name that is not a valid
    ``Check`` is reported through ``parser.error``.
    """
    names = (part.strip() for part in value.split(","))
    try:
        return list(map(Check, names))
    except ValueError as exc:
        parser.error(str(exc))

265 

266 

def _read_message(args):
    """Return ``(rev, message)`` for the source selected on the command line.

    Precedence: ``--message-file``, then an explicit revision, then piped
    stdin, then ``HEAD``. ``rev`` is ``None`` when the message did not come
    from a commit.
    """
    if args.message_file:
        rev = None
        # Git writes commit messages as UTF-8 by default, so do not depend on
        # the locale encoding; undecodable bytes are replaced, not fatal.
        raw = args.message_file.read_text(encoding="utf-8", errors="replace")
    elif args.rev:
        rev = args.rev
        raw = _get_message(rev)
    elif not sys.stdin.isatty():
        rev = None
        raw = sys.stdin.read()
    else:
        rev = "HEAD"
        raw = _get_message(rev)
    # Trim only after comment removal: blank lines left behind by leading
    # comment lines must not push the subject off the first line.
    return rev, _strip_comments(raw).strip()


def _parse_args():
    """Parse the command line and config file into an ``Args`` instance.

    Command-line options are parsed first, then the nearest
    ``.commit-guard.toml`` is loaded and both are merged into the enabled
    checks and scope settings. Finally the commit message is read from the
    selected source.
    """
    checks_list = ",".join(sorted(Check))
    parser = ArgumentParser(description="conventional commit checker")
    parser.add_argument("rev", nargs="?", default=None)
    parser.add_argument("--message-file", type=Path)
    parser.add_argument(
        "--enable",
        metavar="CHECK[,CHECK,...]",
        help=f"run only these checks ({checks_list})",
    )
    parser.add_argument(
        "--disable",
        metavar="CHECK[,CHECK,...]",
        help=f"skip these checks ({checks_list})",
    )
    parser.add_argument(
        "--scopes",
        metavar="SCOPE[,SCOPE,...]",
        help="allowed scope values (any scope accepted if not set)",
    )
    parser.add_argument(
        "--require-scope",
        action="store_true",
        default=False,
        help="require a scope in the subject line",
    )
    args = parser.parse_args()
    config = _load_config()
    enabled = _resolve_enabled(args, config, parser)
    allowed_scopes, require_scope = _resolve_scopes(args, config)
    rev, message = _read_message(args)

    return Args(
        rev=rev,
        message=message,
        enabled=enabled,
        allowed_scopes=allowed_scopes,
        require_scope=require_scope,
    )

318 

319 

def _report(result):
    """Write every recorded message to stderr and return the exit code.

    Each message is prefixed with the marker for its level. Returns 0 (after
    printing a success line) when *result* holds no error, otherwise 1.
    """
    emit = sys.stderr.write
    for level, msg in result.errors:
        emit(f" {PREFIXES[level]} {msg}\n")

    if not result.ok:
        return 1
    emit(" \033[32m✓\033[0m all checks passed\n")
    return 0

328 

329 

def main():
    """Run the enabled checks against the selected message; return the exit code."""
    args = _parse_args()
    enabled = args.enabled
    lines = args.message.split("\n")
    subject = lines[0]

    wants_imperative = Check.IMPERATIVE in enabled
    if wants_imperative:
        _ensure_nltk_data()

    result = Result()

    desc = None
    if Check.SUBJECT in enabled:
        desc = check_subject(
            subject,
            result,
            args.allowed_scopes,
            require_scope=args.require_scope,
        )

    if wants_imperative:
        if desc is None:
            # Subject check disabled or failed to match: extract the
            # description directly so the verb can still be examined.
            match = SUBJECT_RE.match(subject)
            if match:
                desc = match.group("desc")
        if desc:
            check_imperative(desc, result)

    if Check.BODY in enabled:
        check_body(lines, result)
    if Check.SIGNED_OFF in enabled:
        check_signed_off(args.message, result)
    # The signature can only be verified for an actual commit.
    if Check.SIGNATURE in enabled and args.rev:
        check_signature(args.rev, result)

    return _report(result)