Coverage for src/su6/core.py: 100%

273 statements  

« prev     ^ index     » next       coverage.py v7.8.1, created at 2025-09-19 22:55 +0200

1""" 

2This file contains internal helpers used by cli.py. 

3""" 

4 

5import enum 

6import functools 

7import inspect 

8import json 

9import operator 

10import pydoc 

11import sys 

12import types 

13import typing 

14from dataclasses import dataclass, field 

15from pathlib import Path 

16from typing import Any, Callable, Optional, TypeAlias, Union 

17 

18import configuraptor 

19import plumbum.commands.processes as pb 

20import tomli 

21import typer 

22from configuraptor import convert_config 

23from configuraptor.helpers import find_pyproject_toml 

24from plumbum import local 

25from plumbum.machines import LocalCommand 

26from rich import print 

27 

28if typing.TYPE_CHECKING: # pragma: no cover 

29 from .plugins import AnyRegistration 

30 

# Status markers printed in front of the tool name when the output format is text.
GREEN_CIRCLE = "🟢"  # printed by on_tool_success
YELLOW_CIRCLE = "🟡"  # printed by on_tool_missing
RED_CIRCLE = "🔴"  # printed by on_tool_failure

# Integer exit codes returned by the tool runners in this module.
EXIT_CODE_SUCCESS = 0
EXIT_CODE_ERROR = 1
EXIT_CODE_COMMAND_NOT_FOUND = 127

38 

39 

class ExitCodes:
    """
    Store the possible EXIT_CODE_ items for ease of use (and autocomplete).

    This is a plain class with integer class attributes, not an `enum.Enum`,
    so every member is an ordinary int that can be returned as an exit code directly.
    """

    # enum but not really
    success = EXIT_CODE_SUCCESS
    error = EXIT_CODE_ERROR
    command_not_found = EXIT_CODE_COMMAND_NOT_FOUND

49 

50 

# Tuple grouping the plumbum exception classes, so they can be referred to together.
PlumbumError = (pb.ProcessExecutionError, pb.ProcessTimedOut, pb.ProcessLineTimedOut, pb.CommandNotFound)

# a Command can return these:
T_Command_Return = bool | int | None
# ... here indicates any number of args/kwargs:
# T_Command is any @app.command() method, which can have anything as input and bool, int or None as output
T_Command: TypeAlias = Callable[..., T_Command_Return]
# T_Inner_Wrapper calls the T_Command and handles its output.
# This wrapper gets the same (kw)args as above so ... again
T_Inner_Wrapper: TypeAlias = Callable[..., int | None]
# T_Outer_Wrapper gets the T_Command method as input and outputs the inner wrapper,
# so that gets called with args and kwargs when that method is used from the cli
T_Outer_Wrapper: TypeAlias = Callable[[T_Command], T_Inner_Wrapper]

63 

64 

def print_json(data: Any) -> None:
    """
    Serialize `data` (a dict of {command: output} or the State) to JSON and print it.

    Values that json can not encode natively are converted with `str`.
    The indentation is taken from the `json_indent` config setting.
    """
    configured_indent = state.get_config().json_indent
    if not configured_indent:
        # json.dumps treats indent=0 differently from indent=None;
        # in this module any falsey setting (including 0) means "no indentation".
        configured_indent = None

    serialized = json.dumps(data, indent=configured_indent, default=str)
    print(serialized)

72 

73 

def dump_tools_with_results(tools: list[T_Command], results: list[int | bool | None]) -> None:
    """
    Print a JSON object mapping each tool name to whether it succeeded (used with format = json).

    A tool counts as successful when its result is falsey (exit code 0, False or None).

    Called by `all` and `fix` with a list of tools, and by 'with_exit_code' with a single tool.
    'with_exit_code' skips this when the command returned a bool, since that is what `all` and `fix` return
    and those already dump their own dict.

    Args:
        tools: list of commands that ran
        results: list of return values from these commands, in the same order as `tools`
    """
    success_by_tool = {}
    for command, outcome in zip(tools, results):
        success_by_tool[command.__name__] = not outcome

    print_json(success_by_tool)

87 

88 

