Coverage for src / dotbot / plugins / link.py: 99%
253 statements
« prev ^ index » next coverage.py v7.12.0, created at 2025-11-29 12:13 -0800
« prev ^ index » next coverage.py v7.12.0, created at 2025-11-29 12:13 -0800
1import glob
2import os
3import shutil
4import sys
5from datetime import datetime, timezone
6from typing import Any, List, Optional, Tuple
8from dotbot.plugin import Plugin
9from dotbot.util import shell_command
10from dotbot.util.common import normslash
13class Link(Plugin):
14 """
15 Symbolically links dotfiles.
16 """
18 supports_dry_run = True
20 _directive = "link"
22 def can_handle(self, directive: str) -> bool:
23 return directive == self._directive
25 def handle(self, directive: str, data: Any) -> bool:
26 if directive != self._directive:
27 msg = f"Link cannot handle directive {directive}"
28 raise ValueError(msg)
29 return self._process_links(data)
31 def _process_links(self, links: Any) -> bool:
32 success = True
33 defaults = self._context.defaults().get("link", {})
35 # Validate the default link type before looping.
36 link_type = defaults.get("type", "symlink")
37 if link_type not in {"symlink", "hardlink"}:
38 self._log.warning(f"The default link type is not recognized: '{link_type}'")
39 return False
41 for link_name, target in links.items():
42 link_name = os.path.expandvars(normslash(link_name)) # noqa: PLW2901
43 relative = defaults.get("relative", False)
44 # support old "canonicalize-path" key for compatibility
45 canonical_path = defaults.get("canonicalize", defaults.get("canonicalize-path", True))
46 link_type = defaults.get("type", "symlink")
47 force = defaults.get("force", False)
48 relink = defaults.get("relink", False)
49 create = defaults.get("create", False)
50 use_glob = defaults.get("glob", False)
51 backup = defaults.get("backup", False)
52 base_prefix = defaults.get("prefix", "")
53 test = defaults.get("if", None)
54 ignore_missing = defaults.get("ignore-missing", False)
55 exclude_paths = defaults.get("exclude", [])
56 if isinstance(target, dict):
57 # extended config
58 test = target.get("if", test)
59 relative = target.get("relative", relative)
60 canonical_path = target.get("canonicalize", target.get("canonicalize-path", canonical_path))
61 link_type = target.get("type", link_type)
62 if link_type not in {"symlink", "hardlink"}:
63 msg = f"The link type is not recognized: '{link_type}'"
64 self._log.warning(msg)
65 success = False
66 continue
67 force = target.get("force", force)
68 relink = target.get("relink", relink)
69 create = target.get("create", create)
70 use_glob = target.get("glob", use_glob)
71 backup = target.get("backup", backup)
72 base_prefix = target.get("prefix", base_prefix)
73 ignore_missing = target.get("ignore-missing", ignore_missing)
74 exclude_paths = target.get("exclude", exclude_paths)
75 path = self._default_target(link_name, target.get("path"))
76 else:
77 path = self._default_target(link_name, target)
78 path = normslash(path)
79 if test is not None and not self._test_success(test):
80 self._log.info(f"Skipping {link_name}")
81 continue
82 path = os.path.normpath(os.path.expandvars(os.path.expanduser(path)))
83 if use_glob and self._has_glob_chars(path):
84 glob_results = self._create_glob_results(path, exclude_paths)
85 self._log.debug(f"Globs from '{path}': {glob_results}")
86 for glob_full_item in glob_results:
87 # Find common dirname between pattern and the item:
88 glob_dirname = os.path.dirname(os.path.commonprefix([path, glob_full_item]))
89 glob_item = glob_full_item if len(glob_dirname) == 0 else glob_full_item[len(glob_dirname) + 1 :]
90 # Add prefix to basepath, if provided
91 if base_prefix:
92 glob_item = base_prefix + glob_item
93 # where is it going
94 glob_link_name = os.path.join(link_name, glob_item)
95 if create:
96 success &= self._create(glob_link_name)
97 did_backup = False
98 did_delete = False
99 if backup:
100 did_backup, backup_success = self._backup(glob_link_name)
101 success &= backup_success
102 # we only need to consider force/relink if we didn't do a backup
103 if (force or relink) and not (backup and backup_success):
104 did_delete, delete_success = self._delete(
105 glob_full_item,
106 glob_link_name,
107 relative=relative,
108 canonical_path=canonical_path,
109 force=force,
110 )
111 success &= delete_success
112 success &= self._link(
113 glob_full_item,
114 glob_link_name,
115 relative=relative,
116 canonical_path=canonical_path,
117 ignore_missing=ignore_missing,
118 link_type=link_type,
119 assume_gone=(did_backup or did_delete),
120 )
121 else:
122 if create:
123 success &= self._create(link_name)
124 if not ignore_missing and not self._exists(os.path.join(self._context.base_directory(), path)):
125 # we seemingly check this twice (here and in _link) because
126 # if the file doesn't exist and force is True, we don't
127 # want to remove the original (this is tested by test_link_force_leaves_when_nonexistent)
128 success = False
129 self._log.warning(f"Nonexistent target {link_name} -> {path}")
130 continue
131 did_backup = False
132 did_delete = False
133 if backup:
134 did_backup, backup_success = self._backup(link_name)
135 success &= backup_success
136 # we only need to consider force/relink if we didn't do a backup
137 if (force or relink) and not (backup and backup_success):
138 did_delete, delete_success = self._delete(
139 path, link_name, relative=relative, canonical_path=canonical_path, force=force
140 )
141 success &= delete_success
142 success &= self._link(
143 path,
144 link_name,
145 relative=relative,
146 canonical_path=canonical_path,
147 ignore_missing=ignore_missing,
148 link_type=link_type,
149 assume_gone=(did_backup or did_delete),
150 )
151 if success:
152 self._log.info("All links have been set up")
153 else:
154 self._log.error("Some links were not successfully set up")
155 return success
157 def _test_success(self, command: str) -> bool:
158 ret = shell_command(command, cwd=self._context.base_directory())
159 if ret != 0:
160 self._log.debug(f"Test '{command}' returned false")
161 return ret == 0
163 def _default_target(self, link_name: str, target: Optional[str]) -> str:
164 if target is None:
165 basename = os.path.basename(link_name)
166 if basename.startswith("."):
167 return basename[1:]
168 return basename
169 return target
171 def _has_glob_chars(self, path: str) -> bool:
172 return any(i in path for i in "?*[")
174 def _glob(self, path: str) -> List[str]:
175 """
176 Wrap `glob.glob` in a python agnostic way, catching errors in usage.
177 """
178 found = glob.glob(path, recursive=True)
179 # normalize paths to ensure cross-platform compatibility
180 found = [os.path.normpath(p) for p in found]
181 # if using recursive glob (`**`), filter results to return only files:
182 if "**" in path and not path.endswith(str(os.sep)):
183 self._log.debug("Excluding directories from recursive glob: " + str(path))
184 found = [f for f in found if os.path.isfile(f)]
185 # return matched results
186 return found
188 def _create_glob_results(self, path: str, exclude_paths: List[str]) -> List[str]:
189 self._log.debug("Globbing with pattern: " + str(path))
190 include = self._glob(path)
191 self._log.debug("Glob found : " + str(include))
192 # filter out any paths matching the exclude globs:
193 exclude = []
194 for expat in exclude_paths:
195 self._log.debug("Excluding globs with pattern: " + str(expat))
196 exclude.extend(self._glob(expat))
197 self._log.debug("Excluded globs from '" + path + "': " + str(exclude))
198 ret = set(include) - set(exclude)
199 return list(ret)
201 def _is_link(self, path: str) -> bool:
202 """
203 Returns true if the path is a symbolic link.
204 """
205 return os.path.islink(os.path.expanduser(path))
207 def _link_target(self, path: str) -> str:
208 """
209 Returns the target of the symbolic link.
210 """
211 path = os.path.expanduser(path)
212 path = os.readlink(path)
213 if sys.platform == "win32" and path.startswith("\\\\?\\"):
214 path = path[4:]
215 return path
217 def _exists(self, path: str) -> bool:
218 """
219 Returns true if the path exists.
220 """
221 path = os.path.expanduser(path)
222 return os.path.exists(path)
224 def _lexists(self, path: str) -> bool:
225 """
226 Returns true if the path exists (including broken symlinks).
227 """
228 path = os.path.expanduser(path)
229 return os.path.lexists(path)
231 def _create(self, path: str) -> bool:
232 success = True
233 parent = os.path.abspath(os.path.join(os.path.expanduser(path), os.pardir))
234 if not self._exists(parent):
235 self._log.debug(f"Try to create parent: {parent}")
236 if self._context.dry_run():
237 self._log.action(f"Would create directory {parent}")
238 return True
239 try:
240 os.makedirs(parent)
241 except OSError as e:
242 self._log.warning(f"Failed to create directory {parent}")
243 self._log.debug(f"OSError: {e!s}")
244 success = False
245 else:
246 self._log.action(f"Creating directory {parent}")
247 return success
249 def _backup(self, path: str) -> Tuple[bool, bool]:
250 if self._exists(path) and not self._is_link(path):
251 file_to_backup = os.path.abspath(os.path.expanduser(path)) # removes trailing slash if any
252 timestamp = datetime.now(timezone.utc).astimezone().strftime("%Y%m%d-%H%M%S")
253 backup_path = f"{file_to_backup}.dotbot-backup.{timestamp}"
254 self._log.debug(f"Try to backup file {file_to_backup} to {backup_path}")
255 if self._context.dry_run():
256 self._log.action(f"Would backup {file_to_backup} to {backup_path}")
257 return True, True
258 try:
259 os.rename(file_to_backup, backup_path)
260 except OSError as e:
261 self._log.warning(f"Failed to backup file {file_to_backup} to {backup_path}")
262 self._log.debug(f"OSError: {e!s}")
263 return False, False
264 else:
265 self._log.action(f"Backed up file {file_to_backup} to {backup_path}")
266 return True, True
267 return False, True
269 def _delete(
270 self, target: str, path: str, *, relative: bool, canonical_path: bool, force: bool
271 ) -> Tuple[bool, bool]:
272 success = True
273 removed = False
274 target = os.path.join(self._context.base_directory(canonical_path=canonical_path), target)
275 fullpath = os.path.abspath(os.path.expanduser(path))
276 if self._exists(path) and not self._is_link(path) and os.path.realpath(fullpath) == target:
277 # Special case: The path is not a symlink but resolves to the target anyway.
278 # Deleting the path would actually delete the target.
279 # This may happen if a parent directory is a symlink.
280 self._log.warning(f"{path} appears to be the same file as {target}.")
281 return False, False
282 if relative:
283 target = self._relative_path(target, fullpath)
284 if (self._is_link(path) and self._link_target(path) != target) or (
285 self._lexists(path) and not self._is_link(path)
286 ):
287 if self._context.dry_run():
288 self._log.action(f"Would remove {path}")
289 removed = True
290 else:
291 try:
292 if os.path.islink(fullpath):
293 os.unlink(fullpath)
294 removed = True
295 elif force:
296 if os.path.isdir(fullpath):
297 shutil.rmtree(fullpath)
298 removed = True
299 else:
300 os.remove(fullpath)
301 removed = True
302 except OSError as e:
303 self._log.warning(f"Failed to remove {path}")
304 self._log.debug(f"OSError: {e!s}")
305 success = False
306 else:
307 if removed:
308 self._log.action(f"Removing {path}")
309 return removed, success
311 def _relative_path(self, target: str, link_name: str) -> str:
312 """
313 Returns the relative path to get to the target file from the
314 link location.
315 """
316 link_dir = os.path.dirname(link_name)
317 return os.path.relpath(target, link_dir)
319 def _link(
320 self,
321 target: str,
322 link_name: str,
323 *,
324 relative: bool,
325 canonical_path: bool,
326 ignore_missing: bool,
327 link_type: str,
328 assume_gone: bool,
329 ) -> bool:
330 """
331 Links link_name to target.
333 The caller must ensure that the target exists.
335 Returns true if successfully linked files.
336 """
338 link_path = os.path.abspath(os.path.expanduser(link_name))
339 base_directory = self._context.base_directory(canonical_path=canonical_path)
340 absolute_target = os.path.join(base_directory, target)
341 link_name = os.path.normpath(link_name)
342 target_path = self._relative_path(absolute_target, link_path) if relative else absolute_target
344 # we need to use absolute_target below because our cwd is the dotfiles
345 # directory, and if target_path is relative, it will be relative to the
346 # link directory
347 if ((not self._lexists(link_name)) or (self._context.dry_run() and assume_gone)) and (
348 ignore_missing or self._exists(absolute_target)
349 ):
350 if self._context.dry_run():
351 self._log.action(f"Would create {link_type} {link_name} -> {target_path}")
352 return True
353 try:
354 if link_type == "symlink":
355 os.symlink(target_path, link_path)
356 else: # link_type == "hardlink"
357 os.link(absolute_target, link_path)
358 except OSError as e:
359 self._log.warning(f"Linking failed {link_name} -> {target_path}")
360 self._log.debug(f"OSError: {e!s}")
361 return False
362 else:
363 self._log.action(f"Creating {link_type} {link_name} -> {target_path}")
364 return True
366 # Failure case: The link name exists and is a symlink
367 if self._is_link(link_name):
368 if link_type == "symlink":
369 if self._link_target(link_name) == target_path:
370 # Idempotent case: The configured symlink already exists
371 self._log.info(f"Link exists {link_name} -> {target_path}")
372 return True
374 # The existing symlink isn't pointing at the target.
375 # Distinguish between an incorrect symlink and a broken ("invalid") symlink.
376 terminology = "Incorrect" if self._exists(link_name) else "Invalid"
377 self._log.warning(f"{terminology} link {link_name} -> {self._link_target(link_name)}")
378 return False
380 self._log.warning(f"{link_name} already exists but is a symbolic link, not a hard link")
381 return False
383 # Failure case: The link name exists
384 if link_type == "hardlink" and os.stat(link_path).st_ino == os.stat(absolute_target).st_ino:
385 # Idempotent case: The configured hardlink already exists
386 self._log.info(f"Link exists {link_name} -> {target_path}")
387 return True
389 self._log.warning(f"{link_name} already exists but is a regular file or directory")
390 return False