Coverage for fastblocks / mcp / config_migration.py: 0%

276 statements  

« prev     ^ index     » next       coverage.py v7.12.0, created at 2025-11-26 03:30 -0800

1"""Configuration migration tools for FastBlocks.""" 

2 

3import json 

4from collections.abc import Callable 

5from dataclasses import dataclass, field 

6from datetime import datetime 

7from enum import Enum 

8from pathlib import Path 

9from typing import Any, cast 

10 

11import yaml 

12 

13# Module-level constants for migration defaults 

14_DEFAULT_ADAPTER_METADATA = { 

15 "module_id_generator": lambda: __import__("uuid").uuid4(), 

16 "module_status": "stable", 

17} 

18 

19 

20class MigrationDirection(str, Enum): 

21 """Migration direction.""" 

22 

23 UPGRADE = "upgrade" 

24 DOWNGRADE = "downgrade" 

25 

26 

27@dataclass 

28class MigrationStep: 

29 """A single migration step.""" 

30 

31 name: str 

32 description: str 

33 function: Callable[..., Any] 

34 version_from: str 

35 version_to: str 

36 direction: MigrationDirection 

37 reversible: bool = True 

38 

39 

40@dataclass 

41class MigrationResult: 

42 """Result of a migration operation.""" 

43 

44 success: bool 

45 version_from: str 

46 version_to: str 

47 steps_applied: list[str] = field(default_factory=list) 

48 warnings: list[str] = field(default_factory=list) 

49 errors: list[str] = field(default_factory=list) 

50 execution_time_ms: float = 0.0 

51 timestamp: datetime = field(default_factory=datetime.now) 

52 

53 

54class ConfigurationMigrationManager: 

55 """Manages configuration migrations between versions.""" 

56 

57 def __init__(self, base_path: Path | None = None): 

58 """Initialize migration manager.""" 

59 self.base_path = base_path or Path.cwd() 

60 self.migrations_dir = self.base_path / ".fastblocks" / "migrations" 

61 self.migrations_dir.mkdir(parents=True, exist_ok=True) 

62 

63 # Version history 

64 self.version_history = [ 

65 "0.1.0", # Initial version 

66 "0.2.0", # Added adapter metadata 

67 "0.3.0", # Added environment variable validation 

68 "1.0.0", # Production-ready configuration schema 

69 ] 

70 

71 # Register migration steps 

72 self.migration_steps = self._register_migration_steps() 

73 

74 def _create_metadata_migration_step(self) -> MigrationStep: 

75 """Create migration step for adding adapter metadata.""" 

76 return MigrationStep( 

77 name="add_adapter_metadata", 

78 description="Add MODULE_ID and MODULE_STATUS metadata to adapters", 

79 function=self._migrate_add_adapter_metadata, 

80 version_from="0.1.0", 

81 version_to="0.2.0", 

82 direction=MigrationDirection.UPGRADE, 

83 ) 

84 

85 def _create_env_validation_step(self) -> MigrationStep: 

86 """Create migration step for environment variable validation.""" 

87 return MigrationStep( 

88 name="add_env_validation", 

89 description="Add validation patterns to environment variables", 

90 function=self._migrate_add_env_validation, 

91 version_from="0.2.0", 

92 version_to="0.3.0", 

93 direction=MigrationDirection.UPGRADE, 

94 ) 

95 

96 def _create_production_upgrade_step(self) -> MigrationStep: 

97 """Create migration step for production-ready upgrade.""" 

98 return MigrationStep( 

99 name="production_ready_schema", 

100 description="Upgrade to production-ready configuration schema", 

101 function=self._migrate_production_ready, 

102 version_from="0.3.0", 

103 version_to="1.0.0", 

104 direction=MigrationDirection.UPGRADE, 

105 ) 

106 

107 def _create_production_downgrade_step(self) -> MigrationStep: 

108 """Create migration step for production downgrade.""" 

109 return MigrationStep( 

110 name="remove_production_features", 

111 description="Downgrade from production-ready schema", 

112 function=self._migrate_remove_production_features, 

113 version_from="1.0.0", 

114 version_to="0.3.0", 

115 direction=MigrationDirection.DOWNGRADE, 

116 ) 

117 

118 def _register_migration_steps(self) -> list[MigrationStep]: 

119 """Register all migration steps.""" 

120 return [ 

121 self._create_metadata_migration_step(), 

122 self._create_env_validation_step(), 

123 self._create_production_upgrade_step(), 

124 self._create_production_downgrade_step(), 

125 ] 

