Coverage for fastblocks / _workflows_integration.py: 21%
163 statements
« prev ^ index » next coverage.py v7.12.0, created at 2025-11-26 03:30 -0800
« prev ^ index » next coverage.py v7.12.0, created at 2025-11-26 03:30 -0800
1"""ACB Workflows integration for FastBlocks.
3This module provides background job orchestration using ACB's Workflows system,
4with graceful degradation when ACB workflows are not available.
6Author: lesleslie <les@wedgwoodwebworks.com>
7Created: 2025-10-01
9Key Features:
10- Cache warming workflows (template and static file caching)
11- Template cleanup workflows (remove stale templates, optimize storage)
12- Performance optimization workflows (database query optimization, index maintenance)
13- Scheduled background tasks
14- Graceful degradation when Workflows unavailable
16Usage:
17 # Execute cache warming workflow
18 from fastblocks._workflows_integration import execute_cache_warming
19 result = await execute_cache_warming()
21 # Execute template cleanup workflow
22 from fastblocks._workflows_integration import execute_template_cleanup
23 result = await execute_template_cleanup()
25 # Execute performance optimization workflow
26 from fastblocks._workflows_integration import execute_performance_optimization
27 result = await execute_performance_optimization()
28"""
30import typing as t
31from contextlib import suppress
32from datetime import datetime
34from acb.depends import depends
# Optional dependency: ACB Workflows.
# The placeholders below describe the "not installed" state and are only
# replaced when the import succeeds.
acb_workflows_available = False
BasicWorkflowEngine = None
WorkflowDefinition = None
WorkflowStep = None

try:
    from acb.workflows import (  # type: ignore[no-redef]
        BasicWorkflowEngine,
        WorkflowDefinition,
        WorkflowStep,
    )
except ImportError:
    # Import failed: keep the placeholders and leave the flag False so the
    # rest of the module falls back to its manual code paths.
    pass
else:
    acb_workflows_available = True
class FastBlocksWorkflowService:
    """Singleton facade around an ACB workflow engine.

    The engine is created once, on first construction, and only when the
    ACB workflows import succeeded. If the engine cannot be built the
    service stays usable but reports ``available`` as false.
    """

    # The one shared instance; populated by the first call to __new__.
    _instance: t.ClassVar["FastBlocksWorkflowService | None"] = None

    def __new__(cls) -> "FastBlocksWorkflowService":
        """Return the shared instance, allocating it on first use."""
        if cls._instance is not None:
            return cls._instance
        cls._instance = super().__new__(cls)
        return cls._instance

    def __init__(self) -> None:
        """Set up the engine the first time the shared instance is built.

        ``__init__`` runs on every ``FastBlocksWorkflowService()`` call, so
        the ``_initialized`` marker makes repeat calls a no-op.
        """
        if hasattr(self, "_initialized"):
            return

        self._engine: t.Any = None  # BasicWorkflowEngine when ACB available
        self._initialized = True

        if not (acb_workflows_available and BasicWorkflowEngine):
            return

        # Engine construction is best-effort: any failure leaves _engine as
        # None, which `available` then reports.
        try:
            self._engine = BasicWorkflowEngine(
                max_concurrent_steps=3,  # Conservative concurrency
                enable_retry=True,
                max_retries=2,
            )
        except Exception:
            pass

    @property
    def available(self) -> bool:
        """Whether ACB Workflows was imported and an engine was created."""
        return acb_workflows_available and self._engine is not None
# Module-level cache for the accessor below; None until first requested.
_workflow_service: FastBlocksWorkflowService | None = None


def get_workflow_service() -> FastBlocksWorkflowService:
    """Return the shared FastBlocksWorkflowService, creating it on first call."""
    global _workflow_service
    service = _workflow_service
    if service is None:
        service = FastBlocksWorkflowService()
        _workflow_service = service
    return service