def with_exit_code() -> T_Outer_Wrapper:
    """
    Decorator factory: turn the return value of an app.command (bool, int or None) into a typer Exit.

    A falsey return value (0, False, None) leads to the default exit (exit code 0, success);
    any other value is raised as `typer.Exit(code=...)`.

    Usage:
    > @app.command()
    > @with_exit_code()
    def some_command(): ...

    Extra keyword arguments understood by the wrapped command (removed before the command is called):
        _suppress: when truthy, no Exit is raised (and no json is dumped); the return code is returned instead.
                   Use this when calling a command from a different command.
        _ignore: collection of return codes that should be reported as success (0).
    """

    def outer_wrapper(func: T_Command) -> T_Inner_Wrapper:
        @functools.wraps(func)
        def inner_wrapper(*args: Any, **kwargs: Any) -> int:
            suppress_exit = kwargs.pop("_suppress", False)
            ignored_codes = kwargs.pop("_ignore", set())

            outcome = func(*args, **kwargs)

            # isinstance(True, int) is True, so bools have to be excluded explicitly:
            # a bool means the function handled printing itself, only a real retcode is dumped here.
            has_retcode = outcome is not None and not isinstance(outcome, bool)
            if state.output_format == "json" and not suppress_exit and has_retcode:
                # print {tool: success}
                dump_tools_with_results([func], [outcome])

            # no return value -> assume no issue
            retcode = 0 if outcome is None else int(outcome)

            if retcode and not suppress_exit:
                raise typer.Exit(code=retcode)

            # NOTE(review): ignored codes are only consulted when no Exit was raised above,
            # so in practice `_ignore` only has an effect together with `_suppress`.
            if retcode in ignored_codes:  # pragma: no cover
                # there is an error code, but we choose to ignore it -> return 0
                return ExitCodes.success

            return retcode

        return inner_wrapper

    return outer_wrapper

132 

133 

def is_available_via_python(tool: str) -> bool:
    """
    Sometimes, an executable is not available in PATH (e.g. via pipx) but it is available as `python -m something`.

    This tests for that by checking whether a module with the name `tool` can be located by the import system.
    The module itself is not imported, so a module that crashes on import does not crash this check,
    and names of builtins (e.g. 'print') are not mistaken for runnable modules.

    May not work for exceptions like 'semantic-release'/'semantic_release' (python-semantic-release),
    where the executable name differs from the module name.

    Args:
        tool: name of the tool, tried as-is as a module name.

    Returns:
        True if a module spec was found for `tool`, False otherwise.
    """
    # local import: only this helper needs importlib
    import importlib.util

    try:
        return importlib.util.find_spec(tool) is not None
    except (ImportError, ValueError):
        # ImportError: a parent package of a dotted name is missing (or is not a package)
        # ValueError: empty name, or an already loaded module without a usable __spec__
        return False

146 

147 

def is_installed(tool: str, python_fallback: bool = True) -> bool:
    """
    Tell whether `tool` is installed, i.e. whether 'which' can find it.

    When 'which' exits with an error and `python_fallback` is enabled,
    the tool is additionally looked up as a Python module.
    """
    try:
        location = local["which"](tool)
    except pb.ProcessExecutionError:
        if not python_fallback:
            return False
        return is_available_via_python(tool)
    else:
        return bool(location)

156 

157 

def on_tool_success(tool_name: str, result: str) -> int:
    """
    Final step of run_tool or run_tool_via_python when the tool succeeded.

    Prints a green marker in text mode, logs the tool output when verbosity is above 2
    and returns the success exit code.
    """
    text_mode = state.output_format == "text"
    if text_mode:
        print(GREEN_CIRCLE, tool_name)

    wants_output = state.verbosity > 2
    if wants_output:  # pragma: no cover
        log_cmd_output(result)

    return ExitCodes.success

169 

170 

def on_tool_missing(tool_name: str) -> int:
    """
    Final step when the tool could not be found by run_tool nor by run_tool_via_python.

    Warns when verbosity is above 2, prints a yellow marker in text mode
    and returns the 'command not found' exit code.
    """
    wants_warning = state.verbosity > 2
    if wants_warning:
        warn(f"Tool {tool_name} not installed!")

    text_mode = state.output_format == "text"
    if text_mode:
        print(YELLOW_CIRCLE, tool_name)

    return ExitCodes.command_not_found

182 

183 

def on_tool_failure(tool_name: str, e: pb.ProcessExecutionError) -> int:
    """
    Final step of run_tool or run_tool_via_python when the tool exited with an error.

    Prints a red marker in text mode, logs stdout and stderr of the failed process
    when verbosity is above 1 and returns the general error exit code.
    """
    text_mode = state.output_format == "text"
    if text_mode:
        print(RED_CIRCLE, tool_name)

    wants_output = state.verbosity > 1
    if wants_output:
        log_cmd_output(e.stdout, e.stderr)

    return ExitCodes.error