126 

127 def _create_already_at_version_result( 

128 self, current_version: str, target_version: str 

129 ) -> MigrationResult: 

130 """Create result when already at target version.""" 

131 return MigrationResult( 

132 success=True, 

133 version_from=current_version, 

134 version_to=target_version, 

135 warnings=["Configuration is already at target version"], 

136 ) 

137 

138 def _create_no_path_result( 

139 self, current_version: str, target_version: str 

140 ) -> MigrationResult: 

141 """Create result when no migration path exists.""" 

142 return MigrationResult( 

143 success=False, 

144 version_from=current_version, 

145 version_to=target_version, 

146 errors=[ 

147 f"No migration path found from {current_version} to {target_version}" 

148 ], 

149 ) 

150 

151 async def _execute_migration_steps( 

152 self, 

153 config_data: dict[str, Any], 

154 migration_path: list[MigrationStep], 

155 result: MigrationResult, 

156 ) -> dict[str, Any]: 

157 """Execute migration steps and update result.""" 

158 current_data = config_data.copy() 

159 

160 for step in migration_path: 

161 try: 

162 current_data = await step.function(current_data) 

163 result.steps_applied.append(step.name) 

164 current_data["version"] = step.version_to 

165 except Exception as e: 

166 result.success = False 

167 result.errors.append(f"Migration step '{step.name}' failed: {e}") 

168 break 

169 

170 return current_data 

171 

172 async def migrate_configuration( 

173 self, config_data: dict[str, Any], target_version: str 

174 ) -> MigrationResult: 

175 """Migrate configuration to target version.""" 

176 current_version = config_data.get("version", "0.1.0") 

177 

178 if current_version == target_version: 

179 return self._create_already_at_version_result( 

180 current_version, target_version 

181 ) 

182 

183 migration_path = self._get_migration_path(current_version, target_version) 

184 if not migration_path: 

185 return self._create_no_path_result(current_version, target_version) 

186 

187 result = MigrationResult( 

188 success=True, version_from=current_version, version_to=target_version 

189 ) 

190 

191 await self._execute_migration_steps(config_data, migration_path, result) 

192 

193 return result 

194 

195 def _determine_migration_direction( 

196 self, from_idx: int, to_idx: int 

197 ) -> tuple[MigrationDirection, list[str]]: 

198 """Determine migration direction and version range.""" 

199 if from_idx < to_idx: 

200 direction = MigrationDirection.UPGRADE 

201 version_range = self.version_history[from_idx:to_idx] 

202 else: 

203 direction = MigrationDirection.DOWNGRADE 

204 version_range = list(reversed(self.version_history[to_idx:from_idx])) 

205 return direction, version_range 

206 

207 def _build_migration_path( 

208 self, version_range: list[str], direction: MigrationDirection 

209 ) -> list[MigrationStep]: 

210 """Build migration path from version range.""" 

211 migration_path = [] 

212 for i in range(len(version_range) - 1): 

213 current_version = version_range[i] 

214 next_version = version_range[i + 1] 

215 

216 step = self._find_migration_step(current_version, next_version, direction) 

217 if step: 

218 migration_path.append(step) 

219 else: 

220 return [] # No migration path available if any step is missing 

221 

222 return migration_path 

223 

224 def _get_migration_path( 

225 self, from_version: str, to_version: str 

226 ) -> list[MigrationStep]: 

227 """Get migration path between versions.""" 

228 if ( 

229 from_version not in self.version_history 

230 or to_version not in self.version_history 

231 ): 

232 return [] 

233 

234 from_idx = self.version_history.index(from_version) 

235 to_idx = self.version_history.index(to_version) 

236 

237 if from_idx == to_idx: 

238 return [] 

239 

240 direction, version_range = self._determine_migration_direction(from_idx, to_idx) 

241 return self._build_migration_path(version_range, direction) 

242 

243 def _find_migration_step( 

244 self, from_version: str, to_version: str, direction: MigrationDirection 

245 ) -> MigrationStep | None: 

246 """Find migration step for specific version transition.""" 

247 for step in self.migration_steps: 

248 if ( 

249 step.version_from == from_version 

250 and step.version_to == to_version 

251 and step.direction == direction 

252 ): 

253 return step 

254 return None 

255 

256 # Migration functions 

257 def _ensure_adapter_metadata(self, adapter_config: dict[str, Any]) -> None: 

258 """Ensure adapter has required metadata fields.""" 

259 if "metadata" not in adapter_config: 