async def execute_cache_warming(
    warm_templates: bool = True,
    warm_static: bool = True,
    warm_routes: bool = True,
) -> dict[str, t.Any]:
    """Execute cache warming workflow.

    Pre-loads frequently accessed resources into cache to improve performance.
    When the workflow service is not available, the work is delegated to
    ``_manual_cache_warming`` and its result is returned unchanged.

    Args:
        warm_templates: Pre-cache commonly used templates
        warm_static: Pre-cache static file metadata
        warm_routes: Pre-cache route definitions

    Returns:
        Dictionary with workflow results. On the engine path it contains
        ``workflow_id``, ``state``, ``completed_at`` (ISO timestamp),
        ``steps_completed``, ``steps_failed`` and ``errors``.
    """
    service = get_workflow_service()

    if not service.available:
        # Graceful degradation - manual cache warming
        return await _manual_cache_warming(warm_templates, warm_static, warm_routes)

    # (enabled, step_id, display name, action key); every step shares the
    # same retry policy, so only these fields vary.
    step_specs = (
        (
            warm_templates,
            "warm_templates",
            "Warm Template Cache",
            "warm_template_cache",
        ),
        (
            warm_static,
            "warm_static",
            "Warm Static File Cache",
            "warm_static_cache",
        ),
        (
            warm_routes,
            "warm_routes",
            "Warm Route Cache",
            "warm_route_cache",
        ),
    )
    steps = [
        WorkflowStep(  # type: ignore[operator]
            step_id=step_id,
            name=name,
            action=action,
            params={},
            retry_on_failure=True,
            max_retries=2,
        )
        for enabled, step_id, name, action in step_specs
        if enabled
    ]

    # Create workflow definition
    workflow = WorkflowDefinition(  # type: ignore[operator]
        workflow_id="cache-warming",
        name="Cache Warming Workflow",
        description="Pre-load frequently accessed resources into cache",
        steps=steps,
        max_execution_time=300,  # 5 minutes max
    )

    # Execute workflow
    result = await service._engine.execute(
        workflow,
        context={
            "warm_templates": warm_templates,
            "warm_static": warm_static,
            "warm_routes": warm_routes,
        },
        action_handlers={
            "warm_template_cache": _warm_template_cache,
            "warm_static_cache": _warm_static_cache,
            "warm_route_cache": _warm_route_cache,
        },
    )

    step_results = list(result.step_results.values())
    # A plain Enum member never compares equal to its string value, so
    # unwrap ``.value`` when present before counting. Step states that are
    # already strings pass through unchanged.
    step_states = [getattr(step.state, "value", step.state) for step in step_results]

    return {
        "workflow_id": workflow.workflow_id,
        "state": result.state.value
        if hasattr(result.state, "value")
        else str(result.state),
        "completed_at": datetime.now().isoformat(),
        "steps_completed": step_states.count("completed"),
        "steps_failed": step_states.count("failed"),
        "errors": [step.error for step in step_results if step.error],
    }
