Coverage for fastblocks / _workflows_integration.py: 21%

163 statements  

« prev     ^ index     » next       coverage.py v7.12.0, created at 2025-11-26 03:30 -0800

1"""ACB Workflows integration for FastBlocks. 

2 

3This module provides background job orchestration using ACB's Workflows system, 

4with graceful degradation when ACB workflows are not available. 

5 

6Author: lesleslie <les@wedgwoodwebworks.com> 

7Created: 2025-10-01 

8 

9Key Features: 

10- Cache warming workflows (template and static file caching) 

11- Template cleanup workflows (remove stale templates, optimize storage) 

12- Performance optimization workflows (database query optimization, index maintenance) 

13- Scheduled background tasks 

14- Graceful degradation when Workflows unavailable 

15 

16Usage: 

17 # Execute cache warming workflow 

18 from fastblocks._workflows_integration import execute_cache_warming 

19 result = await execute_cache_warming() 

20 

21 # Execute template cleanup workflow 

22 from fastblocks._workflows_integration import execute_template_cleanup 

23 result = await execute_template_cleanup() 

24 

25 # Execute performance optimization workflow 

26 from fastblocks._workflows_integration import execute_performance_optimization 

27 result = await execute_performance_optimization() 

28""" 

29 

30import typing as t 

31from contextlib import suppress 

32from datetime import datetime 

33 

34from acb.depends import depends 

35 

# Feature-detect ACB's optional workflow engine. The three names default to
# None so the rest of the module can reference them unconditionally; the
# availability flag flips to True only after a successful import.
acb_workflows_available = False
BasicWorkflowEngine = None
WorkflowDefinition = None
WorkflowStep = None

with suppress(ImportError):
    from acb.workflows import (  # type: ignore[no-redef]
        BasicWorkflowEngine,
        WorkflowDefinition,
        WorkflowStep,
    )

    # Only reached when the import above succeeded.
    acb_workflows_available = True

50 

51 

class FastBlocksWorkflowService:
    """FastBlocks wrapper for ACB Workflows with graceful degradation."""

    _instance: t.ClassVar["FastBlocksWorkflowService | None"] = None

    def __new__(cls) -> "FastBlocksWorkflowService":
        """Return the single shared instance, creating it on first call."""
        if cls._instance is None:
            cls._instance = super().__new__(cls)
        return cls._instance

    def __init__(self) -> None:
        """Set up the ACB workflow engine (idempotent for the singleton)."""
        # Guard clause: singleton __init__ runs on every construction call,
        # so bail out if this instance was already initialized.
        if hasattr(self, "_initialized"):
            return

        self._engine: t.Any = None  # BasicWorkflowEngine when ACB available
        self._initialized = True

        # Best-effort engine construction; any failure leaves _engine as None
        # and the service reports itself unavailable.
        if acb_workflows_available and BasicWorkflowEngine:
            with suppress(Exception):
                self._engine = BasicWorkflowEngine(
                    max_concurrent_steps=3,  # Conservative concurrency
                    enable_retry=True,
                    max_retries=2,
                )

    @property
    def available(self) -> bool:
        """True when ACB Workflows imported and an engine was constructed."""
        return acb_workflows_available and self._engine is not None

82 

83 

# Lazily-created module-level singleton handle.
_workflow_service: FastBlocksWorkflowService | None = None


def get_workflow_service() -> FastBlocksWorkflowService:
    """Return the process-wide FastBlocksWorkflowService, creating it lazily."""
    global _workflow_service
    if _workflow_service is None:
        _workflow_service = FastBlocksWorkflowService()
    return _workflow_service

94 

95 