260 adapter_config["metadata"] = {} 

261 

262 metadata = adapter_config["metadata"] 

263 if "module_id" not in metadata: 

264 # Cast to Callable since dict lookup returns Any 

265 generator = cast( 

266 Callable[[], Any], _DEFAULT_ADAPTER_METADATA["module_id_generator"] 

267 ) 

268 metadata["module_id"] = str(generator()) 

269 

270 if "module_status" not in metadata: 

271 metadata["module_status"] = _DEFAULT_ADAPTER_METADATA["module_status"] 

272 

273 async def _migrate_add_adapter_metadata( 

274 self, config_data: dict[str, Any] 

275 ) -> dict[str, Any]: 

276 """Add adapter metadata to configuration.""" 

277 if "adapters" not in config_data: 

278 return config_data 

279 

280 for adapter_config in config_data["adapters"].values(): 

281 if isinstance(adapter_config, dict): 

282 self._ensure_adapter_metadata(adapter_config) 

283 

284 return config_data 

285 

286 async def _migrate_add_env_validation( 

287 self, config_data: dict[str, Any] 

288 ) -> dict[str, Any]: 

289 """Add environment variable validation patterns.""" 

290 # Add validation patterns to global environment variables 

291 if "global_environment" in config_data: 

292 self._add_validation_to_env_vars(config_data["global_environment"]) 

293 

294 # Add validation patterns to adapter environment variables 

295 if "adapters" in config_data: 

296 self._add_validation_to_adapter_env_vars(config_data["adapters"]) 

297 

298 return config_data 

299 

300 def _add_validation_to_env_vars(self, env_vars: list[dict[str, Any]]) -> None: 

301 """Add validation patterns to environment variables.""" 

302 for env_var in env_vars: 

303 if isinstance(env_var, dict) and "validator_pattern" not in env_var: 

304 env_var["validator_pattern"] = self._suggest_validation_pattern( 

305 env_var.get("name", "") 

306 ) 

307 

308 def _add_validation_to_adapter_env_vars(self, adapters: dict[str, Any]) -> None: 

309 """Add validation patterns to adapter environment variables.""" 

310 for adapter_config in adapters.values(): 

311 if not isinstance(adapter_config, dict): 

312 continue 

313 if "environment_variables" not in adapter_config: 

314 continue 

315 

316 self._add_validation_to_env_vars(adapter_config["environment_variables"]) 

317 

318 async def _migrate_production_ready( 

319 self, config_data: dict[str, Any] 

320 ) -> dict[str, Any]: 

321 """Upgrade to production-ready configuration schema.""" 

322 # Initialize and configure global settings 

323 global_settings = config_data.setdefault("global_settings", {}) 

324 self._add_production_global_settings(global_settings) 

325 

326 # Upgrade adapter configurations 

327 if "adapters" in config_data: 

328 self._upgrade_adapter_configs(config_data["adapters"]) 

329 

330 return config_data 

331 

332 def _get_security_settings(self) -> dict[str, bool]: 

333 """Get production security settings.""" 

334 return { 

335 "force_https": True, 

336 "secure_cookies": True, 

337 "csrf_protection": True, 

338 "content_security_policy": True, 

339 } 

340 

341 def _get_monitoring_settings(self) -> dict[str, bool]: 

342 """Get production monitoring settings.""" 

343 return { 

344 "health_checks": True, 

345 "metrics_collection": True, 

346 "error_reporting": True, 

347 } 

348 

349 def _get_performance_settings(self) -> dict[str, bool]: 

350 """Get production performance settings.""" 

351 return { 

352 "caching_enabled": True, 

353 "compression_enabled": True, 

354 "static_file_optimization": True, 

355 } 

356 

357 def _add_production_global_settings(self, global_settings: dict[str, Any]) -> None: 

358 """Add production-specific global settings.""" 

359 production_settings = { 

360 "security": self._get_security_settings(), 

361 "monitoring": self._get_monitoring_settings(), 

362 "performance": self._get_performance_settings(), 

363 } 

364 

365 for key, value in production_settings.items(): 

366 global_settings.setdefault(key, value) 

367 

368 def _upgrade_adapter_configs(self, adapters: dict[str, Any]) -> None: 

369 """Upgrade adapter configurations for production.""" 

370 for adapter_config in adapters.values(): 

371 if isinstance(adapter_config, dict): 

372 self._add_adapter_production_features(adapter_config) 

373 

374 def _get_health_check_config(self) -> dict[str, Any]: 

375 """Get default health check configuration.""" 