async def execute_template_cleanup(
    remove_stale: bool = True,
    optimize_storage: bool = True,
    cleanup_cache: bool = True,
) -> dict[str, t.Any]:
    """Execute template cleanup workflow.

    Removes stale templates, optimizes storage, and cleans up cache.
    When the workflow service is not available, the work is delegated to
    ``_manual_template_cleanup`` and its result is returned unchanged.

    Args:
        remove_stale: Remove templates not accessed in 30+ days
        optimize_storage: Compress and optimize template storage
        cleanup_cache: Clear unused template cache entries

    Returns:
        Dictionary with workflow results. On the engine path it contains
        ``workflow_id``, ``state``, ``completed_at`` (ISO timestamp),
        ``steps_completed``, ``steps_failed`` and ``errors``.
    """
    service = get_workflow_service()

    if not service.available:
        # Graceful degradation - manual cleanup
        return await _manual_template_cleanup(
            remove_stale, optimize_storage, cleanup_cache
        )

    # Define workflow steps with dependencies. Each step only depends on its
    # immediate predecessor, and only when that predecessor is enabled.
    steps = []

    if cleanup_cache:
        steps.append(
            WorkflowStep(  # type: ignore[operator]
                step_id="cleanup_cache",
                name="Cleanup Template Cache",
                action="cleanup_template_cache",
                params={},
                retry_on_failure=False,
            )
        )

    if remove_stale:
        steps.append(
            WorkflowStep(  # type: ignore[operator]
                step_id="remove_stale",
                name="Remove Stale Templates",
                action="remove_stale_templates",
                params={"days_threshold": 30},
                depends_on=["cleanup_cache"] if cleanup_cache else [],
                retry_on_failure=False,
            )
        )

    if optimize_storage:
        # NOTE(review): with remove_stale disabled this step has no
        # dependency on cleanup_cache, so the engine may not order the two.
        # Confirm that is intended.
        steps.append(
            WorkflowStep(  # type: ignore[operator]
                step_id="optimize_storage",
                name="Optimize Template Storage",
                action="optimize_template_storage",
                params={},
                depends_on=["remove_stale"] if remove_stale else [],
                retry_on_failure=True,
                max_retries=2,
            )
        )

    # Create workflow definition
    workflow = WorkflowDefinition(  # type: ignore[operator]
        workflow_id="template-cleanup",
        name="Template Cleanup Workflow",
        description="Remove stale templates and optimize storage",
        steps=steps,
        max_execution_time=600,  # 10 minutes max
    )

    # Execute workflow
    result = await service._engine.execute(
        workflow,
        context={
            "remove_stale": remove_stale,
            "optimize_storage": optimize_storage,
            "cleanup_cache": cleanup_cache,
        },
        action_handlers={
            "cleanup_template_cache": _cleanup_template_cache,
            "remove_stale_templates": _remove_stale_templates,
            "optimize_template_storage": _optimize_template_storage,
        },
    )

    step_results = list(result.step_results.values())
    # A plain Enum member never compares equal to its string value, so
    # unwrap ``.value`` when present before counting. Step states that are
    # already strings pass through unchanged.
    step_states = [getattr(step.state, "value", step.state) for step in step_results]

    return {
        "workflow_id": workflow.workflow_id,
        "state": result.state.value
        if hasattr(result.state, "value")
        else str(result.state),
        "completed_at": datetime.now().isoformat(),
        "steps_completed": step_states.count("completed"),
        "steps_failed": step_states.count("failed"),
        "errors": [step.error for step in step_results if step.error],
    }
