Coverage for src / dotbot / plugins / link.py: 99%

253 statements  

« prev     ^ index     » next       coverage.py v7.12.0, created at 2025-11-29 12:13 -0800

1import glob 

2import os 

3import shutil 

4import sys 

5from datetime import datetime, timezone 

6from typing import Any, List, Optional, Tuple 

7 

8from dotbot.plugin import Plugin 

9from dotbot.util import shell_command 

10from dotbot.util.common import normslash 

11 

12 

13class Link(Plugin): 

14 """ 

15 Symbolically links dotfiles. 

16 """ 

17 

18 supports_dry_run = True 

19 

20 _directive = "link" 

21 

def can_handle(self, directive: str) -> bool:
    """Tell the dispatcher whether ``directive`` is the one this plugin implements."""
    handled = self._directive
    return handled == directive

24 

def handle(self, directive: str, data: Any) -> bool:
    """
    Run the link directive.

    Delegates ``data`` to ``_process_links`` and returns its result.
    Raises ValueError when ``directive`` is not the directive this plugin owns.
    """
    if directive == self._directive:
        return self._process_links(data)
    msg = f"Link cannot handle directive {directive}"
    raise ValueError(msg)

30 

def _process_links(self, links: Any) -> bool:
    """
    Create every link described by ``links`` and report overall success.

    ``links`` is iterated with ``.items()``: each key is a link location and each
    value is either the target path (possibly None) or a dict of per-link options
    ("extended config"). Any option not given per link falls back to the ``link``
    section of the context defaults.

    Returns False immediately when the default link type is neither "symlink" nor
    "hardlink". Otherwise returns True only if every create/backup/delete/link step
    succeeded; a link skipped because its ``if`` test failed does not count as a
    failure.
    """
    success = True
    defaults = self._context.defaults().get("link", {})

    # Validate the default link type before looping.
    link_type = defaults.get("type", "symlink")
    if link_type not in {"symlink", "hardlink"}:
        self._log.warning(f"The default link type is not recognized: '{link_type}'")
        return False

    for link_name, target in links.items():
        link_name = os.path.expandvars(normslash(link_name))  # noqa: PLW2901
        # Start every link from the configured defaults ...
        relative = defaults.get("relative", False)
        # support old "canonicalize-path" key for compatibility
        canonical_path = defaults.get("canonicalize", defaults.get("canonicalize-path", True))
        link_type = defaults.get("type", "symlink")
        force = defaults.get("force", False)
        relink = defaults.get("relink", False)
        create = defaults.get("create", False)
        use_glob = defaults.get("glob", False)
        backup = defaults.get("backup", False)
        base_prefix = defaults.get("prefix", "")
        test = defaults.get("if", None)
        ignore_missing = defaults.get("ignore-missing", False)
        exclude_paths = defaults.get("exclude", [])
        if isinstance(target, dict):
            # extended config
            # ... then let the per-link dict override each of them.
            test = target.get("if", test)
            relative = target.get("relative", relative)
            canonical_path = target.get("canonicalize", target.get("canonicalize-path", canonical_path))
            link_type = target.get("type", link_type)
            if link_type not in {"symlink", "hardlink"}:
                # An unknown per-link type fails this link only; the loop goes on.
                msg = f"The link type is not recognized: '{link_type}'"
                self._log.warning(msg)
                success = False
                continue
            force = target.get("force", force)
            relink = target.get("relink", relink)
            create = target.get("create", create)
            use_glob = target.get("glob", use_glob)
            backup = target.get("backup", backup)
            base_prefix = target.get("prefix", base_prefix)
            ignore_missing = target.get("ignore-missing", ignore_missing)
            exclude_paths = target.get("exclude", exclude_paths)
            path = self._default_target(link_name, target.get("path"))
        else:
            path = self._default_target(link_name, target)
        path = normslash(path)
        if test is not None and not self._test_success(test):
            self._log.info(f"Skipping {link_name}")
            continue
        path = os.path.normpath(os.path.expandvars(os.path.expanduser(path)))
        if use_glob and self._has_glob_chars(path):
            # Glob mode: link_name is used as a directory and one link is made
            # inside it for each match of the pattern.
            glob_results = self._create_glob_results(path, exclude_paths)
            self._log.debug(f"Globs from '{path}': {glob_results}")
            for glob_full_item in glob_results:
                # Find common dirname between pattern and the item:
                glob_dirname = os.path.dirname(os.path.commonprefix([path, glob_full_item]))
                # Strip that dirname (plus one separator character) from the match.
                glob_item = glob_full_item if len(glob_dirname) == 0 else glob_full_item[len(glob_dirname) + 1 :]
                # Add prefix to basepath, if provided
                if base_prefix:
                    glob_item = base_prefix + glob_item
                # where is it going
                glob_link_name = os.path.join(link_name, glob_item)
                if create:
                    success &= self._create(glob_link_name)
                did_backup = False
                did_delete = False
                if backup:
                    did_backup, backup_success = self._backup(glob_link_name)
                    success &= backup_success
                # we only need to consider force/relink if we didn't do a backup
                # NOTE(review): _backup also reports success when there was nothing to
                # move (path missing or already a symlink); in that case this condition
                # still skips force/relink, so a symlink pointing elsewhere is left for
                # _link to reject. Confirm this is intended.
                if (force or relink) and not (backup and backup_success):
                    did_delete, delete_success = self._delete(
                        glob_full_item,
                        glob_link_name,
                        relative=relative,
                        canonical_path=canonical_path,
                        force=force,
                    )
                    success &= delete_success
                success &= self._link(
                    glob_full_item,
                    glob_link_name,
                    relative=relative,
                    canonical_path=canonical_path,
                    ignore_missing=ignore_missing,
                    link_type=link_type,
                    # lets a dry run proceed as if the backed-up/deleted path were gone
                    assume_gone=(did_backup or did_delete),
                )
        else:
            # Single-link mode: link_name points directly at path.
            if create:
                success &= self._create(link_name)
            if not ignore_missing and not self._exists(os.path.join(self._context.base_directory(), path)):
                # we seemingly check this twice (here and in _link) because
                # if the file doesn't exist and force is True, we don't
                # want to remove the original (this is tested by test_link_force_leaves_when_nonexistent)
                success = False
                self._log.warning(f"Nonexistent target {link_name} -> {path}")
                continue
            did_backup = False
            did_delete = False
            if backup:
                did_backup, backup_success = self._backup(link_name)
                success &= backup_success
            # we only need to consider force/relink if we didn't do a backup
            # NOTE(review): same caveat as in the glob branch: a "successful" backup
            # that moved nothing still suppresses force/relink. Confirm intended.
            if (force or relink) and not (backup and backup_success):
                did_delete, delete_success = self._delete(
                    path, link_name, relative=relative, canonical_path=canonical_path, force=force
                )
                success &= delete_success
            success &= self._link(
                path,
                link_name,
                relative=relative,
                canonical_path=canonical_path,
                ignore_missing=ignore_missing,
                link_type=link_type,
                # lets a dry run proceed as if the backed-up/deleted path were gone
                assume_gone=(did_backup or did_delete),
            )
    if success:
        self._log.info("All links have been set up")
    else:
        self._log.error("Some links were not successfully set up")
    return success