async def execute_cache_warming(
    warm_templates: bool = True,
    warm_static: bool = True,
    warm_routes: bool = True,
) -> dict[str, t.Any]:
    """Execute cache warming workflow.

    Pre-loads frequently accessed resources into cache to improve performance.

    Args:
        warm_templates: Pre-cache commonly used templates
        warm_static: Pre-cache static file metadata
        warm_routes: Pre-cache route definitions

    Returns:
        Dictionary with workflow results
    """
    service = get_workflow_service()

    # Graceful degradation: without the ACB engine, run each warmer directly.
    if not service.available:
        return await _manual_cache_warming(warm_templates, warm_static, warm_routes)

    # (enabled, step_id, display name, action) for each optional warmer;
    # all warming steps share the same retry policy.
    specs = (
        (warm_templates, "warm_templates", "Warm Template Cache", "warm_template_cache"),
        (warm_static, "warm_static", "Warm Static File Cache", "warm_static_cache"),
        (warm_routes, "warm_routes", "Warm Route Cache", "warm_route_cache"),
    )
    steps = [
        WorkflowStep(  # type: ignore[operator]
            step_id=step_id,
            name=name,
            action=action,
            params={},
            retry_on_failure=True,
            max_retries=2,
        )
        for enabled, step_id, name, action in specs
        if enabled
    ]

    workflow = WorkflowDefinition(  # type: ignore[operator]
        workflow_id="cache-warming",
        name="Cache Warming Workflow",
        description="Pre-load frequently accessed resources into cache",
        steps=steps,
        max_execution_time=300,  # 5 minutes max
    )

    result = await service._engine.execute(
        workflow,
        context={
            "warm_templates": warm_templates,
            "warm_static": warm_static,
            "warm_routes": warm_routes,
        },
        action_handlers={
            "warm_template_cache": _warm_template_cache,
            "warm_static_cache": _warm_static_cache,
            "warm_route_cache": _warm_route_cache,
        },
    )

    # Summarize per-step outcomes into a flat, serializable dict.
    step_results = list(result.step_results.values())
    state = result.state
    return {
        "workflow_id": workflow.workflow_id,
        "state": state.value if hasattr(state, "value") else str(state),
        "completed_at": datetime.now().isoformat(),
        "steps_completed": sum(1 for s in step_results if s.state == "completed"),
        "steps_failed": sum(1 for s in step_results if s.state == "failed"),
        "errors": [s.error for s in step_results if s.error],
    }

196 

197 

async def execute_template_cleanup(
    remove_stale: bool = True,
    optimize_storage: bool = True,
    cleanup_cache: bool = True,
) -> dict[str, t.Any]:
    """Execute template cleanup workflow.

    Removes stale templates, optimizes storage, and cleans up cache.

    Args:
        remove_stale: Remove templates not accessed in 30+ days
        optimize_storage: Compress and optimize template storage
        cleanup_cache: Clear unused template cache entries

    Returns:
        Dictionary with workflow results
    """
    service = get_workflow_service()

    # Graceful degradation: without the ACB engine, run cleanup inline.
    if not service.available:
        return await _manual_template_cleanup(
            remove_stale, optimize_storage, cleanup_cache
        )

    # Build step keyword-argument specs; later steps declare dependencies on
    # earlier ones only when those earlier steps are actually enabled.
    specs: list[dict[str, t.Any]] = []

    if cleanup_cache:
        specs.append(
            {
                "step_id": "cleanup_cache",
                "name": "Cleanup Template Cache",
                "action": "cleanup_template_cache",
                "params": {},
                "retry_on_failure": False,
            }
        )

    if remove_stale:
        specs.append(
            {
                "step_id": "remove_stale",
                "name": "Remove Stale Templates",
                "action": "remove_stale_templates",
                "params": {"days_threshold": 30},
                "depends_on": ["cleanup_cache"] if cleanup_cache else [],
                "retry_on_failure": False,
            }
        )

    if optimize_storage:
        specs.append(
            {
                "step_id": "optimize_storage",
                "name": "Optimize Template Storage",
                "action": "optimize_template_storage",
                "params": {},
                "depends_on": ["remove_stale"] if remove_stale else [],
                "retry_on_failure": True,
                "max_retries": 2,
            }
        )

    steps = [WorkflowStep(**spec) for spec in specs]  # type: ignore[operator]

    workflow = WorkflowDefinition(  # type: ignore[operator]
        workflow_id="template-cleanup",
        name="Template Cleanup Workflow",
        description="Remove stale templates and optimize storage",
        steps=steps,
        max_execution_time=600,  # 10 minutes max
    )

    result = await service._engine.execute(
        workflow,
        context={
            "remove_stale": remove_stale,
            "optimize_storage": optimize_storage,
            "cleanup_cache": cleanup_cache,
        },
        action_handlers={
            "cleanup_template_cache": _cleanup_template_cache,
            "remove_stale_templates": _remove_stale_templates,
            "optimize_template_storage": _optimize_template_storage,
        },
    )

    # Summarize per-step outcomes into a flat, serializable dict.
    step_results = list(result.step_results.values())
    state = result.state
    return {
        "workflow_id": workflow.workflow_id,
        "state": state.value if hasattr(state, "value") else str(state),
        "completed_at": datetime.now().isoformat(),
        "steps_completed": sum(1 for s in step_results if s.state == "completed"),
        "steps_failed": sum(1 for s in step_results if s.state == "failed"),
        "errors": [s.error for s in step_results if s.error],
    }

