Coverage for fastblocks / _validation_integration.py: 22%
362 statements
« prev ^ index » next coverage.py v7.12.0, created at 2025-11-26 03:30 -0800
« prev ^ index » next coverage.py v7.12.0, created at 2025-11-26 03:30 -0800
1"""ACB ValidationService integration for FastBlocks.
3This module provides validation capabilities using ACB's ValidationService,
4with graceful degradation when ACB validation is not available.
6Author: lesleslie <les@wedgwoodwebworks.com>
7Created: 2025-10-01
9Key Features:
10- Template context validation and sanitization
11- Form input validation with XSS prevention
12- API contract validation for endpoints
13- Graceful degradation when ValidationService unavailable
14- Decorators for automatic validation
15- Custom validation schemas for FastBlocks patterns
17Usage:
18 # Template context validation
19 @validate_template_context
20 async def render_template(request, template, context):
21 ...
23 # Form input validation and sanitization
24 @validate_form_input
25 async def handle_form_submit(request, form_data):
26 ...
28 # API contract validation
29 @validate_api_contract(request_schema=UserCreateSchema, response_schema=UserSchema)
30 async def create_user(request):
31 ...
32"""
34import functools
35import typing as t
36from contextlib import suppress
37from dataclasses import dataclass
38from enum import Enum
40from acb.depends import depends
42# Try to import ACB validation components
43acb_validation_available = False
44ValidationService = None
45ValidationSchema = None
46InputSanitizer = None
47OutputValidator = None
48ValidationResult = None
49ValidationError = None
50validate_input_decorator = None
51validate_output_decorator = None
52sanitize_input_decorator = None
54with suppress(ImportError):
55 from acb.services.validation import ( # type: ignore[no-redef]
56 InputSanitizer,
57 OutputValidator,
58 )
60 acb_validation_available = True
63class ValidationType(str, Enum):
64 """Types of validation performed by FastBlocks."""
66 TEMPLATE_CONTEXT = "template_context"
67 FORM_INPUT = "form_input"
68 API_REQUEST = "api_request"
69 API_RESPONSE = "api_response"
70 ROUTE_PARAMS = "route_params"
71 QUERY_PARAMS = "query_params"
74@dataclass
75class ValidationConfig:
76 """Configuration for FastBlocks validation integration."""
78 # Enable/disable specific validation types
79 validate_templates: bool = True
80 validate_forms: bool = True
81 validate_api: bool = True
83 # Security settings
84 prevent_xss: bool = True
85 prevent_sql_injection: bool = True
86 prevent_path_traversal: bool = True
88 # Performance settings
89 cache_validation_results: bool = True
90 validation_timeout_ms: float = 100.0
92 # Reporting
93 collect_validation_metrics: bool = True
94 log_validation_failures: bool = True
97class FastBlocksValidationService:
98 """FastBlocks wrapper for ACB ValidationService with graceful degradation."""
100 _instance: t.ClassVar["FastBlocksValidationService | None"] = None
101 _config: t.ClassVar[ValidationConfig] = ValidationConfig()
103 def __new__(cls) -> "FastBlocksValidationService":
104 """Singleton pattern - ensure only one instance exists."""
105 if cls._instance is None:
106 cls._instance = super().__new__(cls)
107 return cls._instance
109 def __init__(self) -> None:
110 """Initialize validation service with ACB integration."""
111 if not hasattr(self, "_initialized"):
112 self._service: t.Any = None # ValidationService when ACB available
113 self._sanitizer: t.Any = None # InputSanitizer when ACB available
114 self._validator: t.Any = None # OutputValidator when ACB available
115 self._initialized = True
117 # Try to get ACB ValidationService
118 if acb_validation_available:
119 with suppress(Exception):
120 self._service = depends.get("validation_service")
121 if self._service:
122 self._sanitizer = InputSanitizer() # type: ignore[operator]
123 self._validator = OutputValidator() # type: ignore[operator]
125 @property
126 def available(self) -> bool:
127 """Check if ACB ValidationService is available."""
128 return acb_validation_available and self._service is not None
130 def _sanitize_context_value(
131 self,
132 key: str,
133 value: t.Any,
134 errors: list[str],
135 ) -> t.Any:
136 """Sanitize a single context value.
138 Args:
139 key: Context key
140 value: Context value
141 errors: Error list to append to
143 Returns:
144 Sanitized value
145 """
146 # Skip non-string values (they're safe)
147 if not isinstance(value, str):
148 return value
150 # Sanitize string values for XSS
151 if self._config.prevent_xss and self._sanitizer:
152 try:
153 return self._sanitizer.sanitize_html(value)
154 except Exception as e:
155 errors.append(f"Failed to sanitize {key}: {e}")
156 return value
158 return value
160 def _check_sql_injection_in_context(
161 self,
162 sanitized: dict[str, t.Any],
163 errors: list[str],
164 strict: bool,
165 ) -> bool:
166 """Check for SQL injection patterns in sanitized context.
168 Args:
169 sanitized: Sanitized context data
170 errors: Error list to append to
171 strict: If True, return False immediately on finding issues
173 Returns:
174 False if strict and issues found, True otherwise
175 """
176 if not self._config.prevent_sql_injection:
177 return True
179 for key, value in sanitized.items():
180 if isinstance(value, str) and self._contains_sql_injection(value):
181 errors.append(f"Potential SQL injection in {key}")
182 if strict:
183 return False
185 return True
187 async def validate_template_context(
188 self,
189 context: dict[str, t.Any],
190 template_name: str,
191 strict: bool = False,
192 ) -> tuple[bool, dict[str, t.Any], list[str]]:
193 """Validate and sanitize template context data.
195 Args:
196 context: Template context dictionary
197 template_name: Name of the template being rendered
198 strict: If True, fail on any validation warnings
200 Returns:
201 Tuple of (is_valid, sanitized_context, error_messages)
202 """
203 # Guard clause: skip if validation unavailable or disabled
204 if not self.available or not self._config.validate_templates:
205 return True, context, []
207 errors: list[str] = []
209 try:
210 # Sanitize each context value
211 sanitized = {
212 key: self._sanitize_context_value(key, value, errors)
213 for key, value in context.items()
214 }
216 # Check for SQL injection patterns
217 if not self._check_sql_injection_in_context(sanitized, errors, strict):
218 return False, context, errors
220 # Validation passed (or warnings only)
221 return len(errors) == 0 or not strict, sanitized, errors
223 except Exception as e:
224 errors.append(f"Validation error: {e}")
225 return False, context, errors
227 async def validate_form_input(
228 self,
229 form_data: dict[str, t.Any],
230 schema: dict[str, t.Any] | None = None,
231 strict: bool = True,
232 ) -> tuple[bool, dict[str, t.Any], list[str]]:
233 """Validate and sanitize form input data.
235 Args:
236 form_data: Form data to validate
237 schema: Optional validation schema (field_name -> rules)
238 strict: If True, fail on any validation errors
240 Returns:
241 Tuple of (is_valid, sanitized_data, error_messages)
242 """
243 # Guard clause: skip if validation disabled
244 if not self._config.validate_forms:
245 return True, form_data, []
247 errors: list[str] = []
249 try:
250 # Sanitize and validate fields
251 sanitized = self._sanitize_form_fields(form_data, errors)
253 # Apply schema validation if provided
254 if schema:
255 self._apply_schema_validation(sanitized, schema, errors)
257 return len(errors) == 0, sanitized, errors
259 except Exception as e:
260 errors.append(f"Validation error: {e}")
261 return False, form_data, errors
263 def _sanitize_form_fields(
264 self,
265 form_data: dict[str, t.Any],
266 errors: list[str],
267 ) -> dict[str, t.Any]:
268 """Sanitize all form fields and perform security checks.
270 Args:
271 form_data: Raw form data
272 errors: Error list to append to
274 Returns:
275 Sanitized form data
276 """
277 sanitized = {}
279 for key, value in form_data.items():
280 sanitized[key] = self._sanitize_field(key, value, errors)
282 # Security checks for string values
283 if isinstance(value, str):
284 self._check_security_issues(key, value, errors)
286 return sanitized
288 def _apply_schema_validation(
289 self,
290 sanitized_data: dict[str, t.Any],
291 schema: dict[str, t.Any],
292 errors: list[str],
293 ) -> None:
294 """Apply schema validation rules to sanitized data.
296 Args:
297 sanitized_data: Sanitized form data
298 schema: Validation schema (field_name -> rules)
299 errors: Error list to append to
300 """
301 for field_name, rules in schema.items():
302 value = sanitized_data.get(field_name)
303 self._validate_field_schema(field_name, value, rules, errors)
305 async def validate_api_request(
306 self,
307 request_data: dict[str, t.Any],
308 schema: t.Any = None,
309 ) -> tuple[bool, dict[str, t.Any], list[str]]:
310 """Validate API request data against a schema.
312 Args:
313 request_data: Request data to validate
314 schema: Validation schema (Pydantic, msgspec, etc.)
316 Returns:
317 Tuple of (is_valid, validated_data, error_messages)
318 """
319 # Guard clause: skip if unavailable or disabled
320 if not self.available or not self._config.validate_api:
321 return True, request_data, []
323 errors: list[str] = []
325 try:
326 # Try schema validation first
327 if schema:
328 result = self._validate_with_schema(request_data, schema, errors)
329 if result is not None:
330 return result
332 # Fallback: basic sanitization
333 sanitized = self._sanitize_api_data(request_data)
334 return True, sanitized, []
336 except Exception as e:
337 errors.append(f"Validation error: {e}")
338 return False, request_data, errors
340 def _validate_with_schema(
341 self,
342 data: dict[str, t.Any],
343 schema: t.Any,
344 errors: list[str],
345 ) -> tuple[bool, dict[str, t.Any], list[str]] | None:
346 """Attempt validation with Pydantic or msgspec schema.
348 Args:
349 data: Data to validate
350 schema: Validation schema
351 errors: Error list to append to
353 Returns:
354 Validation result tuple if successful, None to continue with fallback
355 """
356 # Try Pydantic validation
357 if hasattr(schema, "model_validate"):
358 try:
359 validated = schema.model_validate(data)
360 return True, validated.model_dump(), []
361 except Exception as e:
362 errors.append(f"Pydantic validation failed: {e}")
363 return False, data, errors
365 # Try msgspec validation
366 if hasattr(schema, "__struct_fields__"):
367 try:
368 import msgspec
370 validated = msgspec.convert(data, type=schema)
371 return True, msgspec.to_builtins(validated), []
372 except Exception as e:
373 errors.append(f"msgspec validation failed: {e}")
374 return False, data, errors
376 return None
378 def _sanitize_api_data(self, data: dict[str, t.Any]) -> dict[str, t.Any]:
379 """Apply basic XSS sanitization to API data.
381 Args:
382 data: Data to sanitize
384 Returns:
385 Sanitized data
386 """
387 sanitized = {}
388 for key, value in data.items():
389 if isinstance(value, str) and self._config.prevent_xss and self._sanitizer:
390 sanitized[key] = self._sanitizer.sanitize_html(value)
391 else:
392 sanitized[key] = value
393 return sanitized
395 async def validate_api_response(
396 self,
397 response_data: dict[str, t.Any],
398 schema: t.Any = None,
399 ) -> tuple[bool, dict[str, t.Any], list[str]]:
400 """Validate API response data against a schema.
402 Args:
403 response_data: Response data to validate
404 schema: Validation schema (Pydantic, msgspec, etc.)
406 Returns:
407 Tuple of (is_valid, validated_data, error_messages)
408 """
409 # Guard clause: skip if unavailable or disabled
410 if not self.available or not self._config.validate_api:
411 return True, response_data, []
413 errors: list[str] = []
415 try:
416 # Try schema validation if provided
417 if schema:
418 result = self._validate_response_with_schema(
419 response_data, schema, errors
420 )
421 if result is not None:
422 return result
424 return True, response_data, []
426 except Exception as e:
427 errors.append(f"Validation error: {e}")
428 return False, response_data, errors
430 def _validate_response_with_schema(
431 self,
432 data: dict[str, t.Any],
433 schema: t.Any,
434 errors: list[str],
435 ) -> tuple[bool, dict[str, t.Any], list[str]] | None:
436 """Attempt response validation with Pydantic or msgspec schema.
438 Args:
439 data: Response data to validate
440 schema: Validation schema
441 errors: Error list to append to
443 Returns:
444 Validation result tuple if successful, None otherwise
445 """
446 # Try Pydantic validation
447 if hasattr(schema, "model_validate"):
448 try:
449 validated = schema.model_validate(data)
450 return True, validated.model_dump(), []
451 except Exception as e:
452 errors.append(f"Response validation failed: {e}")
453 return False, data, errors
455 # Try msgspec validation
456 if hasattr(schema, "__struct_fields__"):
457 try:
458 import msgspec
460 validated = msgspec.convert(data, type=schema)
461 return True, msgspec.to_builtins(validated), []
462 except Exception as e:
463 errors.append(f"Response validation failed: {e}")
464 return False, data, errors
466 return None
468 def _contains_sql_injection(self, value: str) -> bool:
469 """Check for common SQL injection patterns."""
470 sql_patterns = [
471 "union select",
472 "union all select",
473 "drop table",
474 "delete from",
475 "insert into",
476 "update set",
477 "'; --",
478 "'--",
479 "' or '1'='1",
480 "' or 1=1",
481 '" or "1"="1',
482 "or 1=1",
483 "' or 'x'='x",
484 '" or "x"="x',
485 "admin'--",
486 'admin"--',
487 "') or ('1'='1",
488 '") or ("1"="1',
489 "exec(",
490 "execute(",
491 "xp_cmdshell",
492 "sp_executesql",
493 ]
494 value_lower = value.lower()
495 return any(pattern in value_lower for pattern in sql_patterns)
497 def _contains_path_traversal(self, value: str) -> bool:
498 """Check for path traversal attempts."""
499 traversal_patterns = ["../", "..\\", "%2e%2e", "....//"]
500 return any(pattern in value.lower() for pattern in traversal_patterns)
502 def _sanitize_field(
503 self,
504 key: str,
505 value: t.Any,
506 errors: list[str],
507 ) -> t.Any:
508 """Sanitize a single form field.
510 Args:
511 key: Field name
512 value: Field value
513 errors: Error list to append to
515 Returns:
516 Sanitized value
517 """
518 # Skip non-string values
519 if not isinstance(value, str):
520 return value
522 # Sanitize for XSS (only if ACB available)
523 if self.available and self._config.prevent_xss and self._sanitizer:
524 try:
525 return self._sanitizer.sanitize_html(value)
526 except Exception as e:
527 errors.append(f"Failed to sanitize {key}: {e}")
528 return value
530 return value
532 def _check_security_issues(
533 self,
534 key: str,
535 value: str,
536 errors: list[str],
537 ) -> None:
538 """Check for security issues in form field.
540 Args:
541 key: Field name
542 value: Field value
543 errors: Error list to append to
544 """
545 if not self.available:
546 return
548 # Check for SQL injection
549 if self._config.prevent_sql_injection:
550 if self._contains_sql_injection(value):
551 errors.append(f"Potential SQL injection in {key}")
553 # Check for path traversal
554 if self._config.prevent_path_traversal:
555 if self._contains_path_traversal(value):
556 errors.append(f"Potential path traversal in {key}")
558 def _validate_field_schema(
559 self,
560 field_name: str,
561 value: t.Any,
562 rules: dict[str, t.Any],
563 errors: list[str],
564 ) -> None:
565 """Validate a field against its schema rules.
567 Args:
568 field_name: Field name
569 value: Field value
570 rules: Validation rules
571 errors: Error list to append to
572 """
573 # Check required field
574 if self._check_required_field(field_name, value, rules, errors):
575 return # Field missing, skip other validations
577 # Guard clause: skip if value is None
578 if value is None:
579 return
581 # Apply validation rules
582 self._validate_field_type(field_name, value, rules, errors)
583 self._validate_string_length(field_name, value, rules, errors)
584 self._validate_field_pattern(field_name, value, rules, errors)
586 def _check_required_field(
587 self,
588 field_name: str,
589 value: t.Any,
590 rules: dict[str, t.Any],
591 errors: list[str],
592 ) -> bool:
593 """Check if required field is missing.
595 Args:
596 field_name: Field name
597 value: Field value
598 rules: Validation rules
599 errors: Error list to append to
601 Returns:
602 True if field is required and missing, False otherwise
603 """
604 if not rules.get("required"):
605 return False
607 is_missing = value is None or (isinstance(value, str) and not value.strip())
608 if is_missing:
609 errors.append(f"Required field missing: {field_name}")
611 return is_missing
613 def _validate_field_type(
614 self,
615 field_name: str,
616 value: t.Any,
617 rules: dict[str, t.Any],
618 errors: list[str],
619 ) -> None:
620 """Validate field type.
622 Args:
623 field_name: Field name
624 value: Field value
625 rules: Validation rules
626 errors: Error list to append to
627 """
628 if "type" not in rules:
629 return
631 expected_type = rules["type"]
632 if not isinstance(value, expected_type):
633 errors.append(
634 f"Invalid type for {field_name}: expected {expected_type.__name__}"
635 )
637 def _validate_string_length(
638 self,
639 field_name: str,
640 value: t.Any,
641 rules: dict[str, t.Any],
642 errors: list[str],
643 ) -> None:
644 """Validate string length constraints.
646 Args:
647 field_name: Field name
648 value: Field value
649 rules: Validation rules
650 errors: Error list to append to
651 """
652 if not isinstance(value, str):
653 return
655 if "min_length" in rules and len(value) < rules["min_length"]:
656 errors.append(f"{field_name} too short (min: {rules['min_length']})")
658 if "max_length" in rules and len(value) > rules["max_length"]:
659 errors.append(f"{field_name} too long (max: {rules['max_length']})")
661 def _validate_field_pattern(
662 self,
663 field_name: str,
664 value: t.Any,
665 rules: dict[str, t.Any],
666 errors: list[str],
667 ) -> None:
668 """Validate field against regex pattern.
670 Args:
671 field_name: Field name
672 value: Field value
673 rules: Validation rules
674 errors: Error list to append to
675 """
676 if not value or "pattern" not in rules:
677 return
679 import re
681 if not re.match(
682 rules["pattern"], str(value)
683 ): # REGEX OK: User-provided validation pattern from schema
684 errors.append(f"{field_name} does not match required pattern")
687# Singleton instance
688_validation_service: FastBlocksValidationService | None = None
691def get_validation_service() -> FastBlocksValidationService:
692 """Get the singleton FastBlocksValidationService instance."""
693 global _validation_service
694 if _validation_service is None:
695 _validation_service = FastBlocksValidationService()
696 return _validation_service
699# Decorators for automatic validation
702def _extract_template_context(
703 args: tuple[t.Any, ...], kwargs: dict[str, t.Any]
704) -> tuple[dict[str, t.Any], str]:
705 """Extract context and template name from decorator arguments."""
706 raw_context: t.Any = kwargs.get("context") or (args[3] if len(args) > 3 else {})
707 context: dict[str, t.Any] = raw_context if isinstance(raw_context, dict) else {}
708 template = kwargs.get("template") or (args[2] if len(args) > 2 else "unknown")
709 return context, str(template)
712async def _log_template_validation_errors(
713 errors: list[str],
714 template: str,
715 service: FastBlocksValidationService,
716) -> None:
717 """Log validation errors if configured."""
718 if not (errors and service._config.log_validation_failures):
719 return
721 with suppress(Exception):
722 logger = await depends.get("logger")
723 if logger:
724 logger.warning(
725 f"Template context validation warnings for {template}: {errors}"
726 )
729def _update_context_in_args(
730 args: tuple[t.Any, ...],
731 kwargs: dict[str, t.Any],
732 sanitized_context: dict[str, t.Any],
733) -> tuple[tuple[t.Any, ...], dict[str, t.Any]]:
734 """Update args/kwargs with sanitized context."""
735 if "context" in kwargs:
736 kwargs["context"] = sanitized_context
737 elif len(args) > 3:
738 args = (*args[:3], sanitized_context, *args[4:])
739 return args, kwargs
742def validate_template_context(
743 strict: bool = False,
744) -> t.Callable[
745 [t.Callable[..., t.Awaitable[t.Any]]], t.Callable[..., t.Awaitable[t.Any]]
746]:
747 """Decorator to validate template context before rendering.
749 Refactored to reduce cognitive complexity.
751 Usage:
752 @validate_template_context(strict=False)
753 async def render_template(self, request, template, context):
754 ...
755 """
757 def decorator(
758 func: t.Callable[..., t.Awaitable[t.Any]],
759 ) -> t.Callable[..., t.Awaitable[t.Any]]:
760 @functools.wraps(func)
761 async def wrapper(*args: t.Any, **kwargs: t.Any) -> t.Any:
762 # Extract context and template name
763 context, template = _extract_template_context(args, kwargs)
765 # Skip validation if context is empty
766 if not context:
767 return await func(*args, **kwargs)
769 # Validate context
770 service = get_validation_service()
771 (
772 _is_valid,
773 sanitized_context,
774 errors,
775 ) = await service.validate_template_context(
776 context=context,
777 template_name=template,
778 strict=strict,
779 )
781 # Log validation errors if configured
782 await _log_template_validation_errors(errors, template, service)
784 # Update with sanitized context
785 args, kwargs = _update_context_in_args(args, kwargs, sanitized_context)
787 # Call original function
788 return await func(*args, **kwargs)
790 return wrapper
792 return decorator
795def _extract_form_data(
796 args: tuple[t.Any, ...], kwargs: dict[str, t.Any]
797) -> dict[str, t.Any]:
798 """Extract form data from decorator arguments."""
799 raw_data: t.Any = kwargs.get("form_data") or (args[2] if len(args) > 2 else {})
800 return raw_data if isinstance(raw_data, dict) else {}
803def _update_form_data(
804 args: tuple[t.Any, ...],
805 kwargs: dict[str, t.Any],
806 sanitized_data: dict[str, t.Any],
807) -> tuple[tuple[t.Any, ...], dict[str, t.Any]]:
808 """Update args/kwargs with sanitized form data."""
809 if "form_data" in kwargs:
810 kwargs["form_data"] = sanitized_data
811 elif len(args) > 2:
812 args = (*args[:2], sanitized_data, *args[3:])
813 return args, kwargs
816async def _handle_form_validation_errors(
817 is_valid: bool,
818 errors: list[str],
819 service: FastBlocksValidationService,
820 strict: bool,
821) -> None:
822 """Handle form validation errors (logging or raising exception)."""
823 if not is_valid and strict:
824 from .exceptions import ErrorCategory, FastBlocksException
826 raise FastBlocksException(
827 message=f"Form validation failed: {'; '.join(errors)}",
828 category=ErrorCategory.VALIDATION,
829 status_code=400,
830 )
832 if errors and service._config.log_validation_failures:
833 with suppress(Exception):
834 logger = await depends.get("logger")
835 if logger:
836 logger.warning(f"Form validation errors: {errors}")
839def validate_form_input(
840 schema: dict[str, t.Any] | None = None,
841 strict: bool = True,
842) -> t.Callable[
843 [t.Callable[..., t.Awaitable[t.Any]]], t.Callable[..., t.Awaitable[t.Any]]
844]:
845 """Decorator to validate and sanitize form input.
847 Usage:
848 @validate_form_input(schema={"email": {"required": True, "type": str}})
849 async def handle_form(self, request, form_data):
850 ...
851 """
853 def decorator(
854 func: t.Callable[..., t.Awaitable[t.Any]],
855 ) -> t.Callable[..., t.Awaitable[t.Any]]:
856 @functools.wraps(func)
857 async def wrapper(*args: t.Any, **kwargs: t.Any) -> t.Any:
858 # Extract form data
859 form_data = _extract_form_data(args, kwargs)
861 # Skip validation if no form data
862 if not form_data:
863 return await func(*args, **kwargs)
865 # Validate form data
866 service = get_validation_service()
867 is_valid, sanitized_data, errors = await service.validate_form_input(
868 form_data=form_data,
869 schema=schema,
870 strict=strict,
871 )
873 # Handle validation errors
874 await _handle_form_validation_errors(is_valid, errors, service, strict)
876 # Update with sanitized data
877 args, kwargs = _update_form_data(args, kwargs, sanitized_data)
879 # Call original function
880 return await func(*args, **kwargs)
882 return wrapper
884 return decorator
887def _extract_request_data(
888 args: tuple[t.Any, ...], kwargs: dict[str, t.Any]
889) -> dict[str, t.Any]:
890 """Extract request data from args or kwargs."""
891 raw_data: t.Any = kwargs.get("data") or (args[2] if len(args) > 2 else {})
892 return raw_data if isinstance(raw_data, dict) else {}
895def _update_args_with_data(
896 args: tuple[t.Any, ...],
897 kwargs: dict[str, t.Any],
898 validated_data: dict[str, t.Any],
899) -> tuple[tuple[t.Any, ...], dict[str, t.Any]]:
900 """Update args/kwargs with validated data."""
901 if "data" in kwargs:
902 kwargs["data"] = validated_data
903 elif len(args) > 2:
904 args_list = list(args)
905 args_list[2] = validated_data
906 args = tuple(args_list)
907 return args, kwargs
910async def _validate_request(
911 service: FastBlocksValidationService,
912 request_data: dict[str, t.Any],
913 schema: t.Any,
914) -> dict[str, t.Any]:
915 """Validate request data and raise exception if invalid."""
916 is_valid, validated_data, errors = await service.validate_api_request(
917 request_data=request_data,
918 schema=schema,
919 )
921 if not is_valid:
922 from .exceptions import ErrorCategory, FastBlocksException
924 raise FastBlocksException(
925 message=f"Request validation failed: {'; '.join(errors)}",
926 category=ErrorCategory.VALIDATION,
927 status_code=400,
928 )
930 return validated_data
933async def _validate_response(
934 service: FastBlocksValidationService,
935 response_data: dict[str, t.Any],
936 schema: t.Any,
937) -> dict[str, t.Any]:
938 """Validate response data and log errors if invalid."""
939 is_valid, validated_response, errors = await service.validate_api_response(
940 response_data=response_data,
941 schema=schema,
942 )
944 if not is_valid:
945 with suppress(Exception):
946 logger = await depends.get("logger")
947 if logger:
948 logger.error(f"Response validation failed: {errors}")
950 return validated_response
953def validate_api_contract(
954 request_schema: t.Any = None,
955 response_schema: t.Any = None,
956) -> t.Callable[
957 [t.Callable[..., t.Awaitable[t.Any]]], t.Callable[..., t.Awaitable[t.Any]]
958]:
959 """Decorator to validate API request/response contracts.
961 Usage:
962 @validate_api_contract(
963 request_schema=UserCreateSchema,
964 response_schema=UserSchema
965 )
966 async def create_user(self, request, data):
967 ...
968 """
970 def decorator(
971 func: t.Callable[..., t.Awaitable[t.Any]],
972 ) -> t.Callable[..., t.Awaitable[t.Any]]:
973 @functools.wraps(func)
974 async def wrapper(*args: t.Any, **kwargs: t.Any) -> t.Any:
975 service = get_validation_service()
977 # Validate request if schema provided
978 if request_schema:
979 request_data = _extract_request_data(args, kwargs)
980 if request_data:
981 validated_data = await _validate_request(
982 service, request_data, request_schema
983 )
984 args, kwargs = _update_args_with_data(args, kwargs, validated_data)
986 # Call original function
987 result = await func(*args, **kwargs)
989 # Validate response if schema provided
990 if response_schema and isinstance(result, dict):
991 return await _validate_response(service, result, response_schema)
993 return result
995 return wrapper
997 return decorator
1000async def register_fastblocks_validation() -> bool:
1001 """Register FastBlocks validation service with ACB.
1003 Returns:
1004 True if registration successful, False otherwise
1005 """
1006 if not acb_validation_available:
1007 return False
1009 try:
1010 # Initialize validation service
1011 validation_service = get_validation_service()
1013 # Register with depends
1014 depends.set("fastblocks_validation", validation_service)
1016 return validation_service.available
1018 except Exception:
1019 return False
1022__all__ = [
1023 "FastBlocksValidationService",
1024 "ValidationConfig",
1025 "ValidationType",
1026 "get_validation_service",
1027 "validate_template_context",
1028 "validate_form_input",
1029 "validate_api_contract",
1030 "register_fastblocks_validation",
1031 "acb_validation_available",
1032]