194 

195 

def run_tool_via_python(tool_name: str, *args: str) -> int:
    """
    Fallback runner: execute `python -m tool ...` instead of `tool ...`.

    The current interpreter (sys.executable) is used.
    Returns the exit code produced by the matching on_tool_* handler.
    """
    module_cmd = local[sys.executable]["-m", tool_name]

    if state.verbosity >= 3:
        log_command(module_cmd, args)

    try:
        output = module_cmd(*args)
    except pb.ProcessExecutionError as error:
        # python itself reports a missing module on stderr; treat that as 'tool not installed'
        module_missing = "No module named" in error.stderr
        if module_missing:
            return on_tool_missing(tool_name)
        return on_tool_failure(tool_name, error)
    else:
        return on_tool_success(tool_name, output)

212 

213 

def run_tool(tool: str, *_args: str) -> int:
    """
    Run one of the cli checking tools and turn its outcome into an exit code.

    Default flags configured for the tool are appended to the given arguments.
    If the executable can not be found, `python -m <tool name>` is tried instead.

    Args:
        tool: the (bash) name of the tool to run.
        _args: cli args to pass to the cli bash tool
    """
    # only the part after the last slash is used as display/module name
    tool_name = tool.rsplit("/", 1)[-1]

    args = [*_args]

    config = state.config
    if config:
        extra_flags = config.get_default_flags(tool)
        if extra_flags:
            args.extend(extra_flags)

    try:
        cmd = local[tool]
    except pb.CommandNotFound:  # pragma: no cover
        return run_tool_via_python(tool_name, *args)

    if state.verbosity >= 3:
        log_command(cmd, args)

    try:
        output = cmd(*args)
    except pb.ProcessExecutionError as error:
        return on_tool_failure(tool_name, error)
    else:
        return on_tool_success(tool_name, output)

243 

244 

245class Verbosity(enum.Enum): 

246 """ 

247 Verbosity is used with the --verbose argument of the cli commands. 

248 """ 

249 

250 # typer enum can only be string 

251 quiet = "1" 

252 normal = "2" 

253 verbose = "3" 

254 debug = "4" # only for internal use 

255 

256 @staticmethod 

257 def _compare( 

258 self: "Verbosity", 

259 other: "Verbosity_Comparable", 

260 _operator: Callable[["Verbosity_Comparable", "Verbosity_Comparable"], bool], 

261 ) -> bool: 

262 """ 

263 Abstraction using 'operator' to have shared functionality between <, <=, ==, >=, >. 

264 

265 This enum can be compared with integers, strings and other Verbosity instances. 

266 

267 Args: 

268 self: the first Verbosity 

269 other: the second Verbosity (or other thing to compare) 

270 _operator: a callable operator (from 'operators') that takes two of the same types as input. 

271 """ 

272 match other: 

273 case Verbosity(): 

274 return _operator(self.value, other.value) 

275 case int(): 

276 return _operator(int(self.value), other) 

277 case str(): 

278 return _operator(int(self.value), int(other)) 

279 

280 def __gt__(self, other: "Verbosity_Comparable") -> bool: 

281 """ 

282 Magic method for self > other. 

283 """ 

284 return self._compare(self, other, operator.gt) 

285 

286 def __ge__(self, other: "Verbosity_Comparable") -> bool: 

287 """ 

288 Method magic for self >= other. 

289 """ 

290 return self._compare(self, other, operator.ge) 

291 

292 def __lt__(self, other: "Verbosity_Comparable") -> bool: 

293 """ 

294 Magic method for self < other. 

295 """ 

296 return self._compare(self, other, operator.lt) 

297 

298 def __le__(self, other: "Verbosity_Comparable") -> bool: 

299 """ 

300 Magic method for self <= other. 

301 """ 

302 return self._compare(self, other, operator.le) 

303 

304 def __eq__(self, other: Union["Verbosity", str, int, object]) -> bool: 

305 """ 

306 Magic method for self == other. 

307 

308 'eq' is a special case because 'other' MUST be object according to mypy 

309 """ 

310 if other is Ellipsis or other is inspect._empty: 

311 # both instances of object; can't use Ellipsis or type(ELlipsis) = ellipsis as a type hint in mypy 

312 # special cases where Typer instanciates its cli arguments, 

313 # return False or it will crash 

314 return False 

315 

316 if other is None: 