300 

301 

async def execute_performance_optimization(
    optimize_queries: bool = True,
    rebuild_indexes: bool = True,
    cleanup_sessions: bool = True,
) -> dict[str, t.Any]:
    """Execute performance optimization workflow.

    Optimizes database queries, rebuilds indexes, and cleans up sessions.

    Args:
        optimize_queries: Analyze and optimize slow queries
        rebuild_indexes: Rebuild database indexes for optimal performance
        cleanup_sessions: Clean up expired sessions

    Returns:
        Dictionary with workflow results
    """
    service = get_workflow_service()

    # Graceful degradation: without the ACB engine, run optimization inline.
    if not service.available:
        return await _manual_performance_optimization(
            optimize_queries, rebuild_indexes, cleanup_sessions
        )

    # Build step keyword-argument specs; index rebuild depends on query
    # optimization only when that step is enabled.
    specs: list[dict[str, t.Any]] = []

    if cleanup_sessions:
        specs.append(
            {
                "step_id": "cleanup_sessions",
                "name": "Cleanup Expired Sessions",
                "action": "cleanup_expired_sessions",
                "params": {"expiry_hours": 24},
                "retry_on_failure": False,
            }
        )

    if optimize_queries:
        specs.append(
            {
                "step_id": "optimize_queries",
                "name": "Optimize Database Queries",
                "action": "optimize_database_queries",
                "params": {},
                "retry_on_failure": True,
                "max_retries": 2,
            }
        )

    if rebuild_indexes:
        specs.append(
            {
                "step_id": "rebuild_indexes",
                "name": "Rebuild Database Indexes",
                "action": "rebuild_database_indexes",
                "params": {},
                "depends_on": ["optimize_queries"] if optimize_queries else [],
                "retry_on_failure": True,
                "max_retries": 1,
            }
        )

    steps = [WorkflowStep(**spec) for spec in specs]  # type: ignore[operator]

    workflow = WorkflowDefinition(  # type: ignore[operator]
        workflow_id="performance-optimization",
        name="Performance Optimization Workflow",
        description="Optimize database and application performance",
        steps=steps,
        max_execution_time=900,  # 15 minutes max
    )

    result = await service._engine.execute(
        workflow,
        context={
            "optimize_queries": optimize_queries,
            "rebuild_indexes": rebuild_indexes,
            "cleanup_sessions": cleanup_sessions,
        },
        action_handlers={
            "cleanup_expired_sessions": _cleanup_expired_sessions,
            "optimize_database_queries": _optimize_database_queries,
            "rebuild_database_indexes": _rebuild_database_indexes,
        },
    )

    # Summarize per-step outcomes into a flat, serializable dict.
    step_results = list(result.step_results.values())
    state = result.state
    return {
        "workflow_id": workflow.workflow_id,
        "state": state.value if hasattr(state, "value") else str(state),
        "completed_at": datetime.now().isoformat(),
        "steps_completed": sum(1 for s in step_results if s.state == "completed"),
        "steps_failed": sum(1 for s in step_results if s.state == "failed"),
        "errors": [s.error for s in step_results if s.error],
    }