156 

def _test_success(self, command: str) -> bool:
    """
    Run ``command`` through ``shell_command`` with the base directory as cwd.

    Returns True when ``shell_command`` returns 0; otherwise logs a debug message
    and returns False.
    """
    status = shell_command(command, cwd=self._context.base_directory())
    passed = status == 0
    if not passed:
        self._log.debug(f"Test '{command}' returned false")
    return passed

162 

163 def _default_target(self, link_name: str, target: Optional[str]) -> str: 

164 if target is None: 

165 basename = os.path.basename(link_name) 

166 if basename.startswith("."): 

167 return basename[1:] 

168 return basename 

169 return target 

170 

171 def _has_glob_chars(self, path: str) -> bool: 

172 return any(i in path for i in "?*[") 

173 

174 def _glob(self, path: str) -> List[str]: 

175 """ 

176 Wrap `glob.glob` in a python agnostic way, catching errors in usage. 

177 """ 

178 found = glob.glob(path, recursive=True) 

179 # normalize paths to ensure cross-platform compatibility 

180 found = [os.path.normpath(p) for p in found] 

181 # if using recursive glob (`**`), filter results to return only files: 

182 if "**" in path and not path.endswith(str(os.sep)): 

183 self._log.debug("Excluding directories from recursive glob: " + str(path)) 

