Coverage for fastblocks / actions / sync / settings.py: 36%
359 statements
« prev ^ index » next coverage.py v7.12.0, created at 2025-11-26 03:58 -0800
« prev ^ index » next coverage.py v7.12.0, created at 2025-11-26 03:58 -0800
1"""Settings file synchronization between filesystem and cloud storage.
3Settings sync is intentionally limited to filesystem and cloud storage only.
4Unlike templates, settings are not cached for security and consistency reasons.
5"""
7import typing as t
8from pathlib import Path
10import yaml
11from acb.actions.hash import hash
12from acb.debug import debug
13from anyio import Path as AsyncPath
15from .strategies import (
16 ConflictStrategy,
17 SyncDirection,
18 SyncResult,
19 SyncStrategy,
20 create_backup,
21 get_file_info,
22 resolve_conflict,
23 should_sync,
24)
27class SettingsSyncResult(SyncResult):
28 def __init__(
29 self,
30 *,
31 config_reloaded: list[str] | None = None,
32 adapters_affected: list[str] | None = None,
33 **kwargs: t.Any,
34 ) -> None:
35 super().__init__(**kwargs)
36 self.config_reloaded = config_reloaded if config_reloaded is not None else []
37 self.adapters_affected = (
38 adapters_affected if adapters_affected is not None else []
39 )
42async def sync_settings(
43 *,
44 settings_path: AsyncPath | None = None,
45 adapter_names: list[str] | None = None,
46 strategy: SyncStrategy | None = None,
47 storage_bucket: str | None = None,
48 reload_config: bool = True,
49) -> SettingsSyncResult:
50 config = _prepare_settings_sync_config(settings_path, strategy)
51 result = SettingsSyncResult()
53 if storage_bucket is None:
54 storage_bucket = await _get_default_settings_bucket()
56 storage = await _initialize_storage_only(result)
57 if not storage:
58 return result
60 settings_files = await _discover_settings_files(
61 config["settings_path"],
62 adapter_names,
63 )
64 if not settings_files:
65 debug("No settings files found to sync")
66 return result
68 debug(f"Found {len(settings_files)} settings files to sync")
70 await _sync_settings_files(
71 settings_files,
72 storage,
73 config["strategy"],
74 storage_bucket,
75 result,
76 )
78 await _handle_config_reload(reload_config, result)
80 debug(
81 f"Settings sync completed: {len(result.synced_items)} synced, {len(result.conflicts)} conflicts",
82 )
84 return result
87def _prepare_settings_sync_config(
88 settings_path: AsyncPath | None,
89 strategy: SyncStrategy | None,
90) -> dict[str, t.Any]:
91 return {
92 "settings_path": settings_path or AsyncPath("settings"),
93 "strategy": strategy or SyncStrategy(),
94 }
97async def _initialize_storage_only(result: SettingsSyncResult) -> t.Any | None:
98 try:
99 from acb.depends import depends
101 storage = await depends.get("storage")
102 if not storage:
103 result.errors.append(Exception("Storage adapter not available"))
104 return None
106 return storage
107 except Exception as e:
108 result.errors.append(e)
109 return None
112async def _get_default_settings_bucket() -> str:
113 try:
114 storage_config_path = AsyncPath("settings/storage.yml")
115 if await storage_config_path.exists():
116 content = await storage_config_path.read_text()
117 config = yaml.safe_load(content)
118 if isinstance(config, dict):
119 bucket_name = t.cast(
120 str, config.get("buckets", {}).get("settings", "settings")
121 )
122 else:
123 bucket_name = "settings"
124 debug(f"Using settings bucket from config: {bucket_name}")
125 return bucket_name
126 except Exception as e:
127 debug(f"Could not load storage config, using default: {e}")
128 debug("Using fallback settings bucket: settings")
129 return "settings"
132async def _sync_settings_files(
133 settings_files: list[dict[str, t.Any]],
134 storage: t.Any,
135 strategy: SyncStrategy,
136 storage_bucket: str,
137 result: SettingsSyncResult,
138) -> None:
139 for settings_info in settings_files:
140 try:
141 file_result = await _sync_single_settings_file(
142 settings_info,
143 storage,
144 strategy,
145 storage_bucket,
146 )
147 _accumulate_settings_sync_results(file_result, result)
149 except Exception as e:
150 result.errors.append(e)
151 debug(f"Error syncing settings {settings_info['relative_path']}: {e}")
154def _accumulate_settings_sync_results(
155 file_result: dict[str, t.Any],
156 result: SettingsSyncResult,
157) -> None:
158 if file_result.get("synced"):
159 result.synced_items.extend(file_result["synced"])
160 result.adapters_affected.extend(file_result.get("adapters_affected", []))
161 if file_result.get("conflicts"):
162 result.conflicts.extend(file_result["conflicts"])
163 if file_result.get("errors"):
164 result.errors.extend(file_result["errors"])
165 if file_result.get("skipped"):
166 result.skipped.extend(file_result["skipped"])
167 if file_result.get("backed_up"):
168 result.backed_up.extend(file_result["backed_up"])
171async def _handle_config_reload(
172 reload_config: bool,
173 result: SettingsSyncResult,
174) -> None:
175 if reload_config and result.synced_items:
176 try:
177 await _reload_configuration(result.adapters_affected)
178 result.config_reloaded = result.adapters_affected.copy()
179 except Exception as e:
180 result.errors.append(e)
181 debug(f"Error reloading configuration: {e}")
184async def _discover_settings_files(
185 settings_path: AsyncPath,
186 adapter_names: list[str] | None = None,
187) -> list[dict[str, t.Any]]:
188 settings_files: list[dict[str, t.Any]] = []
190 if not await settings_path.exists():
191 debug(f"Settings path does not exist: {settings_path}")
192 return settings_files
194 for pattern in ("*.yml", "*.yaml"):
195 await _discover_files_with_pattern(
196 settings_path,
197 pattern,
198 adapter_names,
199 settings_files,
200 )
202 return settings_files
205async def _discover_files_with_pattern(
206 settings_path: AsyncPath,
207 pattern: str,
208 adapter_names: list[str] | None,
209 settings_files: list[dict[str, t.Any]],
210) -> None:
211 async for file_path in settings_path.rglob(pattern):
212 if await file_path.is_file():
213 await _process_settings_file(
214 file_path,
215 settings_path,
216 adapter_names,
217 settings_files,
218 )
221async def _process_settings_file(
222 file_path: AsyncPath,
223 settings_path: AsyncPath,
224 adapter_names: list[str] | None,
225 settings_files: list[dict[str, t.Any]],
226) -> None:
227 adapter_name = file_path.stem
229 if adapter_names and adapter_name not in adapter_names:
230 return
232 try:
233 rel_path = file_path.relative_to(settings_path)
234 settings_files.append(
235 {
236 "local_path": file_path,
237 "relative_path": rel_path,
238 "storage_path": str(rel_path),
239 "adapter_name": adapter_name,
240 },
241 )
242 except ValueError:
243 debug(f"Could not get relative path for {file_path}")
246async def _sync_single_settings_file(
247 settings_info: dict[str, t.Any],
248 storage: t.Any,
249 strategy: SyncStrategy,
250 storage_bucket: str,
251) -> dict[str, t.Any]:
252 local_path = settings_info["local_path"]
253 storage_path = settings_info["storage_path"]
254 adapter_name = settings_info["adapter_name"]
256 result = _create_sync_result()
258 try:
259 local_info, remote_info = await _get_file_infos(
260 local_path,
261 storage,
262 storage_bucket,
263 storage_path,
264 )
266 if not await _should_sync_file(
267 local_info,
268 remote_info,
269 strategy,
270 storage_path,
271 result,
272 ):
273 return result
275 if not await _validate_local_yaml(local_info, storage_path, result):
276 return result
278 await _execute_sync_operation(
279 local_path,
280 storage,
281 storage_bucket,
282 storage_path,
283 local_info,
284 remote_info,
285 strategy,
286 result,
287 )
289 if result["synced"]:
290 result["adapters_affected"].append(adapter_name)
292 except Exception as e:
293 result["errors"].append(e)
294 debug(f"Error in _sync_single_settings_file for {storage_path}: {e}")
296 return result
299def _create_sync_result() -> dict[str, t.Any]:
300 return {
301 "synced": [],
302 "conflicts": [],
303 "errors": [],
304 "skipped": [],
305 "backed_up": [],
306 "adapters_affected": [],
307 }
310async def _get_file_infos(
311 local_path: t.Any,
312 storage: t.Any,
313 storage_bucket: str,
314 storage_path: str,
315) -> tuple[dict[str, t.Any], dict[str, t.Any]]:
316 local_info = await get_file_info(Path(local_path))
317 remote_info = await _get_storage_file_info(storage, storage_bucket, storage_path)
318 return local_info, remote_info
321async def _should_sync_file(
322 local_info: dict[str, t.Any],
323 remote_info: dict[str, t.Any],
324 strategy: SyncStrategy,
325 storage_path: str,
326 result: dict[str, t.Any],
327) -> bool:
328 sync_needed, reason = should_sync(local_info, remote_info, strategy.direction)
329 if not sync_needed:
330 result["skipped"].append(f"{storage_path} ({reason})")
331 return False
333 debug(f"Syncing settings {storage_path}: {reason}")
334 return True
337async def _validate_local_yaml(
338 local_info: dict[str, t.Any],
339 storage_path: str,
340 result: dict[str, t.Any],
341) -> bool:
342 if local_info["exists"]:
343 try:
344 await _validate_yaml_content(local_info["content"])
345 except Exception as e:
346 result["errors"].append(f"Invalid YAML in {storage_path}: {e}")
347 return False
348 return True
351async def _execute_sync_operation(
352 local_path: t.Any,
353 storage: t.Any,
354 storage_bucket: str,
355 storage_path: str,
356 local_info: dict[str, t.Any],
357 remote_info: dict[str, t.Any],
358 strategy: SyncStrategy,
359 result: dict[str, t.Any],
360) -> None:
361 if _should_pull_settings(strategy, local_info, remote_info):
362 await _pull_settings(
363 local_path,
364 storage,
365 storage_bucket,
366 storage_path,
367 strategy,
368 result,
369 )
370 elif _should_push_settings(strategy, local_info, remote_info):
371 await _push_settings(
372 local_path,
373 storage,
374 storage_bucket,
375 storage_path,
376 strategy,
377 result,
378 )
379 elif _has_bidirectional_conflict(strategy, local_info, remote_info):
380 await _handle_settings_conflict(
381 local_path,
382 storage,
383 storage_bucket,
384 storage_path,
385 local_info,
386 remote_info,
387 strategy,
388 result,
389 )
392def _should_pull_settings(
393 strategy: SyncStrategy,
394 local_info: dict[str, t.Any],
395 remote_info: dict[str, t.Any],
396) -> bool:
397 return strategy.direction == SyncDirection.PULL or (
398 strategy.direction == SyncDirection.BIDIRECTIONAL
399 and remote_info["exists"]
400 and (not local_info["exists"] or remote_info["mtime"] > local_info["mtime"])
401 )
404def _should_push_settings(
405 strategy: SyncStrategy,
406 local_info: dict[str, t.Any],
407 remote_info: dict[str, t.Any],
408) -> bool:
409 return strategy.direction == SyncDirection.PUSH or (
410 strategy.direction == SyncDirection.BIDIRECTIONAL
411 and local_info["exists"]
412 and (not remote_info["exists"] or local_info["mtime"] > remote_info["mtime"])
413 )
416def _has_bidirectional_conflict(
417 strategy: SyncStrategy,
418 local_info: dict[str, t.Any],
419 remote_info: dict[str, t.Any],
420) -> bool:
421 return (
422 strategy.direction == SyncDirection.BIDIRECTIONAL
423 and local_info["exists"]
424 and remote_info["exists"]
425 )
428async def _get_storage_file_info(
429 storage: t.Any,
430 bucket: str,
431 file_path: str,
432) -> dict[str, t.Any]:
433 try:
434 bucket_obj = getattr(storage, bucket, None)
436 if not bucket_obj:
437 await storage._create_bucket(bucket)
438 bucket_obj = getattr(storage, bucket)
440 exists = await bucket_obj.exists(file_path)
442 if not exists:
443 return {
444 "exists": False,
445 "size": 0,
446 "mtime": 0,
447 "content_hash": None,
448 }
450 content = await bucket_obj.read(file_path)
451 metadata = await bucket_obj.stat(file_path)
453 # ACB's Blake3 is 10x faster than Blake2b for cryptographic hashing
454 content_hash = await hash.blake3(content)
456 return {
457 "exists": True,
458 "size": len(content),
459 "mtime": metadata.get("mtime", 0),
460 "content_hash": content_hash,
461 "content": content,
462 }
464 except Exception as e:
465 debug(f"Error getting storage file info for {file_path}: {e}")
466 return {
467 "exists": False,
468 "size": 0,
469 "mtime": 0,
470 "content_hash": None,
471 "error": str(e),
472 }
475async def _validate_yaml_content(content: bytes) -> None:
476 try:
477 import yaml
479 yaml.safe_load(content.decode())
480 except Exception as e:
481 msg = f"Invalid YAML content: {e}"
482 raise ValueError(msg)
485async def _pull_settings(
486 local_path: AsyncPath,
487 storage: t.Any,
488 bucket: str,
489 storage_path: str,
490 strategy: SyncStrategy,
491 result: dict[str, t.Any],
492) -> None:
493 try:
494 bucket_obj = getattr(storage, bucket)
496 if strategy.dry_run:
497 debug(f"DRY RUN: Would pull {storage_path} to {local_path}")
498 result["synced"].append(f"PULL(dry-run): {storage_path}")
499 return
501 if await local_path.exists() and strategy.backup_on_conflict:
502 backup_path = await create_backup(Path(local_path))
503 result["backed_up"].append(str(backup_path))
505 content = await bucket_obj.read(storage_path)
507 await _validate_yaml_content(content)
509 await local_path.parent.mkdir(parents=True, exist_ok=True)
511 await local_path.write_bytes(content)
513 result["synced"].append(f"PULL: {storage_path}")
514 debug(f"Pulled settings from storage: {storage_path}")
516 except Exception as e:
517 result["errors"].append(e)
518 debug(f"Error pulling settings {storage_path}: {e}")
521async def _push_settings(
522 local_path: AsyncPath,
523 storage: t.Any,
524 bucket: str,
525 storage_path: str,
526 strategy: SyncStrategy,
527 result: dict[str, t.Any],
528) -> None:
529 try:
530 bucket_obj = getattr(storage, bucket)
532 if strategy.dry_run:
533 debug(f"DRY RUN: Would push {local_path} to {storage_path}")
534 result["synced"].append(f"PUSH(dry-run): {storage_path}")
535 return
537 content = await local_path.read_bytes()
538 await _validate_yaml_content(content)
540 await bucket_obj.write(storage_path, content)
542 result["synced"].append(f"PUSH: {storage_path}")
543 debug(f"Pushed settings to storage: {storage_path}")
545 except Exception as e:
546 result["errors"].append(e)
547 debug(f"Error pushing settings {storage_path}: {e}")
550async def _handle_settings_conflict(
551 local_path: AsyncPath,
552 storage: t.Any,
553 bucket: str,
554 storage_path: str,
555 local_info: dict[str, t.Any],
556 remote_info: dict[str, t.Any],
557 strategy: SyncStrategy,
558 result: dict[str, t.Any],
559) -> None:
560 try:
561 if strategy.conflict_strategy == ConflictStrategy.MANUAL:
562 result["conflicts"].append(
563 {
564 "path": storage_path,
565 "local_mtime": local_info["mtime"],
566 "remote_mtime": remote_info["mtime"],
567 "reason": "manual_resolution_required",
568 },
569 )
570 return
572 try:
573 await _validate_yaml_content(local_info["content"])
574 await _validate_yaml_content(remote_info["content"])
575 except Exception as e:
576 result["errors"].append(f"Invalid YAML during conflict resolution: {e}")
577 return
579 resolved_content, resolution_reason = await resolve_conflict(
580 Path(local_path),
581 remote_info["content"],
582 local_info["content"],
583 strategy.conflict_strategy,
584 local_info["mtime"],
585 remote_info["mtime"],
586 )
588 if strategy.dry_run:
589 debug(
590 f"DRY RUN: Would resolve conflict for {storage_path}: {resolution_reason}",
591 )
592 result["synced"].append(
593 f"CONFLICT(dry-run): {storage_path} - {resolution_reason}",
594 )
595 return
597 if (
598 strategy.backup_on_conflict
599 or strategy.conflict_strategy == ConflictStrategy.BACKUP_BOTH
600 ):
601 backup_path = await create_backup(Path(local_path), "conflict")
602 result["backed_up"].append(str(backup_path))
604 if resolved_content == remote_info["content"]:
605 await local_path.write_bytes(resolved_content)
606 result["synced"].append(
607 f"CONFLICT->REMOTE: {storage_path} - {resolution_reason}",
608 )
609 else:
610 bucket_obj = getattr(storage, bucket)
611 await bucket_obj.write(storage_path, resolved_content)
612 result["synced"].append(
613 f"CONFLICT->LOCAL: {storage_path} - {resolution_reason}",
614 )
616 debug(f"Resolved settings conflict: {storage_path} - {resolution_reason}")
618 except Exception as e:
619 result["errors"].append(e)
620 result["conflicts"].append(
621 {
622 "path": storage_path,
623 "error": str(e),
624 "reason": "resolution_failed",
625 },
626 )
629async def _reload_configuration(adapter_names: list[str]) -> None:
630 try:
631 from acb.config import reload_config # type: ignore[attr-defined]
632 from acb.depends import depends
634 config = await reload_config()
635 depends.set("config", config)
636 debug(f"Reloaded configuration for adapters: {adapter_names}")
637 except Exception as e:
638 debug(f"Error reloading configuration: {e}")
639 raise
642async def backup_settings(
643 settings_path: AsyncPath | None = None,
644 backup_suffix: str | None = None,
645) -> dict[str, t.Any]:
646 settings_path = settings_path or AsyncPath("settings")
647 backup_suffix = backup_suffix or _generate_backup_suffix()
649 result = _create_backup_result()
651 try:
652 if not await settings_path.exists():
653 result["errors"].append(f"Settings path does not exist: {settings_path}")
654 return result
656 await _backup_files_with_patterns(settings_path, backup_suffix, result)
658 except Exception as e:
659 result["errors"].append(str(e))
660 debug(f"Error in backup_settings: {e}")
662 return result
665def _generate_backup_suffix() -> str:
666 import time
668 timestamp = int(time.time())
669 return f"backup_{timestamp}"
672def _create_backup_result() -> dict[str, t.Any]:
673 return {
674 "backed_up": [],
675 "errors": [],
676 "skipped": [],
677 }
680async def _backup_files_with_patterns(
681 settings_path: AsyncPath,
682 backup_suffix: str,
683 result: dict[str, t.Any],
684) -> None:
685 patterns = ["*.yml", "*.yaml"]
687 for pattern in patterns:
688 await _backup_files_with_pattern(settings_path, pattern, backup_suffix, result)
691async def _backup_files_with_pattern(
692 settings_path: AsyncPath,
693 pattern: str,
694 backup_suffix: str,
695 result: dict[str, t.Any],
696) -> None:
697 async for file_path in settings_path.rglob(pattern):
698 if await file_path.is_file():
699 await _backup_single_file(file_path, backup_suffix, result)
702async def _backup_single_file(
703 file_path: AsyncPath,
704 backup_suffix: str,
705 result: dict[str, t.Any],
706) -> None:
707 try:
708 backup_path = await create_backup(Path(file_path), backup_suffix)
709 result["backed_up"].append(str(backup_path))
710 except Exception as e:
711 result["errors"].append(f"{file_path}: {e}")
714async def get_settings_sync_status(
715 settings_path: AsyncPath | None = None,
716 storage_bucket: str = "settings",
717) -> dict[str, t.Any]:
718 if settings_path is None:
719 settings_path = AsyncPath("settings")
721 status: dict[str, t.Any] = {
722 "total_settings": 0,
723 "in_sync": 0,
724 "out_of_sync": 0,
725 "local_only": 0,
726 "remote_only": 0,
727 "conflicts": 0,
728 "details": [],
729 }
731 try:
732 storage = await _get_storage_adapter()
733 if not storage:
734 status["error"] = "Storage adapter not available"
735 return status
737 settings_files = await _discover_settings_files(settings_path)
738 status["total_settings"] = len(settings_files)
740 await _process_settings_files(settings_files, storage, storage_bucket, status)
742 status["out_of_sync"] = (
743 status["conflicts"] + status["local_only"] + status["remote_only"]
744 )
746 except Exception as e:
747 status["error"] = str(e)
748 debug(f"Error getting settings sync status: {e}")
750 return status
753async def _get_storage_adapter() -> t.Any:
754 """Get the storage adapter."""
755 from acb.depends import depends
757 return depends.get("storage")
760async def _process_settings_files(
761 settings_files: list[dict[str, t.Any]],
762 storage: t.Any,
763 storage_bucket: str,
764 status: dict[str, t.Any],
765) -> None:
766 """Process all settings files and update status."""
767 for settings_info in settings_files:
768 local_info = await get_file_info(Path(settings_info["local_path"]))
769 remote_info = await _get_storage_file_info(
770 storage,
771 storage_bucket,
772 settings_info["storage_path"],
773 )
775 file_status = _create_file_status(settings_info, local_info, remote_info)
776 _update_status_counters(local_info, remote_info, file_status, status)
777 status["details"].append(file_status)
780def _create_file_status(
781 settings_info: dict[str, t.Any],
782 local_info: dict[str, t.Any],
783 remote_info: dict[str, t.Any],
784) -> dict[str, t.Any]:
785 """Create file status dictionary."""
786 file_status: dict[str, t.Any] = {
787 "path": settings_info["storage_path"],
788 "adapter": settings_info["adapter_name"],
789 "local_exists": local_info["exists"],
790 "remote_exists": remote_info["exists"],
791 }
793 # Determine sync status
794 if local_info["exists"] and remote_info["exists"]:
795 if local_info["content_hash"] == remote_info["content_hash"]:
796 file_status["status"] = "in_sync"
797 else:
798 file_status["status"] = "conflict"
799 file_status["local_mtime"] = local_info["mtime"]
800 file_status["remote_mtime"] = remote_info["mtime"]
801 elif local_info["exists"]:
802 file_status["status"] = "local_only"
803 elif remote_info["exists"]:
804 file_status["status"] = "remote_only"
805 else:
806 file_status["status"] = "missing"
808 return file_status
811def _update_status_counters(
812 local_info: dict[str, t.Any],
813 remote_info: dict[str, t.Any],
814 file_status: dict[str, t.Any],
815 status: dict[str, t.Any],
816) -> None:
817 """Update status counters based on file status."""
818 if local_info["exists"] and remote_info["exists"]:
819 if local_info["content_hash"] == remote_info["content_hash"]:
820 status["in_sync"] += 1
821 else:
822 status["conflicts"] += 1
823 elif local_info["exists"]:
824 status["local_only"] += 1
825 elif remote_info["exists"]:
826 status["remote_only"] += 1
829async def validate_all_settings(
830 settings_path: AsyncPath | None = None,
831) -> dict[str, t.Any]:
832 if settings_path is None:
833 settings_path = AsyncPath("settings")
835 result: dict[str, t.Any] = {
836 "valid": [],
837 "invalid": [],
838 "missing": [],
839 "total_checked": 0,
840 }
842 try:
843 settings_files = await _discover_settings_files(settings_path)
844 result["total_checked"] = len(settings_files)
846 for settings_info in settings_files:
847 file_path = settings_info["local_path"]
849 if not await file_path.exists():
850 result["missing"].append(str(file_path))
851 continue
853 try:
854 content = await file_path.read_bytes()
855 await _validate_yaml_content(content)
856 result["valid"].append(str(file_path))
857 except Exception as e:
858 result["invalid"].append(
859 {
860 "path": str(file_path),
861 "error": str(e),
862 },
863 )
865 except Exception as e:
866 result["error"] = str(e)
867 debug(f"Error validating settings: {e}")
869 return result