404 

405 

406# Action handler implementations 

407 

408 

async def _warm_template_cache(
    context: dict[str, t.Any], params: dict[str, t.Any]
) -> dict[str, t.Any]:
    """Warm template cache by pre-loading commonly used templates."""
    # Best-effort: any failure (missing gather action, no cache adapter,
    # cache errors) falls through to the "skipped" result below.
    with suppress(Exception):
        from .actions.gather import gather

        discovered = await gather.templates()

        if discovered and hasattr(discovered, "templates"):
            warmed = 0
            cache = await depends.get("cache")
            if cache:
                # Cache metadata only (no rendering), limited to the top 50.
                for template_name in list(discovered.templates.keys())[:50]:
                    await cache.set(
                        f"template:metadata:{template_name}",
                        {
                            "name": template_name,
                            "warmed_at": datetime.now().isoformat(),
                        },
                        ttl=3600,
                    )
                    warmed += 1

            return {"templates_warmed": warmed, "status": "completed"}

    return {"templates_warmed": 0, "status": "skipped"}

441 

442 

async def _warm_static_cache(
    context: dict[str, t.Any], params: dict[str, t.Any]
) -> dict[str, t.Any]:
    """Warm static file cache by pre-loading metadata."""
    # No static-file adapter integration yet, so there is nothing to warm.
    return {"static_files_warmed": 0, "status": "skipped"}

449 

450 

async def _warm_route_cache(
    context: dict[str, t.Any], params: dict[str, t.Any]
) -> dict[str, t.Any]:
    """Warm route cache by pre-loading route definitions."""
    # Best-effort: any failure falls through to the "skipped" result below.
    with suppress(Exception):
        from .actions.gather import gather

        gathered = await gather.routes()
        if gathered and hasattr(gathered, "routes"):
            return {"routes_warmed": len(gathered.routes), "status": "completed"}

    return {"routes_warmed": 0, "status": "skipped"}

466 

467 

async def _cleanup_template_cache(
    context: dict[str, t.Any], params: dict[str, t.Any]
) -> dict[str, t.Any]:
    """Clean up unused template cache entries."""
    # Best-effort: requires a cache adapter that supports pattern clearing.
    with suppress(Exception):
        cache = await depends.get("cache")
        if cache and hasattr(cache, "clear_pattern"):
            await cache.clear_pattern("template:*")
            return {"cache_cleared": True, "status": "completed"}

    return {"cache_cleared": False, "status": "skipped"}

480 

481 

async def _remove_stale_templates(
    context: dict[str, t.Any], params: dict[str, t.Any]
) -> dict[str, t.Any]:
    """Remove templates not accessed in X days."""
    threshold = params.get("days_threshold", 30)

    # Placeholder: a real implementation would consult template access logs.
    return {
        "templates_removed": 0,
        "days_threshold": threshold,
        "status": "completed",
    }

495 

496 

async def _optimize_template_storage(
    context: dict[str, t.Any], params: dict[str, t.Any]
) -> dict[str, t.Any]:
    """Optimize template storage (compress, deduplicate)."""
    # Placeholder: a real implementation would compress/optimize templates.
    return {"storage_optimized": False, "status": "skipped"}

503 

504 

async def _cleanup_expired_sessions(
    context: dict[str, t.Any], params: dict[str, t.Any]
) -> dict[str, t.Any]:
    """Clean up expired sessions."""
    expiry = params.get("expiry_hours", 24)

    # Placeholder: a real implementation would purge session storage.
    return {
        "sessions_cleaned": 0,
        "expiry_hours": expiry,
        "status": "completed",
    }

517 

518 