184 found = [f for f in found if os.path.isfile(f)] 

185 # return matched results 

186 return found 

187 

188 def _create_glob_results(self, path: str, exclude_paths: List[str]) -> List[str]: 

189 self._log.debug("Globbing with pattern: " + str(path)) 

190 include = self._glob(path) 

191 self._log.debug("Glob found : " + str(include)) 

192 # filter out any paths matching the exclude globs: 

193 exclude = [] 

194 for expat in exclude_paths: 

195 self._log.debug("Excluding globs with pattern: " + str(expat)) 

196 exclude.extend(self._glob(expat)) 

197 self._log.debug("Excluded globs from '" + path + "': " + str(exclude)) 

198 ret = set(include) - set(exclude) 

199 return list(ret) 

200 

201 def _is_link(self, path: str) -> bool: 

202 """ 

203 Returns true if the path is a symbolic link. 

204 """ 

205 return os.path.islink(os.path.expanduser(path)) 

206 

207 def _link_target(self, path: str) -> str: 

208 """ 

209 Returns the target of the symbolic link. 

210 """ 

211 path = os.path.expanduser(path) 

212 path = os.readlink(path) 

213 if sys.platform == "win32" and path.startswith("\\\\?\\"): 

214 path = path[4:] 

215 return path 

216 

217 def _exists(self, path: str) -> bool: 

218 """ 

219 Returns true if the path exists. 

220 """ 

221 path = os.path.expanduser(path) 

222 return os.path.exists(path) 

223 

224 def _lexists(self, path: str) -> bool: 

225 """ 

226 Returns true if the path exists (including broken symlinks). 

227 """ 

228 path = os.path.expanduser(path) 

229 return os.path.lexists(path) 

230 

231 def _create(self, path: str) -> bool: 

232 success = True 

233 parent = os.path.abspath(os.path.join(os.path.expanduser(path), os.pardir)) 

234 if not self._exists(parent): 

235 self._log.debug(f"Try to create parent: {parent}") 

236 if self._context.dry_run(): 

237 self._log.action(f"Would create directory {parent}") 

238 return True 

239 try: 

240 os.makedirs(parent) 

241 except OSError as e: 

242 self._log.warning(f"Failed to create directory {parent}") 

243 self._log.debug(f"OSError: {e!s}") 

244 success = False 

245 else: 

246 self._log.action(f"Creating directory {parent}") 

247 return success 

248 

249 def _backup(self, path: str) -> Tuple[bool, bool]: 

250 if self._exists(path) and not self._is_link(path): 

251 file_to_backup = os.path.abspath(os.path.expanduser(path)) # removes trailing slash if any 

252 timestamp = datetime.now(timezone.utc).astimezone().strftime("%Y%m%d-%H%M%S") 

253 backup_path = f"{file_to_backup}.dotbot-backup.{timestamp}" 

254 self._log.debug(f"Try to backup file {file_to_backup} to {backup_path}") 

255 if self._context.dry_run(): 

256 self._log.action(f"Would backup {file_to_backup} to {backup_path}") 

257 return True, True 

258 try: 

259 os.rename(file_to_backup, backup_path) 

260 except OSError as e: 

261 self._log.warning(f"Failed to backup file {file_to_backup} to {backup_path}") 

262 self._log.debug(f"OSError: {e!s}") 

263 return False, False 

264 else: 

265 self._log.action(f"Backed up file {file_to_backup} to {backup_path}") 

266 return True, True 

267 return False, True 

268 

269 def _delete( 

270 self, target: str, path: str, *, relative: bool, canonical_path: bool, force: bool 

271 ) -> Tuple[bool, bool]: 

272 success = True 

273 removed = False 

274 target = os.path.join(self._context.base_directory(canonical_path=canonical_path), target) 

275 fullpath = os.path.abspath(os.path.expanduser(path)) 

276 if self._exists(path) and not self._is_link(path) and os.path.realpath(fullpath) == target: 

277 # Special case: The path is not a symlink but resolves to the target anyway. 

278 # Deleting the path would actually delete the target. 