376 return { 

377 "enabled": True, 

378 "interval_seconds": 60, 

379 "timeout_seconds": 30, 

380 } 

381 

382 def _get_profile_overrides(self) -> dict[str, dict[str, Any]]: 

383 """Get default profile overrides.""" 

384 return { 

385 "production": {"debug": False, "log_level": "WARNING"}, 

386 "development": {"debug": True, "log_level": "DEBUG"}, 

387 } 

388 

389 def _add_adapter_production_features(self, adapter_config: dict[str, Any]) -> None: 

390 """Add production features to adapter configuration.""" 

391 adapter_config.setdefault( 

392 "health_check_config", self._get_health_check_config() 

393 ) 

394 adapter_config.setdefault("profile_overrides", self._get_profile_overrides()) 

395 

396 async def _migrate_remove_production_features( 

397 self, config_data: dict[str, Any] 

398 ) -> dict[str, Any]: 

399 """Remove production-specific features for downgrade.""" 

400 # Remove production-specific global settings 

401 if "global_settings" in config_data: 

402 global_settings = config_data["global_settings"] 

403 for key in ("security", "monitoring", "performance"): 

404 global_settings.pop(key, None) 

405 

406 # Remove adapter production features 

407 if "adapters" in config_data: 

408 for adapter_config in config_data["adapters"].values(): 

409 if isinstance(adapter_config, dict): 

410 adapter_config.pop("health_check_config", None) 

411 adapter_config.pop("profile_overrides", None) 

412 

413 return config_data 

414 

415 def _suggest_validation_pattern(self, var_name: str) -> str | None: 

416 """Suggest validation pattern based on variable name.""" 

417 name_lower = var_name.lower() 

418 

419 if "url" in name_lower: 

420 return r"^https?://[^\s/$.?#].[^\s]*$" 

421 elif "email" in name_lower: 

422 return r"^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$" 

423 elif "port" in name_lower: 

424 return r"^([1-9][0-9]{0,3}|[1-5][0-9]{4}|6[0-4][0-9]{3}|65[0-4][0-9]{2}|655[0-2][0-9]|6553[0-5])$" 

425 elif "debug" in name_lower or "enable" in name_lower: 

426 return r"^(true|false|1|0|yes|no|on|off)$" 

427 elif "log" in name_lower and "level" in name_lower: 

428 return r"^(DEBUG|INFO|WARNING|ERROR|CRITICAL)$" 

429 

430 return None 

431 

432 def _load_config_file( 

433 self, config_file: Path, target_version: str 

434 ) -> tuple[dict[str, Any] | None, MigrationResult | None]: 

435 """Load configuration file and return data or error result.""" 

436 if not config_file.exists(): 

437 return None, MigrationResult( 

438 success=False, 

439 version_from="unknown", 

440 version_to=target_version, 

441 errors=[f"Configuration file not found: {config_file}"], 

442 ) 

443 

444 try: 

445 with config_file.open() as f: 

446 if config_file.suffix.lower() == ".json": 

447 config_data = json.load(f) 

448 else: 

449 config_data = yaml.safe_load(f) 

450 return config_data, None 

451 except Exception as e: 

452 return None, MigrationResult( 

453 success=False, 

454 version_from="unknown", 

455 version_to=target_version, 

456 errors=[f"Failed to load configuration: {e}"], 

457 ) 

458 

459 def _save_config_file( 

460 self, config_data: dict[str, Any], output_path: Path, result: MigrationResult 

461 ) -> None: 

462 """Save migrated configuration to file and update result on error.""" 

463 try: 

464 with output_path.open("w") as f: 

465 if output_path.suffix.lower() == ".json": 

466 json.dump(config_data, f, indent=2) 

467 else: 

468 yaml.dump(config_data, f, default_flow_style=False) 

469 except Exception as e: 

470 result.success = False 

471 result.errors.append(f"Failed to save migrated configuration: {e}") 

472 

473 async def migrate_configuration_file( 

474 self, config_file: Path, target_version: str, output_file: Path | None = None 

475 ) -> MigrationResult: 

476 """Migrate configuration file to target version.""" 

477 # Load configuration 

478 config_data, error_result = self._load_config_file(config_file, target_version) 

479 if error_result: 

480 return error_result 

481 

482 # Perform migration 

483 result = await self.migrate_configuration( 

484 cast(dict[str, Any], config_data), target_version 

485 ) 

486 

487 if result.success: 

488 # Save migrated configuration 

489 output_path = output_file or config_file 

