Coverage for src / dotbot / plugins / clean.py: 92%

52 statements  

« prev     ^ index     » next       coverage.py v7.12.0, created at 2025-11-29 10:55 -0800

1import os 

2import sys 

3from typing import Any 

4 

5from dotbot.plugin import Plugin 

6from dotbot.util.common import normslash 

7 

8 

9class Clean(Plugin): 

10 """ 

11 Cleans broken symbolic links. 

12 """ 

13 

14 supports_dry_run = True 

15 

16 _directive = "clean" 

17 

18 def can_handle(self, directive: str) -> bool: 

19 return directive == self._directive 

20 

21 def handle(self, directive: str, data: Any) -> bool: 

22 if directive != self._directive: 

23 msg = f"Clean cannot handle directive {directive}" 

24 raise ValueError(msg) 

25 return self._process_clean(data) 

26 

27 def _process_clean(self, targets: Any) -> bool: 

28 success = True 

29 defaults = self._context.defaults().get(self._directive, {}) 

30 for target in targets: 

31 force = defaults.get("force", False) 

32 recursive = defaults.get("recursive", False) 

33 if isinstance(targets, dict) and isinstance(targets[target], dict): 

34 force = targets[target].get("force", force) 

35 recursive = targets[target].get("recursive", recursive) 

36 success &= self._clean(normslash(target), force=force, recursive=recursive) 

37 if success: 

38 self._log.info("All targets have been cleaned") 

39 else: 

40 self._log.error("Some targets were not successfully cleaned") 

41 return success 

42 

43 def _clean(self, target: str, *, force: bool, recursive: bool) -> bool: 

44 """ 

45 Cleans all the broken symbolic links in target if they point to 

46 a subdirectory of the base directory or if forced to clean. 

47 """ 

48 if not os.path.isdir(os.path.expandvars(os.path.expanduser(target))): 

49 self._log.debug(f"Ignoring nonexistent directory {target}") 

50 return True 

51 for item in os.listdir(os.path.expandvars(os.path.expanduser(target))): 

52 path = os.path.abspath(os.path.join(os.path.expandvars(os.path.expanduser(target)), item)) 

53 if recursive and os.path.isdir(path): 

54 # isdir implies not islink -- we don't want to descend into 

55 # symlinked directories. okay to do a recursive call here 

56 # because depth should be fairly limited 

57 self._clean(path, force=force, recursive=recursive) 

58 if not os.path.exists(path) and os.path.islink(path): 

59 points_at = os.path.join(os.path.dirname(path), os.readlink(path)) 

60 if sys.platform == "win32" and points_at.startswith("\\\\?\\"): 

61 points_at = points_at[4:] 

62 if self._in_directory(path, self._context.base_directory()) or force: 

63 if self._context.dry_run(): 

64 self._log.action(f"Would remove invalid link {path} -> {points_at}") 

65 else: 

66 self._log.action(f"Removing invalid link {path} -> {points_at}") 

67 os.remove(path) 

68 else: 

69 self._log.info(f"Link {path} -> {points_at} not removed.") 

70 return True 

71 

72 def _in_directory(self, path: str, directory: str) -> bool: 

73 """ 

74 Returns true if the path is in the directory. 

75 """ 

76 directory = os.path.join(os.path.realpath(directory), "") 

77 path = os.path.realpath(path) 

78 return os.path.commonprefix([path, directory]) == directory