Coverage for src / dotbot / plugins / clean.py: 92%
52 statements
« prev ^ index » next coverage.py v7.12.0, created at 2025-11-29 10:55 -0800
« prev ^ index » next coverage.py v7.12.0, created at 2025-11-29 10:55 -0800
1import os
2import sys
3from typing import Any
5from dotbot.plugin import Plugin
6from dotbot.util.common import normslash
9class Clean(Plugin):
10 """
11 Cleans broken symbolic links.
12 """
14 supports_dry_run = True
16 _directive = "clean"
18 def can_handle(self, directive: str) -> bool:
19 return directive == self._directive
21 def handle(self, directive: str, data: Any) -> bool:
22 if directive != self._directive:
23 msg = f"Clean cannot handle directive {directive}"
24 raise ValueError(msg)
25 return self._process_clean(data)
27 def _process_clean(self, targets: Any) -> bool:
28 success = True
29 defaults = self._context.defaults().get(self._directive, {})
30 for target in targets:
31 force = defaults.get("force", False)
32 recursive = defaults.get("recursive", False)
33 if isinstance(targets, dict) and isinstance(targets[target], dict):
34 force = targets[target].get("force", force)
35 recursive = targets[target].get("recursive", recursive)
36 success &= self._clean(normslash(target), force=force, recursive=recursive)
37 if success:
38 self._log.info("All targets have been cleaned")
39 else:
40 self._log.error("Some targets were not successfully cleaned")
41 return success
43 def _clean(self, target: str, *, force: bool, recursive: bool) -> bool:
44 """
45 Cleans all the broken symbolic links in target if they point to
46 a subdirectory of the base directory or if forced to clean.
47 """
48 if not os.path.isdir(os.path.expandvars(os.path.expanduser(target))):
49 self._log.debug(f"Ignoring nonexistent directory {target}")
50 return True
51 for item in os.listdir(os.path.expandvars(os.path.expanduser(target))):
52 path = os.path.abspath(os.path.join(os.path.expandvars(os.path.expanduser(target)), item))
53 if recursive and os.path.isdir(path):
54 # isdir implies not islink -- we don't want to descend into
55 # symlinked directories. okay to do a recursive call here
56 # because depth should be fairly limited
57 self._clean(path, force=force, recursive=recursive)
58 if not os.path.exists(path) and os.path.islink(path):
59 points_at = os.path.join(os.path.dirname(path), os.readlink(path))
60 if sys.platform == "win32" and points_at.startswith("\\\\?\\"):
61 points_at = points_at[4:]
62 if self._in_directory(path, self._context.base_directory()) or force:
63 if self._context.dry_run():
64 self._log.action(f"Would remove invalid link {path} -> {points_at}")
65 else:
66 self._log.action(f"Removing invalid link {path} -> {points_at}")
67 os.remove(path)
68 else:
69 self._log.info(f"Link {path} -> {points_at} not removed.")
70 return True
72 def _in_directory(self, path: str, directory: str) -> bool:
73 """
74 Returns true if the path is in the directory.
75 """
76 directory = os.path.join(os.path.realpath(directory), "")
77 path = os.path.realpath(path)
78 return os.path.commonprefix([path, directory]) == directory