async def execute_performance_optimization(
    optimize_queries: bool = True,
    rebuild_indexes: bool = True,
    cleanup_sessions: bool = True,
) -> dict[str, t.Any]:
    """Execute performance optimization workflow.

    Optimizes database queries, rebuilds indexes, and cleans up sessions.
    When the workflow service is not available, the work is delegated to
    ``_manual_performance_optimization`` and its result is returned unchanged.

    Args:
        optimize_queries: Analyze and optimize slow queries
        rebuild_indexes: Rebuild database indexes for optimal performance
        cleanup_sessions: Clean up expired sessions

    Returns:
        Dictionary with workflow results. On the engine path it contains
        ``workflow_id``, ``state``, ``completed_at`` (ISO timestamp),
        ``steps_completed``, ``steps_failed`` and ``errors``.
    """
    service = get_workflow_service()

    if not service.available:
        # Graceful degradation - manual optimization
        return await _manual_performance_optimization(
            optimize_queries, rebuild_indexes, cleanup_sessions
        )

    # Define workflow steps
    steps = []

    if cleanup_sessions:
        steps.append(
            WorkflowStep(  # type: ignore[operator]
                step_id="cleanup_sessions",
                name="Cleanup Expired Sessions",
                action="cleanup_expired_sessions",
                params={"expiry_hours": 24},
                retry_on_failure=False,
            )
        )

    if optimize_queries:
        steps.append(
            WorkflowStep(  # type: ignore[operator]
                step_id="optimize_queries",
                name="Optimize Database Queries",
                action="optimize_database_queries",
                params={},
                retry_on_failure=True,
                max_retries=2,
            )
        )

    if rebuild_indexes:
        steps.append(
            WorkflowStep(  # type: ignore[operator]
                step_id="rebuild_indexes",
                name="Rebuild Database Indexes",
                action="rebuild_database_indexes",
                params={},
                # Only declared when the query-optimization step is present.
                depends_on=["optimize_queries"] if optimize_queries else [],
                retry_on_failure=True,
                max_retries=1,
            )
        )

    # Create workflow definition
    workflow = WorkflowDefinition(  # type: ignore[operator]
        workflow_id="performance-optimization",
        name="Performance Optimization Workflow",
        description="Optimize database and application performance",
        steps=steps,
        max_execution_time=900,  # 15 minutes max
    )

    # Execute workflow
    result = await service._engine.execute(
        workflow,
        context={
            "optimize_queries": optimize_queries,
            "rebuild_indexes": rebuild_indexes,
            "cleanup_sessions": cleanup_sessions,
        },
        action_handlers={
            "cleanup_expired_sessions": _cleanup_expired_sessions,
            "optimize_database_queries": _optimize_database_queries,
            "rebuild_database_indexes": _rebuild_database_indexes,
        },
    )

    step_results = list(result.step_results.values())
    # A plain Enum member never compares equal to its string value, so
    # unwrap ``.value`` when present before counting. Step states that are
    # already strings pass through unchanged.
    step_states = [getattr(step.state, "value", step.state) for step in step_results]

    return {
        "workflow_id": workflow.workflow_id,
        "state": result.state.value
        if hasattr(result.state, "value")
        else str(result.state),
        "completed_at": datetime.now().isoformat(),
        "steps_completed": step_states.count("completed"),
        "steps_failed": step_states.count("failed"),
        "errors": [step.error for step in step_results if step.error],
    }
406# Action handler implementations
async def _warm_template_cache(
    context: dict[str, t.Any], params: dict[str, t.Any]
) -> dict[str, t.Any]:
    """Write metadata entries for up to 50 gathered templates into the cache.

    Neither ``context`` nor ``params`` is read. Any exception raised while
    gathering or caching is suppressed and reported as a "skipped" result
    with zero templates warmed.

    Returns:
        ``{"templates_warmed": <count>, "status": "completed"}`` when the
        gather result exposes ``templates`` (count is 0 if no cache is
        resolved), otherwise ``{"templates_warmed": 0, "status": "skipped"}``.
    """
    with suppress(Exception):
        from .actions.gather import gather

        gathered = await gather.templates()

        if gathered and hasattr(gathered, "templates"):
            warmed = 0
            # Only metadata is stored; templates are not rendered here.
            cache = await depends.get("cache")
            if cache:
                names = list(gathered.templates.keys())[:50]  # Limit to top 50
                for name in names:
                    entry = {
                        "name": name,
                        "warmed_at": datetime.now().isoformat(),
                    }
                    await cache.set(f"template:metadata:{name}", entry, ttl=3600)
                    warmed += 1

            return {"templates_warmed": warmed, "status": "completed"}

    return {"templates_warmed": 0, "status": "skipped"}
async def _warm_static_cache(
    context: dict[str, t.Any], params: dict[str, t.Any]
) -> dict[str, t.Any]:
    """Placeholder handler for static file cache warming.

    Ignores both arguments and always reports zero files warmed with
    status "skipped".
    """
    # Static file warming would depend on static file adapter
    outcome: dict[str, t.Any] = dict(static_files_warmed=0, status="skipped")
    return outcome