async def _optimize_database_queries(
    context: dict[str, t.Any], params: dict[str, t.Any]
) -> dict[str, t.Any]:
    """Analyze and optimize slow database queries."""
    # Placeholder: a real implementation would analyze query logs.
    return {"queries_optimized": 0, "status": "skipped"}

525 

526 

async def _rebuild_database_indexes(
    context: dict[str, t.Any], params: dict[str, t.Any]
) -> dict[str, t.Any]:
    """Rebuild database indexes for optimal performance."""
    # Placeholder: a real implementation would rebuild indexes.
    return {"indexes_rebuilt": 0, "status": "skipped"}

533 

534 

535# Manual fallback implementations (when ACB Workflows unavailable) 

536 

537 

async def _manual_cache_warming(
    warm_templates: bool, warm_static: bool, warm_routes: bool
) -> dict[str, t.Any]:
    """Manual cache warming without workflow orchestration."""
    results: dict[str, t.Any] = {}

    # Run enabled warmers sequentially, in the same order the workflow would.
    tasks = (
        (warm_templates, "templates", _warm_template_cache),
        (warm_static, "static", _warm_static_cache),
        (warm_routes, "routes", _warm_route_cache),
    )
    for enabled, label, handler in tasks:
        if enabled:
            results[label] = await handler({}, {})

    return {
        "workflow_id": "cache-warming",
        "state": "completed",
        "completed_at": datetime.now().isoformat(),
        "results": results,
        "mode": "manual",
    }

563 

564 

async def _manual_template_cleanup(
    remove_stale: bool, optimize_storage: bool, cleanup_cache: bool
) -> dict[str, t.Any]:
    """Manual template cleanup without workflow orchestration."""
    results: dict[str, t.Any] = {}

    # Run enabled jobs sequentially, in the same order the workflow would.
    jobs = (
        (cleanup_cache, "cache_cleanup", _cleanup_template_cache, {}),
        (remove_stale, "stale_removal", _remove_stale_templates, {"days_threshold": 30}),
        (optimize_storage, "storage_optimization", _optimize_template_storage, {}),
    )
    for enabled, label, handler, handler_params in jobs:
        if enabled:
            results[label] = await handler({}, handler_params)

    return {
        "workflow_id": "template-cleanup",
        "state": "completed",
        "completed_at": datetime.now().isoformat(),
        "results": results,
        "mode": "manual",
    }

590 

591 

async def _manual_performance_optimization(
    optimize_queries: bool, rebuild_indexes: bool, cleanup_sessions: bool
) -> dict[str, t.Any]:
    """Manual performance optimization without workflow orchestration."""
    results: dict[str, t.Any] = {}

    # Run enabled jobs sequentially, in the same order the workflow would.
    jobs = (
        (cleanup_sessions, "session_cleanup", _cleanup_expired_sessions, {"expiry_hours": 24}),
        (optimize_queries, "query_optimization", _optimize_database_queries, {}),
        (rebuild_indexes, "index_rebuild", _rebuild_database_indexes, {}),
    )
    for enabled, label, handler, handler_params in jobs:
        if enabled:
            results[label] = await handler({}, handler_params)

    return {
        "workflow_id": "performance-optimization",
        "state": "completed",
        "completed_at": datetime.now().isoformat(),
        "results": results,
        "mode": "manual",
    }

617 

618 

async def register_fastblocks_workflows() -> bool:
    """Register FastBlocks workflows with ACB.

    Returns:
        True if registration successful, False otherwise
    """
    # Nothing to register when ACB workflows never imported.
    if not acb_workflows_available:
        return False

    try:
        service = get_workflow_service()
        # Expose the service through the dependency container.
        depends.set("fastblocks_workflows", service)
        return service.available
    except Exception:
        # Registration is best-effort; degrade silently like the rest of
        # this module.
        return False

639 

640 

# Public API of this module.
__all__ = [
    "FastBlocksWorkflowService",
    "get_workflow_service",
    "execute_cache_warming",
    "execute_template_cleanup",
    "execute_performance_optimization",
    "register_fastblocks_workflows",
    "acb_workflows_available",
]