317 other = DEFAULT_VERBOSITY 

318 

319 if not isinstance(other, (str, int, Verbosity)): 

320 raise TypeError(f"Object of type {type(other)} can not be compared with Verbosity") 

321 return self._compare(self, other, operator.eq) 

322 

323 def __hash__(self) -> int: 

324 """ 

325 Magic method for `hash(self)`, also required for Typer to work. 

326 """ 

327 return hash(self.value) 

328 

329 

# The types a Verbosity member can be compared with (see Verbosity._compare).
Verbosity_Comparable = Verbosity | str | int

# Used as default for the verbosity of the application state and when comparing a Verbosity with None.
DEFAULT_VERBOSITY = Verbosity.normal

333 

334 

class Format(enum.Enum):
    """
    The output formats that can be selected with su6 --format.

    Members compare equal to their plain string value, so `Format.json == "json"` holds.
    """

    text = "text"
    json = "json"

    def __eq__(self, other: object) -> bool:
        """
        Magic method for self == other, comparing on the member value.

        'other' is annotated as object because mypy requires that for __eq__.
        """
        # Typer passes Ellipsis or inspect._empty while instantiating its cli arguments;
        # those must simply compare unequal, otherwise it will crash.
        # (both are instances of object; Ellipsis can't be used as a type hint in mypy)
        is_typer_placeholder = other is Ellipsis or other is inspect._empty
        return False if is_typer_placeholder else self.value == other

    def __hash__(self) -> int:
        """
        Magic method for `hash(self)`, based on the member value; also required for Typer to work.
        """
        member_value = self.value
        return hash(member_value)

361 

362 

# Output format used by the application state unless another one is chosen.
DEFAULT_FORMAT = Format.text

# Type variable for "some specific kind of command", so a filtered list keeps the type of its input list.
C = typing.TypeVar("C", bound=T_Command)

# File name used for the badge when the `badge` config setting is just `True` (see Config.__post_init__).
DEFAULT_BADGE = "coverage.svg"

368 

369 

class AbstractConfig(configuraptor.TypedConfig, configuraptor.Singleton):
    """
    Used by state.config and plugin configs.

    Base class combining configuraptor's TypedConfig and Singleton.
    """

    # passed as `strict=` to configuraptor.load_into_instance when plugin configs are filled
    # (see ApplicationState._setup_plugin_config_defaults)
    _strict = True

376 

377 

378@dataclass 

379class Config(AbstractConfig): 

380 """ 

381 Used as typed version of the [tool.su6] part of pyproject.toml. 

382 

383 Also accessible via state.config 

384 """ 

385 

386 directory: str = "." 

387 pyproject: str = "pyproject.toml" 

388 include: list[str] = field(default_factory=list) 

389 exclude: list[str] = field(default_factory=list) 

390 stop_after_first_failure: bool = False 

391 json_indent: int = 4 

392 docstyle_convention: Optional[str] = None 

393 default_flags: typing.Optional[dict[str, str | list[str]]] = field(default=None) 

394 

395 ### pytest ### 

396 coverage: Optional[float] = None # only relevant for pytest 

397 badge: bool | str = False # only relevant for pytest 

398 

399 def __post_init__(self) -> None: 

400 """ 

401 Update the value of badge to the default path. 

402 """ 

403 self.__raw: dict[str, Any] = {} 

404 if self.badge is True: # pragma: no cover 

405 # no cover because pytest can't test pytest :C 

406 self.badge = DEFAULT_BADGE 

407 

408 def determine_which_to_run(self, options: list[C], exclude: list[str] = None) -> list[C]: 

409 """ 

410 Filter out any includes/excludes from pyproject.toml (first check include, then exclude). 

411 

412 `exclude` via cli overwrites config option. 

413 """ 

414 if self.include: 

415 tools = [_ for _ in options if _.__name__ in self.include and _.__name__ not in (exclude or [])] 

416 tools.sort(key=lambda f: self.include.index(f.__name__)) 

417 return tools 

418 elif self.exclude or exclude: 

419 to_exclude = set((self.exclude or []) + (exclude or [])) 

420 return [_ for _ in options if _.__name__ not in to_exclude] 

421 else: 

422 return options 

423 

424 def determine_plugins_to_run(self, attr: str, exclude: list[str] = None) -> list[T_Command]: 

425 """ 

426 Similar to `determine_which_to_run` but for plugin commands, and without 'include' ('exclude' only). 

427 

428 Attr is the key in Registration to filter plugins on, e.g. 'add_to_all' 

429 """ 