490 self._save_config_file( 

491 cast(dict[str, Any], config_data), output_path, result 

492 ) 

493 

494 return result 

495 

496 def get_current_schema_version(self) -> str: 

497 """Get the current schema version.""" 

498 return self.version_history[-1] 

499 

500 def get_supported_versions(self) -> list[str]: 

501 """Get list of supported configuration versions.""" 

502 return self.version_history.copy() 

503 

504 async def create_migration_backup( 

505 self, config_file: Path, target_version: str 

506 ) -> Path: 

507 """Create backup before migration.""" 

508 timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") 

509 backup_name = f"{config_file.stem}_backup_{timestamp}{config_file.suffix}" 

510 backup_path = self.migrations_dir / backup_name 

511 

512 # Copy original file 

513 import shutil 

514 

515 shutil.copy2(config_file, backup_path) 

516 

517 # Create migration metadata 

518 metadata = { 

519 "original_file": str(config_file), 

520 "backup_created": datetime.now().isoformat(), 

521 "target_version": target_version, 

522 "original_version": self._detect_configuration_version(config_file), 

523 } 

524 

525 metadata_path = backup_path.with_suffix(".metadata.json") 

526 with metadata_path.open("w") as f: 

527 json.dump(metadata, f, indent=2) 

528 

529 return backup_path 

530 

531 def _detect_configuration_version(self, config_file: Path) -> str: 

532 """Detect version of configuration file.""" 

533 try: 

534 with config_file.open() as f: 

535 if config_file.suffix.lower() == ".json": 

536 config_data = json.load(f) 

537 else: 

538 config_data = yaml.safe_load(f) 

539 

540 version: str = config_data.get("version", "0.1.0") 

541 return version 

542 except Exception: 

543 return "unknown" 

544 

545 def _create_compatibility_result( 

546 self, current_version: str, target_version: str 

547 ) -> dict[str, Any]: 

548 """Create initial compatibility result structure.""" 

549 return { 

550 "compatible": False, 

551 "current_version": current_version, 

552 "target_version": target_version, 

553 "migration_path": [], 

554 "warnings": [], 

555 "requirements": [], 

556 } 

557 

558 def _check_version_unknown( 

559 self, current_version: str, result: dict[str, Any] 

560 ) -> bool: 

561 """Check if version is unknown and update result.""" 

562 if current_version == "unknown": 

563 result["warnings"].append("Cannot detect current configuration version") 

564 return True 

565 return False 

566 

567 def _check_migration_path_exists( 

568 self, migration_path: list[MigrationStep], result: dict[str, Any] 

569 ) -> bool: 

570 """Check if migration path exists and update result.""" 

571 if not migration_path: 

572 current_v = result["current_version"] 

573 target_v = result["target_version"] 

574 result["warnings"].append( 

575 f"No migration path available from {current_v} to {target_v}" 

576 ) 

577 return False 

578 return True 

579 

580 def _add_downgrade_warnings( 

581 self, migration_path: list[MigrationStep], result: dict[str, Any] 

582 ) -> None: 

583 """Add warnings for downgrade steps in migration path.""" 

584 for step in migration_path: 

585 if step.direction == MigrationDirection.DOWNGRADE: 

586 result["warnings"].append( 

587 f"Step '{step.name}' is a downgrade and may result in data loss" 

588 ) 

589 

590 def _add_migration_requirements(self, result: dict[str, Any]) -> None: 

591 """Add migration requirements to result.""" 

592 result["requirements"] = [ 

593 "Backup will be created automatically", 

594 "Configuration file will be updated in place", 

595 "Verify adapter compatibility after migration", 

596 ] 

597 

598 async def validate_migration_compatibility( 

599 self, config_file: Path, target_version: str 

600 ) -> dict[str, Any]: 

601 """Validate if migration is possible and safe.""" 

602 current_version = self._detect_configuration_version(config_file) 

603 

604 result = self._create_compatibility_result(current_version, target_version) 

605 

606 if self._check_version_unknown(current_version, result): 

607 return result 

608 

609 # Get migration path 

610 migration_path = self._get_migration_path(current_version, target_version) 

611 if not self._check_migration_path_exists(migration_path, result): 

612 return result 

613 

614 result["compatible"] = True 

615 result["migration_path"] = [step.name for step in migration_path] 

616 

617 # Check for potential issues 

618 self._add_downgrade_warnings(migration_path, result) 

619 

620 # Add requirements 

621 self._add_migration_requirements(result) 

622 

623 return result