async def _warm_route_cache(
    context: dict[str, t.Any], params: dict[str, t.Any]
) -> dict[str, t.Any]:
    """Gather route definitions and report how many were found.

    Neither argument is read. Any exception during gathering is swallowed
    and reported as a "skipped" result.

    Returns:
        ``{"routes_warmed": <count>, "status": "completed"}`` when the gather
        result exposes ``routes``, otherwise
        ``{"routes_warmed": 0, "status": "skipped"}``.
    """
    try:
        from .actions.gather import gather

        gathered = await gather.routes()

        if gathered and hasattr(gathered, "routes"):
            return {"routes_warmed": len(gathered.routes), "status": "completed"}
    except Exception:
        # Best-effort: fall through to the "skipped" result below.
        pass

    return {"routes_warmed": 0, "status": "skipped"}
async def _cleanup_template_cache(
    context: dict[str, t.Any], params: dict[str, t.Any]
) -> dict[str, t.Any]:
    """Clear every ``template:*`` cache entry when the cache supports it.

    Neither argument is read. Any exception is suppressed and reported as a
    "skipped" result.

    Returns:
        ``{"cache_cleared": True, "status": "completed"}`` after a successful
        ``clear_pattern`` call, otherwise
        ``{"cache_cleared": False, "status": "skipped"}``.
    """
    with suppress(Exception):
        cache = await depends.get("cache")
        supports_pattern_clear = cache and hasattr(cache, "clear_pattern")
        if supports_pattern_clear:
            # Clear stale template cache entries
            await cache.clear_pattern("template:*")
            return {"cache_cleared": True, "status": "completed"}

    return {"cache_cleared": False, "status": "skipped"}
async def _remove_stale_templates(
    context: dict[str, t.Any], params: dict[str, t.Any]
) -> dict[str, t.Any]:
    """Placeholder handler for stale template removal.

    Reads ``days_threshold`` from ``params`` (default 30) and echoes it back;
    nothing is removed and ``context`` is not used.
    """
    # In production, would check template access logs
    # For now, just return placeholder
    report: dict[str, t.Any] = {"templates_removed": 0}
    report["days_threshold"] = params.get("days_threshold", 30)
    report["status"] = "completed"
    return report
async def _optimize_template_storage(
    context: dict[str, t.Any], params: dict[str, t.Any]
) -> dict[str, t.Any]:
    """Placeholder handler for template storage optimization.

    Ignores both arguments and always reports that nothing was optimized,
    with status "skipped".
    """
    # In production, would compress/optimize template files
    outcome: dict[str, t.Any] = dict(storage_optimized=False, status="skipped")
    return outcome
async def _cleanup_expired_sessions(
    context: dict[str, t.Any], params: dict[str, t.Any]
) -> dict[str, t.Any]:
    """Placeholder handler for expired session cleanup.

    Reads ``expiry_hours`` from ``params`` (default 24) and echoes it back;
    no sessions are touched and ``context`` is not used.
    """
    # In production, would clean up session storage
    report: dict[str, t.Any] = {"sessions_cleaned": 0}
    report["expiry_hours"] = params.get("expiry_hours", 24)
    report["status"] = "completed"
    return report
async def _optimize_database_queries(
    context: dict[str, t.Any], params: dict[str, t.Any]
) -> dict[str, t.Any]:
    """Placeholder handler for database query optimization.

    Ignores both arguments and always reports zero queries optimized with
    status "skipped".
    """
    # In production, would analyze query logs and optimize
    outcome: dict[str, t.Any] = dict(queries_optimized=0, status="skipped")
    return outcome
async def _rebuild_database_indexes(
    context: dict[str, t.Any], params: dict[str, t.Any]
) -> dict[str, t.Any]:
    """Placeholder handler for database index rebuilding.

    Ignores both arguments and always reports zero indexes rebuilt with
    status "skipped".
    """
    # In production, would rebuild database indexes
    outcome: dict[str, t.Any] = dict(indexes_rebuilt=0, status="skipped")
    return outcome