430 to_exclude = set((self.exclude or []) + (exclude or [])) 

431 

432 return [ 

433 _.wrapped for name, _ in state._registered_plugins.items() if getattr(_, attr) and name not in to_exclude 

434 ] 

435 

436 def set_raw(self, raw: dict[str, Any]) -> None: 

437 """ 

438 Set the raw config dict (from pyproject.toml). 

439 

440 Used to later look up Plugin config. 

441 """ 

442 self.__raw.update(raw) 

443 

444 def get_raw(self) -> dict[str, Any]: 

445 """ 

446 Get the raw config dict (to load Plugin config). 

447 """ 

448 return self.__raw or {} 

449 

450 def get_default_flags(self, service: str) -> list[str]: 

451 """ 

452 For a given service, load the additional flags from pyproject.toml. 

453 

454 Example: 

455 [tool.su6.default-flags] 

456 mypy = "--disable-error-code misc" 

457 black = ["--include", "something", "--exclude", "something"] 

458 """ 

459 if not self.default_flags: 

460 return [] 

461 

462 flags = self.default_flags.get(service, []) 

463 if not flags: 

464 return [] 

465 

466 if isinstance(flags, list): 

467 return flags 

468 elif isinstance(flags, str): 

469 return [_.strip() for _ in flags.split(" ") if _.strip()] 

470 raise TypeError(f"Invalid type {type(flags)} for flags.") 

471 

472 

# A Config instance, or None when no config is available (e.g. not loaded yet, or no toml file was found).
MaybeConfig: TypeAlias = Optional[Config]

474 

475T_typelike: TypeAlias = type | types.UnionType | types.UnionType 

476 

477 

def _get_su6_config(overwrites: dict[str, Any], toml_path: Optional[str | Path] = None) -> MaybeConfig:
    """
    Read the users pyproject.toml and build a Config from its tool.su6 section.

    configuraptor loads the section into the typed Config class,
    so that e.g. a string is not implicitly treated as a list of characters.

    Args:
        overwrites: cli arguments can overwrite the config toml.
        toml_path: by default, a relevant pyproject.toml is searched (with `find_pyproject_toml`).
                   If a toml_path is provided, that file will be used instead.

    Returns:
        the filled Config, or None if no toml file could be determined.
    """
    pyproject_path = find_pyproject_toml() if toml_path is None else toml_path
    if not pyproject_path:
        return None

    with open(pyproject_path, "rb") as toml_file:
        parsed = tomli.load(toml_file)

    tool_section = parsed["tool"]
    config = configuraptor.load_into(Config, tool_section, key="su6")

    # cli values win over the values from the file
    config.update(pyproject=str(pyproject_path))
    config.update(**overwrites)

    # for plugins:
    su6_section = tool_section["su6"]
    config.set_raw(su6_section)

    return config

509 

510 

def get_su6_config(verbosity: Verbosity = DEFAULT_VERBOSITY, toml_path: str = None, **overwrites: Any) -> Config:
    """
    Load the relevant pyproject.toml config settings, falling back to the defaults if that fails.

    Args:
        verbosity: if something goes wrong, level 3+ will show a warning and 4+ will raise the exception.
        toml_path: --config can be used to use a different file than ./pyproject.toml
        overwrites (dict[str, Any]): cli arguments can overwrite the config toml.
                    If a value is None, the key is not overwritten.
    """
    # strip out any 'overwrites' with None as value
    cleaned = convert_config(overwrites)

    try:
        config = _get_su6_config(cleaned, toml_path=toml_path)
        if not config:
            raise ValueError("Falsey config?")
        return config
    except Exception as e:
        # something went wrong parsing config, use defaults
        if verbosity > 3:
            # verbosity = debug
            raise e

        if verbosity > 2:
            # verbosity = verbose
            print("Error parsing pyproject.toml, falling back to defaults.", file=sys.stderr)

        return Config(**cleaned)

537 

538 

def info(*args: str) -> None:
    """
    Print the space-joined arguments to stderr, wrapped in [blue] markup.
    """
    message = " ".join(args)
    print("[blue]" + message + "[/blue]", file=sys.stderr)

544 

545 

def warn(*args: str) -> None:
    """
    Print the space-joined arguments to stderr, wrapped in [yellow] markup.
    """
    message = " ".join(args)
    print("[yellow]" + message + "[/yellow]", file=sys.stderr)

551 

552 

