Coverage for src/su6/core.py: 100%
273 statements
« prev ^ index » next coverage.py v7.8.1, created at 2025-09-19 22:55 +0200
« prev ^ index » next coverage.py v7.8.1, created at 2025-09-19 22:55 +0200
1"""
2This file contains internal helpers used by cli.py.
3"""
5import enum
6import functools
7import inspect
8import json
9import operator
10import pydoc
11import sys
12import types
13import typing
14from dataclasses import dataclass, field
15from pathlib import Path
16from typing import Any, Callable, Optional, TypeAlias, Union
18import configuraptor
19import plumbum.commands.processes as pb
20import tomli
21import typer
22from configuraptor import convert_config
23from configuraptor.helpers import find_pyproject_toml
24from plumbum import local
25from plumbum.machines import LocalCommand
26from rich import print
28if typing.TYPE_CHECKING: # pragma: no cover
29 from .plugins import AnyRegistration
31GREEN_CIRCLE = "🟢"
32YELLOW_CIRCLE = "🟡"
33RED_CIRCLE = "🔴"
35EXIT_CODE_SUCCESS = 0
36EXIT_CODE_ERROR = 1
37EXIT_CODE_COMMAND_NOT_FOUND = 127
40class ExitCodes:
41 """
42 Store the possible EXIT_CODE_ items for ease of use (and autocomplete).
43 """
45 # enum but not really
46 success = EXIT_CODE_SUCCESS
47 error = EXIT_CODE_ERROR
48 command_not_found = EXIT_CODE_COMMAND_NOT_FOUND
51PlumbumError = (pb.ProcessExecutionError, pb.ProcessTimedOut, pb.ProcessLineTimedOut, pb.CommandNotFound)
53# a Command can return these:
54T_Command_Return = bool | int | None
55# ... here indicates any number of args/kwargs:
56# t command is any @app.command() method, which can have anything as input and bool or int as output
57T_Command: TypeAlias = Callable[..., T_Command_Return]
58# t inner wrapper calls t_command and handles its output. This wrapper gets the same (kw)args as above so ... again
59T_Inner_Wrapper: TypeAlias = Callable[..., int | None]
60# outer wrapper gets the t_command method as input and outputs the inner wrapper,
61# so that gets called() with args and kwargs when that method is used from the cli
62T_Outer_Wrapper: TypeAlias = Callable[[T_Command], T_Inner_Wrapper]
65def print_json(data: Any) -> None:
66 """
67 Take a dict of {command: output} or the State and print it.
68 """
69 indent = state.get_config().json_indent or None
70 # none is different from 0 for the indent kwarg, but 0 will be changed to None for this module
71 print(json.dumps(data, default=str, indent=indent))
74def dump_tools_with_results(tools: list[T_Command], results: list[int | bool | None]) -> None:
75 """
76 When using format = json, dump the success of each tool in tools (-> exit code == 0).
78 This method is used in `all` and `fix` (with a list of tools) and in 'with_exit_code' (with one tool).
79 'with_exit_code' does NOT use this method if the return value was a bool, because that's the return value of
80 'all' and 'fix' and those already dump a dict output themselves.
82 Args:
83 tools: list of commands that ran
84 results: list of return values from these commands
85 """
86 print_json({tool.__name__: not result for tool, result in zip(tools, results)})
89def with_exit_code() -> T_Outer_Wrapper:
90 """
91 Convert the return value of an app.command (bool or int) to an typer Exit with return code, \
92 Unless the return value is Falsey, in which case the default exit happens (with exit code 0 indicating success).
94 Usage:
95 > @app.command()
96 > @with_exit_code()
97 def some_command(): ...
99 When calling a command from a different command, _suppress=True can be added to not raise an Exit exception.
100 """
102 def outer_wrapper(func: T_Command) -> T_Inner_Wrapper:
103 @functools.wraps(func)
104 def inner_wrapper(*args: Any, **kwargs: Any) -> int:
105 _suppress = kwargs.pop("_suppress", False)
106 _ignore_exit_codes = kwargs.pop("_ignore", set())
108 result = func(*args, **kwargs)
109 if state.output_format == "json" and not _suppress and result is not None and not isinstance(result, bool):
110 # isinstance(True, int) -> True so not isinstance(result, bool)
111 # print {tool: success}
112 # but only if a retcode is returned,
113 # otherwise (True, False) assume the function handled printing itself.
114 dump_tools_with_results([func], [result])
116 if result is None:
117 # assume no issue then
118 result = 0
120 if (retcode := int(result)) and not _suppress:
121 raise typer.Exit(code=retcode)
123 if retcode in _ignore_exit_codes: # pragma: no cover
124 # there is an error code, but we choose to ignore it -> return 0
125 return ExitCodes.success
127 return retcode
129 return inner_wrapper
131 return outer_wrapper
134def is_available_via_python(tool: str) -> bool:
135 """
136 Sometimes, an executable is not available in PATH (e.g. via pipx) but it is available as `python -m something`.
138 This tries to test for that.
139 May not work for exceptions like 'semantic-release'/'semantic_release' (python-semantic-release)
140 """
141 try:
142 pydoc.render_doc(tool)
143 return True
144 except ImportError:
145 return False
148def is_installed(tool: str, python_fallback: bool = True) -> bool:
149 """
150 Check whether a certain tool is installed (/ can be found via 'which').
151 """
152 try:
153 return bool(local["which"](tool))
154 except pb.ProcessExecutionError:
155 return is_available_via_python(tool) if python_fallback else False
158def on_tool_success(tool_name: str, result: str) -> int:
159 """
160 Last step of run_tool or run_tool_via_python on success.
161 """
162 if state.output_format == "text":
163 print(GREEN_CIRCLE, tool_name)
165 if state.verbosity > 2: # pragma: no cover
166 log_cmd_output(result)
168 return ExitCodes.success # success
171def on_tool_missing(tool_name: str) -> int:
172 """
173 If tool can't be found in both run_tool and run_tool_via_python.
174 """
175 if state.verbosity > 2:
176 warn(f"Tool {tool_name} not installed!")
178 if state.output_format == "text":
179 print(YELLOW_CIRCLE, tool_name)
181 return ExitCodes.command_not_found # command not found
184def on_tool_failure(tool_name: str, e: pb.ProcessExecutionError) -> int:
185 """
186 If tool fails in run_tool or run_tool_via_python.
187 """
188 if state.output_format == "text":
189 print(RED_CIRCLE, tool_name)
191 if state.verbosity > 1:
192 log_cmd_output(e.stdout, e.stderr)
193 return ExitCodes.error # general error
196def run_tool_via_python(tool_name: str, *args: str) -> int:
197 """
198 Fallback: try `python -m tool ...` instead of `tool ...`.
199 """
200 cmd = local[sys.executable]["-m", tool_name]
201 if state.verbosity >= 3:
202 log_command(cmd, args)
204 try:
205 result = cmd(*args)
206 return on_tool_success(tool_name, result)
207 except pb.ProcessExecutionError as e:
208 if "No module named" in e.stderr:
209 return on_tool_missing(tool_name)
211 return on_tool_failure(tool_name, e)
214def run_tool(tool: str, *_args: str) -> int:
215 """
216 Abstraction to run one of the cli checking tools and process its output.
218 Args:
219 tool: the (bash) name of the tool to run.
220 _args: cli args to pass to the cli bash tool
221 """
222 tool_name = tool.split("/")[-1]
224 args = list(_args)
226 if state.config and (extra_flags := state.config.get_default_flags(tool)):
227 args.extend(extra_flags)
229 try:
230 cmd = local[tool]
231 except pb.CommandNotFound: # pragma: no cover
232 return run_tool_via_python(tool_name, *args)
234 if state.verbosity >= 3:
235 log_command(cmd, args)
237 try:
238 result = cmd(*args)
239 return on_tool_success(tool_name, result)
241 except pb.ProcessExecutionError as e:
242 return on_tool_failure(tool_name, e)
245class Verbosity(enum.Enum):
246 """
247 Verbosity is used with the --verbose argument of the cli commands.
248 """
250 # typer enum can only be string
251 quiet = "1"
252 normal = "2"
253 verbose = "3"
254 debug = "4" # only for internal use
256 @staticmethod
257 def _compare(
258 self: "Verbosity",
259 other: "Verbosity_Comparable",
260 _operator: Callable[["Verbosity_Comparable", "Verbosity_Comparable"], bool],
261 ) -> bool:
262 """
263 Abstraction using 'operator' to have shared functionality between <, <=, ==, >=, >.
265 This enum can be compared with integers, strings and other Verbosity instances.
267 Args:
268 self: the first Verbosity
269 other: the second Verbosity (or other thing to compare)
270 _operator: a callable operator (from 'operators') that takes two of the same types as input.
271 """
272 match other:
273 case Verbosity():
274 return _operator(self.value, other.value)
275 case int():
276 return _operator(int(self.value), other)
277 case str():
278 return _operator(int(self.value), int(other))
280 def __gt__(self, other: "Verbosity_Comparable") -> bool:
281 """
282 Magic method for self > other.
283 """
284 return self._compare(self, other, operator.gt)
286 def __ge__(self, other: "Verbosity_Comparable") -> bool:
287 """
288 Method magic for self >= other.
289 """
290 return self._compare(self, other, operator.ge)
292 def __lt__(self, other: "Verbosity_Comparable") -> bool:
293 """
294 Magic method for self < other.
295 """
296 return self._compare(self, other, operator.lt)
298 def __le__(self, other: "Verbosity_Comparable") -> bool:
299 """
300 Magic method for self <= other.
301 """
302 return self._compare(self, other, operator.le)
304 def __eq__(self, other: Union["Verbosity", str, int, object]) -> bool:
305 """
306 Magic method for self == other.
308 'eq' is a special case because 'other' MUST be object according to mypy
309 """
310 if other is Ellipsis or other is inspect._empty:
311 # both instances of object; can't use Ellipsis or type(ELlipsis) = ellipsis as a type hint in mypy
312 # special cases where Typer instanciates its cli arguments,
313 # return False or it will crash
314 return False
316 if other is None:
317 other = DEFAULT_VERBOSITY
319 if not isinstance(other, (str, int, Verbosity)):
320 raise TypeError(f"Object of type {type(other)} can not be compared with Verbosity")
321 return self._compare(self, other, operator.eq)
323 def __hash__(self) -> int:
324 """
325 Magic method for `hash(self)`, also required for Typer to work.
326 """
327 return hash(self.value)
330Verbosity_Comparable = Verbosity | str | int
332DEFAULT_VERBOSITY = Verbosity.normal
335class Format(enum.Enum):
336 """
337 Options for su6 --format.
338 """
340 text = "text"
341 json = "json"
343 def __eq__(self, other: object) -> bool:
344 """
345 Magic method for self == other.
347 'eq' is a special case because 'other' MUST be object according to mypy
348 """
349 if other is Ellipsis or other is inspect._empty:
350 # both instances of object; can't use Ellipsis or type(ELlipsis) = ellipsis as a type hint in mypy
351 # special cases where Typer instanciates its cli arguments,
352 # return False or it will crash
353 return False
354 return self.value == other
356 def __hash__(self) -> int:
357 """
358 Magic method for `hash(self)`, also required for Typer to work.
359 """
360 return hash(self.value)
363DEFAULT_FORMAT = Format.text
365C = typing.TypeVar("C", bound=T_Command)
367DEFAULT_BADGE = "coverage.svg"
370class AbstractConfig(configuraptor.TypedConfig, configuraptor.Singleton):
371 """
372 Used by state.config and plugin configs.
373 """
375 _strict = True
378@dataclass
379class Config(AbstractConfig):
380 """
381 Used as typed version of the [tool.su6] part of pyproject.toml.
383 Also accessible via state.config
384 """
386 directory: str = "."
387 pyproject: str = "pyproject.toml"
388 include: list[str] = field(default_factory=list)
389 exclude: list[str] = field(default_factory=list)
390 stop_after_first_failure: bool = False
391 json_indent: int = 4
392 docstyle_convention: Optional[str] = None
393 default_flags: typing.Optional[dict[str, str | list[str]]] = field(default=None)
395 ### pytest ###
396 coverage: Optional[float] = None # only relevant for pytest
397 badge: bool | str = False # only relevant for pytest
399 def __post_init__(self) -> None:
400 """
401 Update the value of badge to the default path.
402 """
403 self.__raw: dict[str, Any] = {}
404 if self.badge is True: # pragma: no cover
405 # no cover because pytest can't test pytest :C
406 self.badge = DEFAULT_BADGE
408 def determine_which_to_run(self, options: list[C], exclude: list[str] = None) -> list[C]:
409 """
410 Filter out any includes/excludes from pyproject.toml (first check include, then exclude).
412 `exclude` via cli overwrites config option.
413 """
414 if self.include:
415 tools = [_ for _ in options if _.__name__ in self.include and _.__name__ not in (exclude or [])]
416 tools.sort(key=lambda f: self.include.index(f.__name__))
417 return tools
418 elif self.exclude or exclude:
419 to_exclude = set((self.exclude or []) + (exclude or []))
420 return [_ for _ in options if _.__name__ not in to_exclude]
421 else:
422 return options
424 def determine_plugins_to_run(self, attr: str, exclude: list[str] = None) -> list[T_Command]:
425 """
426 Similar to `determine_which_to_run` but for plugin commands, and without 'include' ('exclude' only).
428 Attr is the key in Registration to filter plugins on, e.g. 'add_to_all'
429 """
430 to_exclude = set((self.exclude or []) + (exclude or []))
432 return [
433 _.wrapped for name, _ in state._registered_plugins.items() if getattr(_, attr) and name not in to_exclude
434 ]
436 def set_raw(self, raw: dict[str, Any]) -> None:
437 """
438 Set the raw config dict (from pyproject.toml).
440 Used to later look up Plugin config.
441 """
442 self.__raw.update(raw)
444 def get_raw(self) -> dict[str, Any]:
445 """
446 Get the raw config dict (to load Plugin config).
447 """
448 return self.__raw or {}
450 def get_default_flags(self, service: str) -> list[str]:
451 """
452 For a given service, load the additional flags from pyproject.toml.
454 Example:
455 [tool.su6.default-flags]
456 mypy = "--disable-error-code misc"
457 black = ["--include", "something", "--exclude", "something"]
458 """
459 if not self.default_flags:
460 return []
462 flags = self.default_flags.get(service, [])
463 if not flags:
464 return []
466 if isinstance(flags, list):
467 return flags
468 elif isinstance(flags, str):
469 return [_.strip() for _ in flags.split(" ") if _.strip()]
470 raise TypeError(f"Invalid type {type(flags)} for flags.")
473MaybeConfig: TypeAlias = Optional[Config]
475T_typelike: TypeAlias = type | types.UnionType | types.UnionType
478def _get_su6_config(overwrites: dict[str, Any], toml_path: Optional[str | Path] = None) -> MaybeConfig:
479 """
480 Parse the users pyproject.toml (found using black's logic) and extract the tool.su6 part.
482 The types as entered in the toml are checked using _ensure_types,
483 to make sure there isn't a string implicitly converted to a list of characters or something.
485 Args:
486 overwrites: cli arguments can overwrite the config toml.
487 toml_path: by default, black will search for a relevant pyproject.toml.
488 If a toml_path is provided, that file will be used instead.
489 """
490 if toml_path is None:
491 toml_path = find_pyproject_toml()
493 if not toml_path:
494 return None
496 with open(toml_path, "rb") as f:
497 full_config = tomli.load(f)
499 tool_config = full_config["tool"]
501 config = configuraptor.load_into(Config, tool_config, key="su6")
503 config.update(pyproject=str(toml_path))
504 config.update(**overwrites)
505 # for plugins:
506 config.set_raw(tool_config["su6"])
508 return config
511def get_su6_config(verbosity: Verbosity = DEFAULT_VERBOSITY, toml_path: str = None, **overwrites: Any) -> Config:
512 """
513 Load the relevant pyproject.toml config settings.
515 Args:
516 verbosity: if something goes wrong, level 3+ will show a warning and 4+ will raise the exception.
517 toml_path: --config can be used to use a different file than ./pyproject.toml
518 overwrites (dict[str, Any): cli arguments can overwrite the config toml.
519 If a value is None, the key is not overwritten.
520 """
521 # strip out any 'overwrites' with None as value
522 overwrites = convert_config(overwrites)
524 try:
525 if config := _get_su6_config(overwrites, toml_path=toml_path):
526 return config
527 raise ValueError("Falsey config?")
528 except Exception as e:
529 # something went wrong parsing config, use defaults
530 if verbosity > 3:
531 # verbosity = debug
532 raise e
533 elif verbosity > 2:
534 # verbosity = verbose
535 print("Error parsing pyproject.toml, falling back to defaults.", file=sys.stderr)
536 return Config(**overwrites)
539def info(*args: str) -> None:
540 """
541 'print' but with blue text.
542 """
543 print(f"[blue]{' '.join(args)}[/blue]", file=sys.stderr)
546def warn(*args: str) -> None:
547 """
548 'print' but with yellow text.
549 """
550 print(f"[yellow]{' '.join(args)}[/yellow]", file=sys.stderr)
553def danger(*args: str) -> None:
554 """
555 'print' but with red text.
556 """
557 print(f"[red]{' '.join(args)}[/red]", file=sys.stderr)
560def log_command(command: LocalCommand, args: typing.Iterable[str]) -> None:
561 """
562 Print a Plumbum command in blue, prefixed with > to indicate it's a shell command.
563 """
564 info(f"> {command[args]}")
567def log_cmd_output(stdout: str = "", stderr: str = "") -> None:
568 """
569 Print stdout in yellow and stderr in red.
570 """
571 # if you are logging stdout, it's probably because it's not a successful run.
572 # However, it's not stderr so we make it warning-yellow
573 warn(stdout)
574 # probably more important error stuff, so stderr goes last:
575 danger(stderr)
578# postponed: use with Unpack later.
579# class _Overwrites(typing.TypedDict, total=False):
580# config_file: Optional[str]
581# verbosity: Verbosity
582# output_format: Format
583# # + kwargs
586@dataclass()
587class ApplicationState:
588 """
589 Application State - global user defined variables.
591 State contains generic variables passed BEFORE the subcommand (so --verbosity, --config, ...),
592 whereas Config contains settings from the config toml file, updated with arguments AFTER the subcommand
593 (e.g. su6 subcommand <directory> --flag), directory and flag will be updated in the config and not the state.
595 To summarize: 'state' is applicable to all commands and config only to specific ones.
596 """
598 verbosity: Verbosity = DEFAULT_VERBOSITY
599 output_format: Format = DEFAULT_FORMAT
600 config_file: Optional[str] = None # will be filled with black's search logic
601 config: MaybeConfig = None
603 def __post_init__(self) -> None:
604 """
605 Store registered plugin config.
606 """
607 self._plugin_configs: dict[str, AbstractConfig] = {}
608 self._registered_plugins: dict[str, "AnyRegistration"] = {}
610 def register_plugin(self, plugin_name: str, registration: "AnyRegistration") -> None:
611 """
612 Connect a Registration to the State.
614 Used by `all` and `fix` to include plugin commands with add_to_all or add_to_fix respectively.
615 """
616 plugin_name = plugin_name.replace("_", "-")
617 self._registered_plugins[plugin_name] = registration
619 def load_config(self, **overwrites: Any) -> Config:
620 """
621 Load the su6 config from pyproject.toml (or other config_file) with optional overwriting settings.
623 Also updates attached plugin configs.
624 """
625 if overwrites.get("verbosity") is not None:
626 self.verbosity = overwrites["verbosity"]
627 if overwrites.get("config_file") is not None:
628 self.config_file = overwrites.pop("config_file")
629 if overwrites.get("output_format") is not None:
630 self.output_format = overwrites.pop("output_format")
632 self.config = get_su6_config(toml_path=self.config_file, **overwrites)
633 self._setup_plugin_config_defaults()
634 return self.config
636 def attach_plugin_config(self, name: str, config_cls: AbstractConfig) -> None:
637 """
638 Add a new plugin-specific config to be loaded later with load_config().
640 Called from plugins.py when an @registered PluginConfig is found.
641 """
642 self._plugin_configs[name] = config_cls
644 def _setup_plugin_config_defaults(self) -> None:
645 """
646 After load_config, the raw data is used to also fill registered plugin configs.
647 """
648 config = self.get_config()
649 raw = config.get_raw()
650 for name, config_instance in self._plugin_configs.items():
651 configuraptor.load_into_instance(config_instance, raw, key=name, strict=config_instance._strict)
653 def get_config(self) -> Config:
654 """
655 Get a filled config instance.
656 """
657 return self.config or self.load_config()
659 def update_config(self, **values: Any) -> Config:
660 """
661 Overwrite default/toml settings with cli values.
663 Example:
664 `config = state.update_config(directory='src')`
665 This will update the state's config and return the same object with the updated settings.
666 """
667 existing_config = self.get_config()
669 values = convert_config(values)
670 existing_config.update(**values)
671 return existing_config
674state = ApplicationState()