535# Manual fallback implementations (when ACB Workflows unavailable)
async def _manual_cache_warming(
    warm_templates: bool, warm_static: bool, warm_routes: bool
) -> dict[str, t.Any]:
    """Run the enabled cache-warming handlers one after another.

    Used when no workflow engine is available. Each enabled handler is
    awaited with an empty context and empty params, and its result is stored
    under ``templates``, ``static`` or ``routes``.

    Returns:
        A summary dict tagged ``"mode": "manual"``. ``state`` is always
        "completed", whatever the individual handlers reported.
    """
    outcomes = {}

    if warm_templates:
        outcomes["templates"] = await _warm_template_cache({}, {})
    if warm_static:
        outcomes["static"] = await _warm_static_cache({}, {})
    if warm_routes:
        outcomes["routes"] = await _warm_route_cache({}, {})

    finished_at = datetime.now().isoformat()
    return {
        "workflow_id": "cache-warming",
        "state": "completed",
        "completed_at": finished_at,
        "results": outcomes,
        "mode": "manual",
    }
async def _manual_template_cleanup(
    remove_stale: bool, optimize_storage: bool, cleanup_cache: bool
) -> dict[str, t.Any]:
    """Run the enabled template-cleanup handlers one after another.

    Used when no workflow engine is available. Order is cache cleanup, stale
    removal (with a 30-day threshold), then storage optimization; results are
    stored under ``cache_cleanup``, ``stale_removal`` and
    ``storage_optimization``.

    Returns:
        A summary dict tagged ``"mode": "manual"``. ``state`` is always
        "completed", whatever the individual handlers reported.
    """
    outcomes = {}

    if cleanup_cache:
        outcomes["cache_cleanup"] = await _cleanup_template_cache({}, {})
    if remove_stale:
        outcomes["stale_removal"] = await _remove_stale_templates(
            {}, {"days_threshold": 30}
        )
    if optimize_storage:
        outcomes["storage_optimization"] = await _optimize_template_storage({}, {})

    finished_at = datetime.now().isoformat()
    return {
        "workflow_id": "template-cleanup",
        "state": "completed",
        "completed_at": finished_at,
        "results": outcomes,
        "mode": "manual",
    }
async def _manual_performance_optimization(
    optimize_queries: bool, rebuild_indexes: bool, cleanup_sessions: bool
) -> dict[str, t.Any]:
    """Run the enabled performance handlers one after another.

    Used when no workflow engine is available. Order is session cleanup (with
    a 24-hour expiry), query optimization, then index rebuild; results are
    stored under ``session_cleanup``, ``query_optimization`` and
    ``index_rebuild``.

    Returns:
        A summary dict tagged ``"mode": "manual"``. ``state`` is always
        "completed", whatever the individual handlers reported.
    """
    outcomes = {}

    if cleanup_sessions:
        outcomes["session_cleanup"] = await _cleanup_expired_sessions(
            {}, {"expiry_hours": 24}
        )
    if optimize_queries:
        outcomes["query_optimization"] = await _optimize_database_queries({}, {})
    if rebuild_indexes:
        outcomes["index_rebuild"] = await _rebuild_database_indexes({}, {})

    finished_at = datetime.now().isoformat()
    return {
        "workflow_id": "performance-optimization",
        "state": "completed",
        "completed_at": finished_at,
        "results": outcomes,
        "mode": "manual",
    }
async def register_fastblocks_workflows() -> bool:
    """Publish the workflow service through ``depends``.

    The shared service is stored under the key ``"fastblocks_workflows"``.

    Returns:
        The service's ``available`` value after registration. False when ACB
        workflows could not be imported, or when obtaining or registering the
        service raises.
    """
    if acb_workflows_available:
        try:
            # Initialize workflow service
            service = get_workflow_service()

            # Register with depends
            depends.set("fastblocks_workflows", service)

            return service.available
        except Exception:
            return False

    return False
# Public names exported by this module.
__all__ = [
    # Service wrapper and its accessor
    "FastBlocksWorkflowService",
    "get_workflow_service",
    # Workflow entry points
    "execute_cache_warming",
    "execute_template_cleanup",
    "execute_performance_optimization",
    # Registration helper and availability flag
    "register_fastblocks_workflows",
    "acb_workflows_available",
]