279 # This may happen if a parent directory is a symlink. 

280 self._log.warning(f"{path} appears to be the same file as {target}.") 

281 return False, False 

282 if relative: 

283 target = self._relative_path(target, fullpath) 

284 if (self._is_link(path) and self._link_target(path) != target) or ( 

285 self._lexists(path) and not self._is_link(path) 

286 ): 

287 if self._context.dry_run(): 

288 self._log.action(f"Would remove {path}") 

289 removed = True 

290 else: 

291 try: 

292 if os.path.islink(fullpath): 

293 os.unlink(fullpath) 

294 removed = True 

295 elif force: 

296 if os.path.isdir(fullpath): 

297 shutil.rmtree(fullpath) 

298 removed = True 

299 else: 

300 os.remove(fullpath) 

301 removed = True 

302 except OSError as e: 

303 self._log.warning(f"Failed to remove {path}") 

304 self._log.debug(f"OSError: {e!s}") 

305 success = False 

306 else: 

307 if removed: 

308 self._log.action(f"Removing {path}") 

309 return removed, success 

310 

311 def _relative_path(self, target: str, link_name: str) -> str: 

312 """ 

313 Returns the relative path to get to the target file from the 

314 link location. 

315 """ 

316 link_dir = os.path.dirname(link_name) 

317 return os.path.relpath(target, link_dir) 

318 

319 def _link( 

320 self, 

321 target: str, 

322 link_name: str, 

323 *, 

324 relative: bool, 

325 canonical_path: bool, 

326 ignore_missing: bool, 

327 link_type: str, 

328 assume_gone: bool, 

329 ) -> bool: 

330 """ 

331 Links link_name to target. 

332 

333 The caller must ensure that the target exists. 

334 

335 Returns true if successfully linked files. 

336 """ 

337 

338 link_path = os.path.abspath(os.path.expanduser(link_name)) 

339 base_directory = self._context.base_directory(canonical_path=canonical_path) 

340 absolute_target = os.path.join(base_directory, target) 

341 link_name = os.path.normpath(link_name) 

342 target_path = self._relative_path(absolute_target, link_path) if relative else absolute_target 

343 

344 # we need to use absolute_target below because our cwd is the dotfiles 

345 # directory, and if target_path is relative, it will be relative to the 

346 # link directory 

347 if ((not self._lexists(link_name)) or (self._context.dry_run() and assume_gone)) and ( 

348 ignore_missing or self._exists(absolute_target) 

349 ): 

350 if self._context.dry_run(): 

351 self._log.action(f"Would create {link_type} {link_name} -> {target_path}") 

352 return True 

353 try: 

354 if link_type == "symlink": 

355 os.symlink(target_path, link_path) 

356 else: # link_type == "hardlink" 

357 os.link(absolute_target, link_path) 

358 except OSError as e: 

359 self._log.warning(f"Linking failed {link_name} -> {target_path}") 

360 self._log.debug(f"OSError: {e!s}") 

361 return False 

362 else: 

363 self._log.action(f"Creating {link_type} {link_name} -> {target_path}") 

364 return True 

365 

366 # Failure case: The link name exists and is a symlink 

367 if self._is_link(link_name): 

368 if link_type == "symlink": 

369 if self._link_target(link_name) == target_path: 

370 # Idempotent case: The configured symlink already exists 

371 self._log.info(f"Link exists {link_name} -> {target_path}") 

372 return True 

373 

374 # The existing symlink isn't pointing at the target. 

375 # Distinguish between an incorrect symlink and a broken ("invalid") symlink. 

376 terminology = "Incorrect" if self._exists(link_name) else "Invalid" 

377 self._log.warning(f"{terminology} link {link_name} -> {self._link_target(link_name)}") 

378 return False 

379 

380 self._log.warning(f"{link_name} already exists but is a symbolic link, not a hard link") 

381 return False 

382 

383 # Failure case: The link name exists 

384 if link_type == "hardlink" and os.stat(link_path).st_ino == os.stat(absolute_target).st_ino: 

385 # Idempotent case: The configured hardlink already exists 

386 self._log.info(f"Link exists {link_name} -> {target_path}") 

387 return True 

388 

389 self._log.warning(f"{link_name} already exists but is a regular file or directory") 

390 return False