def danger(*args: str) -> None:
    """
    Print the space-joined arguments to stderr, wrapped in [red] markup.
    """
    message = " ".join(args)
    print("[red]" + message + "[/red]", file=sys.stderr)

558 

559 

def log_command(command: LocalCommand, args: typing.Iterable[str]) -> None:
    """
    Show a Plumbum command with its arguments in blue, prefixed with > to indicate it's a shell command.
    """
    bound_command = command[args]
    info(f"> {bound_command}")

565 

566 

def log_cmd_output(stdout: str = "", stderr: str = "") -> None:
    """
    Show the output of a command: stdout in yellow, followed by stderr in red.
    """
    # If stdout is logged, the run was probably not successful; it is not stderr though, so warning-yellow.
    # stderr probably holds the more important error stuff, so it goes last.
    for emit, text in ((warn, stdout), (danger, stderr)):
        emit(text)

576 

577 

578# postponed: use with Unpack later. 

579# class _Overwrites(typing.TypedDict, total=False): 

580# config_file: Optional[str] 

581# verbosity: Verbosity 

582# output_format: Format 

583# # + kwargs 

584 

585 

@dataclass()
class ApplicationState:
    """
    Application State - global user defined variables.

    State holds the generic options passed BEFORE the subcommand (so --verbosity, --config, ...).
    Config holds the settings from the config toml file, updated with arguments AFTER the subcommand:
    for `su6 subcommand <directory> --flag`, directory and flag end up in the config and not in the state.

    In short: 'state' applies to all commands, config only to specific ones.
    """

    verbosity: Verbosity = DEFAULT_VERBOSITY
    output_format: Format = DEFAULT_FORMAT
    config_file: Optional[str] = None  # will be filled with black's search logic
    config: MaybeConfig = None

    def __post_init__(self) -> None:
        """
        Create the (initially empty) stores for plugin configs and plugin registrations.
        """
        self._plugin_configs: dict[str, AbstractConfig] = {}
        self._registered_plugins: dict[str, "AnyRegistration"] = {}

    def register_plugin(self, plugin_name: str, registration: "AnyRegistration") -> None:
        """
        Store a plugin Registration on the State, keyed by the plugin name with underscores turned into dashes.

        `all` and `fix` use this to include plugin commands with add_to_all or add_to_fix respectively.
        """
        dashed_name = plugin_name.replace("_", "-")
        self._registered_plugins[dashed_name] = registration

    def load_config(self, **overwrites: Any) -> Config:
        """
        (Re)load the su6 config from pyproject.toml (or other config_file), applying optional overwrites.

        The overwrites 'verbosity', 'config_file' and 'output_format' (when not None) are stored on the state.
        Afterwards, the attached plugin configs are updated as well.
        """
        new_verbosity = overwrites.get("verbosity")
        if new_verbosity is not None:
            # verbosity stays in the overwrites: it is also an argument of get_su6_config
            self.verbosity = new_verbosity

        # these two are taken out of the overwrites (when set) and stored on the state
        for state_setting in ("config_file", "output_format"):
            if overwrites.get(state_setting) is not None:
                setattr(self, state_setting, overwrites.pop(state_setting))

        self.config = get_su6_config(toml_path=self.config_file, **overwrites)
        self._setup_plugin_config_defaults()
        return self.config

    def attach_plugin_config(self, name: str, config_cls: AbstractConfig) -> None:
        """
        Remember a plugin-specific config under `name`, to be filled later with load_config().

        Called from plugins.py when an @registered PluginConfig is found.
        """
        self._plugin_configs[name] = config_cls

    def _setup_plugin_config_defaults(self) -> None:
        """
        Fill every attached plugin config from the raw config data (run after load_config).
        """
        raw_config = self.get_config().get_raw()

        for plugin_key, plugin_config in self._plugin_configs.items():
            configuraptor.load_into_instance(plugin_config, raw_config, key=plugin_key, strict=plugin_config._strict)

    def get_config(self) -> Config:
        """
        Return the current config, loading it first if there is none yet.
        """
        if self.config:
            return self.config
        return self.load_config()

    def update_config(self, **values: Any) -> Config:
        """
        Apply cli values on top of the default/toml settings.

        Example:
            `config = state.update_config(directory='src')`
            This will update the state's config and return the same object with the updated settings.
        """
        config = self.get_config()
        config.update(**convert_config(values))
        return config

672 

673 

# Module-level application state, created with defaults; read by the helper functions in this file.
state = ApplicationState()