Coverage for fastblocks / _validation_integration.py: 22%

362 statements  

« prev     ^ index     » next       coverage.py v7.12.0, created at 2025-11-26 03:30 -0800

1"""ACB ValidationService integration for FastBlocks. 

2 

3This module provides validation capabilities using ACB's ValidationService, 

4with graceful degradation when ACB validation is not available. 

5 

6Author: lesleslie <les@wedgwoodwebworks.com> 

7Created: 2025-10-01 

8 

9Key Features: 

10- Template context validation and sanitization 

11- Form input validation with XSS prevention 

12- API contract validation for endpoints 

13- Graceful degradation when ValidationService unavailable 

14- Decorators for automatic validation 

15- Custom validation schemas for FastBlocks patterns 

16 

17Usage: 

18 # Template context validation 

19 @validate_template_context()

20 async def render_template(request, template, context): 

21 ... 

22 

23 # Form input validation and sanitization 

24 @validate_form_input()

25 async def handle_form_submit(request, form_data): 

26 ... 

27 

28 # API contract validation 

29 @validate_api_contract(request_schema=UserCreateSchema, response_schema=UserSchema) 

30 async def create_user(request): 

31 ... 

32""" 

33 

34import functools 

35import typing as t 

36from contextlib import suppress 

37from dataclasses import dataclass 

38from enum import Enum 

39 

40from acb.depends import depends 

41 

42# Try to import ACB validation components 

43acb_validation_available = False 

44ValidationService = None 

45ValidationSchema = None 

46InputSanitizer = None 

47OutputValidator = None 

48ValidationResult = None 

49ValidationError = None 

50validate_input_decorator = None 

51validate_output_decorator = None 

52sanitize_input_decorator = None 

53 

54with suppress(ImportError): 

55 from acb.services.validation import ( # type: ignore[no-redef] 

56 InputSanitizer, 

57 OutputValidator, 

58 ) 

59 

60 acb_validation_available = True 

61 

62 

class ValidationType(str, Enum):
    """Enumerate the categories of data that FastBlocks validates.

    Every member is also a ``str``, so a member compares equal to its plain
    string value and can be looked up from that value.
    """

    TEMPLATE_CONTEXT = "template_context"
    FORM_INPUT = "form_input"
    API_REQUEST = "api_request"
    API_RESPONSE = "api_response"
    ROUTE_PARAMS = "route_params"
    QUERY_PARAMS = "query_params"

71 QUERY_PARAMS = "query_params" 

72 

73 

@dataclass
class ValidationConfig:
    """Hold the switches that control FastBlocks validation behaviour.

    All fields have defaults, so ``ValidationConfig()`` yields a configuration
    with every check and every reporting option turned on.
    """

    # --- Which validation families are active ---
    validate_templates: bool = True
    validate_forms: bool = True
    validate_api: bool = True

    # --- Security checks ---
    prevent_xss: bool = True
    prevent_sql_injection: bool = True
    prevent_path_traversal: bool = True

    # --- Performance tuning ---
    cache_validation_results: bool = True
    validation_timeout_ms: float = 100.0

    # --- Metrics and logging ---
    collect_validation_metrics: bool = True
    log_validation_failures: bool = True

95 

96 

class FastBlocksValidationService:
    """FastBlocks wrapper for ACB ValidationService with graceful degradation.

    The class is a singleton: ``__new__`` always returns the same object and
    ``__init__`` populates it only on the first construction. When the ACB
    validation package could not be imported, or the ``validation_service``
    dependency cannot be resolved, ``available`` is False; template and API
    validation then return their input unchanged, while form validation still
    applies any caller-supplied schema rules.
    """

    _instance: t.ClassVar["FastBlocksValidationService | None"] = None
    _config: t.ClassVar[ValidationConfig] = ValidationConfig()

    # Lower-case substrings treated as SQL-injection indicators. Matching is a
    # plain substring test against the lower-cased input, so this is a
    # heuristic rather than a SQL parser. Kept at class level so the table is
    # built once instead of on every call.
    _SQL_INJECTION_PATTERNS: t.ClassVar[tuple[str, ...]] = (
        "union select",
        "union all select",
        "drop table",
        "delete from",
        "insert into",
        "update set",
        "'; --",
        "'--",
        "' or '1'='1",
        "' or 1=1",
        '" or "1"="1',
        "or 1=1",
        "' or 'x'='x",
        '" or "x"="x',
        "admin'--",
        'admin"--',
        "') or ('1'='1",
        '") or ("1"="1',
        "exec(",
        "execute(",
        "xp_cmdshell",
        "sp_executesql",
    )

    # Lower-case substrings treated as path-traversal indicators.
    _PATH_TRAVERSAL_PATTERNS: t.ClassVar[tuple[str, ...]] = (
        "../",
        "..\\",
        "%2e%2e",
        "....//",
    )

    def __new__(cls) -> "FastBlocksValidationService":
        """Singleton pattern - ensure only one instance exists."""
        if cls._instance is None:
            cls._instance = super().__new__(cls)
        return cls._instance

    def __init__(self) -> None:
        """Initialize validation service with ACB integration.

        Because ``__new__`` returns the shared instance, ``__init__`` runs on
        every ``FastBlocksValidationService()`` call; the ``_initialized``
        marker makes every call after the first a no-op.
        """
        if hasattr(self, "_initialized"):
            return

        self._service: t.Any = None  # ValidationService when ACB available
        self._sanitizer: t.Any = None  # InputSanitizer when ACB available
        self._validator: t.Any = None  # OutputValidator when ACB available
        self._initialized = True

        # Try to get ACB ValidationService. Any failure is deliberately
        # swallowed: the service then simply reports itself as unavailable.
        if acb_validation_available:
            with suppress(Exception):
                self._service = depends.get("validation_service")
                if self._service:
                    self._sanitizer = InputSanitizer()  # type: ignore[operator]
                    self._validator = OutputValidator()  # type: ignore[operator]

    @property
    def available(self) -> bool:
        """Check if ACB ValidationService is available."""
        return acb_validation_available and self._service is not None

    def _sanitize_context_value(
        self,
        key: str,
        value: t.Any,
        errors: list[str],
    ) -> t.Any:
        """Sanitize a single context value.

        Args:
            key: Context key (used only in error messages)
            value: Context value
            errors: Error list to append to

        Returns:
            The sanitized string, or the original value when it is not a
            string, sanitization is disabled, or the sanitizer raised.
        """
        # Only top-level strings are sanitized; other values pass through.
        if not isinstance(value, str):
            return value

        if self._config.prevent_xss and self._sanitizer:
            try:
                return self._sanitizer.sanitize_html(value)
            except Exception as e:
                errors.append(f"Failed to sanitize {key}: {e}")
                return value

        return value

    def _check_sql_injection_in_context(
        self,
        sanitized: dict[str, t.Any],
        errors: list[str],
        strict: bool,
    ) -> bool:
        """Check for SQL injection patterns in sanitized context.

        Args:
            sanitized: Sanitized context data
            errors: Error list to append to
            strict: If True, return False immediately on finding issues

        Returns:
            False if strict and issues found, True otherwise
        """
        if not self._config.prevent_sql_injection:
            return True

        for key, value in sanitized.items():
            if isinstance(value, str) and self._contains_sql_injection(value):
                errors.append(f"Potential SQL injection in {key}")
                if strict:
                    return False

        return True

    async def validate_template_context(
        self,
        context: dict[str, t.Any],
        template_name: str,
        strict: bool = False,
    ) -> tuple[bool, dict[str, t.Any], list[str]]:
        """Validate and sanitize template context data.

        Args:
            context: Template context dictionary
            template_name: Name of the template being rendered (accepted for
                the caller's convenience; not read by this method)
            strict: If True, fail on any validation warnings

        Returns:
            Tuple of (is_valid, sanitized_context, error_messages). On a
            strict SQL-injection failure or an unexpected exception the
            original, unsanitized ``context`` is returned.
        """
        # Guard clause: skip if validation unavailable or disabled
        if not self.available or not self._config.validate_templates:
            return True, context, []

        errors: list[str] = []

        try:
            sanitized = {
                key: self._sanitize_context_value(key, value, errors)
                for key, value in context.items()
            }

            if not self._check_sql_injection_in_context(sanitized, errors, strict):
                return False, context, errors

            # Non-strict mode treats collected errors as warnings only.
            return not errors or not strict, sanitized, errors

        except Exception as e:
            errors.append(f"Validation error: {e}")
            return False, context, errors

    async def validate_form_input(
        self,
        form_data: dict[str, t.Any],
        schema: dict[str, t.Any] | None = None,
        strict: bool = True,
    ) -> tuple[bool, dict[str, t.Any], list[str]]:
        """Validate and sanitize form input data.

        Args:
            form_data: Form data to validate
            schema: Optional validation schema (field_name -> rules)
            strict: Accepted for interface symmetry; this method does not
                read it and always reports ``is_valid`` as "no errors found"

        Returns:
            Tuple of (is_valid, sanitized_data, error_messages)
        """
        # Guard clause: skip if validation disabled
        if not self._config.validate_forms:
            return True, form_data, []

        errors: list[str] = []

        try:
            sanitized = self._sanitize_form_fields(form_data, errors)

            if schema:
                self._apply_schema_validation(sanitized, schema, errors)

            return not errors, sanitized, errors

        except Exception as e:
            errors.append(f"Validation error: {e}")
            return False, form_data, errors

    def _sanitize_form_fields(
        self,
        form_data: dict[str, t.Any],
        errors: list[str],
    ) -> dict[str, t.Any]:
        """Sanitize all form fields and perform security checks.

        Args:
            form_data: Raw form data
            errors: Error list to append to

        Returns:
            Sanitized form data
        """
        sanitized: dict[str, t.Any] = {}

        for key, value in form_data.items():
            sanitized[key] = self._sanitize_field(key, value, errors)

            # Security checks inspect the raw value, not the sanitized one.
            if isinstance(value, str):
                self._check_security_issues(key, value, errors)

        return sanitized

    def _apply_schema_validation(
        self,
        sanitized_data: dict[str, t.Any],
        schema: dict[str, t.Any],
        errors: list[str],
    ) -> None:
        """Apply schema validation rules to sanitized data.

        Args:
            sanitized_data: Sanitized form data
            schema: Validation schema (field_name -> rules)
            errors: Error list to append to
        """
        for field_name, rules in schema.items():
            # Fields absent from the data are validated as None.
            value = sanitized_data.get(field_name)
            self._validate_field_schema(field_name, value, rules, errors)

    async def validate_api_request(
        self,
        request_data: dict[str, t.Any],
        schema: t.Any = None,
    ) -> tuple[bool, dict[str, t.Any], list[str]]:
        """Validate API request data against a schema.

        Args:
            request_data: Request data to validate
            schema: Validation schema (Pydantic, msgspec, etc.)

        Returns:
            Tuple of (is_valid, validated_data, error_messages)
        """
        # Guard clause: skip if unavailable or disabled
        if not self.available or not self._config.validate_api:
            return True, request_data, []

        errors: list[str] = []

        try:
            if schema:
                result = self._validate_with_schema(request_data, schema, errors)
                if result is not None:
                    return result

            # Fallback (no schema, or an unrecognised schema kind): basic
            # sanitization of top-level string values.
            sanitized = self._sanitize_api_data(request_data)
            return True, sanitized, []

        except Exception as e:
            errors.append(f"Validation error: {e}")
            return False, request_data, errors

    def _run_schema_validation(
        self,
        data: dict[str, t.Any],
        schema: t.Any,
        errors: list[str],
        pydantic_label: str,
        msgspec_label: str,
    ) -> tuple[bool, dict[str, t.Any], list[str]] | None:
        """Validate ``data`` with a Pydantic-like or msgspec-like schema.

        The schema kind is detected by duck typing: a ``model_validate``
        attribute selects the Pydantic path, a ``__struct_fields__`` attribute
        selects the msgspec path.

        Args:
            data: Data to validate
            schema: Validation schema
            errors: Error list to append to
            pydantic_label: Message prefix used when the Pydantic path fails
            msgspec_label: Message prefix used when the msgspec path fails

        Returns:
            A result tuple when the schema kind was recognised (success or
            failure), None when neither kind matched.
        """
        if hasattr(schema, "model_validate"):
            try:
                validated = schema.model_validate(data)
                return True, validated.model_dump(), []
            except Exception as e:
                errors.append(f"{pydantic_label}: {e}")
                return False, data, errors

        if hasattr(schema, "__struct_fields__"):
            try:
                # Imported lazily so msgspec is only required when used.
                import msgspec

                validated = msgspec.convert(data, type=schema)
                return True, msgspec.to_builtins(validated), []
            except Exception as e:
                errors.append(f"{msgspec_label}: {e}")
                return False, data, errors

        return None

    def _validate_with_schema(
        self,
        data: dict[str, t.Any],
        schema: t.Any,
        errors: list[str],
    ) -> tuple[bool, dict[str, t.Any], list[str]] | None:
        """Attempt validation with Pydantic or msgspec schema.

        Args:
            data: Data to validate
            schema: Validation schema
            errors: Error list to append to

        Returns:
            Validation result tuple when the schema kind is recognised, None
            to continue with fallback
        """
        return self._run_schema_validation(
            data,
            schema,
            errors,
            "Pydantic validation failed",
            "msgspec validation failed",
        )

    def _sanitize_api_data(self, data: dict[str, t.Any]) -> dict[str, t.Any]:
        """Apply basic XSS sanitization to API data.

        Args:
            data: Data to sanitize

        Returns:
            A new dict in which top-level string values have been passed
            through the sanitizer (when XSS prevention is on and a sanitizer
            exists); all other values are copied as-is.
        """
        sanitize = bool(self._config.prevent_xss and self._sanitizer)
        sanitized: dict[str, t.Any] = {}
        for key, value in data.items():
            if sanitize and isinstance(value, str):
                sanitized[key] = self._sanitizer.sanitize_html(value)
            else:
                sanitized[key] = value
        return sanitized

    async def validate_api_response(
        self,
        response_data: dict[str, t.Any],
        schema: t.Any = None,
    ) -> tuple[bool, dict[str, t.Any], list[str]]:
        """Validate API response data against a schema.

        Args:
            response_data: Response data to validate
            schema: Validation schema (Pydantic, msgspec, etc.)

        Returns:
            Tuple of (is_valid, validated_data, error_messages)
        """
        # Guard clause: skip if unavailable or disabled
        if not self.available or not self._config.validate_api:
            return True, response_data, []

        errors: list[str] = []

        try:
            if schema:
                result = self._validate_response_with_schema(
                    response_data, schema, errors
                )
                if result is not None:
                    return result

            # No schema, or unrecognised schema kind: accept unchanged.
            return True, response_data, []

        except Exception as e:
            errors.append(f"Validation error: {e}")
            return False, response_data, errors

    def _validate_response_with_schema(
        self,
        data: dict[str, t.Any],
        schema: t.Any,
        errors: list[str],
    ) -> tuple[bool, dict[str, t.Any], list[str]] | None:
        """Attempt response validation with Pydantic or msgspec schema.

        Args:
            data: Response data to validate
            schema: Validation schema
            errors: Error list to append to

        Returns:
            Validation result tuple when the schema kind is recognised, None
            otherwise
        """
        return self._run_schema_validation(
            data,
            schema,
            errors,
            "Response validation failed",
            "Response validation failed",
        )

    def _contains_sql_injection(self, value: str) -> bool:
        """Check for common SQL injection patterns (case-insensitive)."""
        value_lower = value.lower()
        return any(
            pattern in value_lower for pattern in self._SQL_INJECTION_PATTERNS
        )

    def _contains_path_traversal(self, value: str) -> bool:
        """Check for path traversal attempts (case-insensitive)."""
        value_lower = value.lower()
        return any(
            pattern in value_lower for pattern in self._PATH_TRAVERSAL_PATTERNS
        )

    def _sanitize_field(
        self,
        key: str,
        value: t.Any,
        errors: list[str],
    ) -> t.Any:
        """Sanitize a single form field.

        Args:
            key: Field name (used only in error messages)
            value: Field value
            errors: Error list to append to

        Returns:
            Sanitized value, or the original value when it is not a string,
            sanitization does not apply, or the sanitizer raised.
        """
        if not isinstance(value, str):
            return value

        # Sanitize for XSS (only if ACB available)
        if self.available and self._config.prevent_xss and self._sanitizer:
            try:
                return self._sanitizer.sanitize_html(value)
            except Exception as e:
                errors.append(f"Failed to sanitize {key}: {e}")
                return value

        return value

    def _check_security_issues(
        self,
        key: str,
        value: str,
        errors: list[str],
    ) -> None:
        """Check for security issues in form field.

        Does nothing when the ACB service is unavailable.

        Args:
            key: Field name
            value: Field value
            errors: Error list to append to
        """
        if not self.available:
            return

        if self._config.prevent_sql_injection:
            if self._contains_sql_injection(value):
                errors.append(f"Potential SQL injection in {key}")

        if self._config.prevent_path_traversal:
            if self._contains_path_traversal(value):
                errors.append(f"Potential path traversal in {key}")

    def _validate_field_schema(
        self,
        field_name: str,
        value: t.Any,
        rules: dict[str, t.Any],
        errors: list[str],
    ) -> None:
        """Validate a field against its schema rules.

        Args:
            field_name: Field name
            value: Field value
            rules: Validation rules
            errors: Error list to append to
        """
        if self._check_required_field(field_name, value, rules, errors):
            return  # Field missing, skip other validations

        # Optional field that was not supplied: nothing to check.
        if value is None:
            return

        self._validate_field_type(field_name, value, rules, errors)
        self._validate_string_length(field_name, value, rules, errors)
        self._validate_field_pattern(field_name, value, rules, errors)

    def _check_required_field(
        self,
        field_name: str,
        value: t.Any,
        rules: dict[str, t.Any],
        errors: list[str],
    ) -> bool:
        """Check if required field is missing.

        A field counts as missing when it is None or a whitespace-only string.

        Args:
            field_name: Field name
            value: Field value
            rules: Validation rules
            errors: Error list to append to

        Returns:
            True if field is required and missing, False otherwise
        """
        if not rules.get("required"):
            return False

        is_missing = value is None or (isinstance(value, str) and not value.strip())
        if is_missing:
            errors.append(f"Required field missing: {field_name}")

        return is_missing

    def _describe_type(self, expected_type: t.Any) -> str:
        """Return a readable name for an ``isinstance`` class-info argument.

        Plain classes render as their ``__name__``; tuples render as their
        members joined with " or "; anything else without a ``__name__``
        (for example an ``int | float`` union) falls back to ``str()``.
        """
        if isinstance(expected_type, tuple):
            return " or ".join(self._describe_type(member) for member in expected_type)
        return getattr(expected_type, "__name__", None) or str(expected_type)

    def _validate_field_type(
        self,
        field_name: str,
        value: t.Any,
        rules: dict[str, t.Any],
        errors: list[str],
    ) -> None:
        """Validate field type.

        ``rules["type"]`` may be anything ``isinstance`` accepts: a class, a
        tuple of classes, or a union type.

        Args:
            field_name: Field name
            value: Field value
            rules: Validation rules
            errors: Error list to append to
        """
        if "type" not in rules:
            return

        expected_type = rules["type"]
        if not isinstance(value, expected_type):
            # Tuples and unions have no __name__, so go through the helper
            # instead of reading the attribute directly.
            errors.append(
                f"Invalid type for {field_name}: "
                f"expected {self._describe_type(expected_type)}"
            )

    def _validate_string_length(
        self,
        field_name: str,
        value: t.Any,
        rules: dict[str, t.Any],
        errors: list[str],
    ) -> None:
        """Validate string length constraints.

        Non-string values are ignored.

        Args:
            field_name: Field name
            value: Field value
            rules: Validation rules
            errors: Error list to append to
        """
        if not isinstance(value, str):
            return

        if "min_length" in rules and len(value) < rules["min_length"]:
            errors.append(f"{field_name} too short (min: {rules['min_length']})")

        if "max_length" in rules and len(value) > rules["max_length"]:
            errors.append(f"{field_name} too long (max: {rules['max_length']})")

    def _validate_field_pattern(
        self,
        field_name: str,
        value: t.Any,
        rules: dict[str, t.Any],
        errors: list[str],
    ) -> None:
        """Validate field against regex pattern.

        Falsy values (including the empty string) are not checked. The
        pattern is applied with ``re.match``, i.e. anchored at the start of
        the value only.

        Args:
            field_name: Field name
            value: Field value
            rules: Validation rules
            errors: Error list to append to
        """
        if not value or "pattern" not in rules:
            return

        import re

        if not re.match(
            rules["pattern"], str(value)
        ):  # REGEX OK: User-provided validation pattern from schema
            errors.append(f"{field_name} does not match required pattern")

685 

686 

# Module-level cache for the shared service object.
_validation_service: FastBlocksValidationService | None = None


def get_validation_service() -> FastBlocksValidationService:
    """Return the shared FastBlocksValidationService, creating it on first use."""
    global _validation_service
    if _validation_service is not None:
        return _validation_service
    _validation_service = FastBlocksValidationService()
    return _validation_service

697 

698 

699# Decorators for automatic validation 

700 

701 

702def _extract_template_context( 

703 args: tuple[t.Any, ...], kwargs: dict[str, t.Any] 

704) -> tuple[dict[str, t.Any], str]: 

705 """Extract context and template name from decorator arguments.""" 

706 raw_context: t.Any = kwargs.get("context") or (args[3] if len(args) > 3 else {}) 

707 context: dict[str, t.Any] = raw_context if isinstance(raw_context, dict) else {} 

708 template = kwargs.get("template") or (args[2] if len(args) > 2 else "unknown") 

709 return context, str(template) 

710 

711 

async def _log_template_validation_errors(
    errors: list[str],
    template: str,
    service: FastBlocksValidationService,
) -> None:
    """Emit a warning about template validation errors when logging is enabled.

    Nothing happens when ``errors`` is empty or the service configuration has
    ``log_validation_failures`` turned off. Any failure while resolving or
    using the logger is suppressed.
    """
    if not errors or not service._config.log_validation_failures:
        return

    with suppress(Exception):
        logger = await depends.get("logger")
        if logger:
            logger.warning(
                f"Template context validation warnings for {template}: {errors}"
            )

727 

728 

729def _update_context_in_args( 

730 args: tuple[t.Any, ...], 

731 kwargs: dict[str, t.Any], 

732 sanitized_context: dict[str, t.Any], 

733) -> tuple[tuple[t.Any, ...], dict[str, t.Any]]: 

734 """Update args/kwargs with sanitized context.""" 

735 if "context" in kwargs: 

736 kwargs["context"] = sanitized_context 

737 elif len(args) > 3: 

738 args = (*args[:3], sanitized_context, *args[4:]) 

739 return args, kwargs 

740 

741 

def validate_template_context(
    strict: bool = False,
) -> t.Callable[
    [t.Callable[..., t.Awaitable[t.Any]]], t.Callable[..., t.Awaitable[t.Any]]
]:
    """Decorator to validate template context before rendering.

    Works both as a factory and as a bare decorator. In bare form Python
    passes the decorated coroutine function as ``strict``; that case is
    detected and treated as ``strict=False``.

    The wrapper sanitizes the context, logs any reported errors, and then
    always calls the wrapped function; the validity flag returned by the
    service is not acted upon here. The context is located by the
    ``context`` keyword or positional slot 3, which matches a method
    signature of ``(self, request, template, context)``.

    Args:
        strict: Forwarded to the service's ``validate_template_context``.

    Returns:
        A decorator, or the already-wrapped function when used bare.

    Usage:
        @validate_template_context(strict=False)
        async def render_template(self, request, template, context):
            ...

        @validate_template_context
        async def render_template(self, request, template, context):
            ...
    """
    # A bool is never callable, so a callable here can only be the decorated
    # function arriving through the bare ``@validate_template_context`` form.
    bare_func: t.Any = strict if callable(strict) else None
    strict_mode: bool = False if bare_func is not None else strict

    def decorator(
        func: t.Callable[..., t.Awaitable[t.Any]],
    ) -> t.Callable[..., t.Awaitable[t.Any]]:
        @functools.wraps(func)
        async def wrapper(*args: t.Any, **kwargs: t.Any) -> t.Any:
            context, template = _extract_template_context(args, kwargs)

            # Nothing to validate: call straight through.
            if not context:
                return await func(*args, **kwargs)

            service = get_validation_service()
            (
                _is_valid,
                sanitized_context,
                errors,
            ) = await service.validate_template_context(
                context=context,
                template_name=template,
                strict=strict_mode,
            )

            await _log_template_validation_errors(errors, template, service)

            # Hand the sanitized context to the wrapped function.
            args, kwargs = _update_context_in_args(args, kwargs, sanitized_context)

            return await func(*args, **kwargs)

        return wrapper

    if bare_func is not None:
        return decorator(bare_func)  # type: ignore[return-value]

    return decorator

793 

794 

795def _extract_form_data( 

796 args: tuple[t.Any, ...], kwargs: dict[str, t.Any] 

797) -> dict[str, t.Any]: 

798 """Extract form data from decorator arguments.""" 

799 raw_data: t.Any = kwargs.get("form_data") or (args[2] if len(args) > 2 else {}) 

800 return raw_data if isinstance(raw_data, dict) else {} 

801 

802 

803def _update_form_data( 

804 args: tuple[t.Any, ...], 

805 kwargs: dict[str, t.Any], 

806 sanitized_data: dict[str, t.Any], 

807) -> tuple[tuple[t.Any, ...], dict[str, t.Any]]: 

808 """Update args/kwargs with sanitized form data.""" 

809 if "form_data" in kwargs: 

810 kwargs["form_data"] = sanitized_data 

811 elif len(args) > 2: 

812 args = (*args[:2], sanitized_data, *args[3:]) 

813 return args, kwargs 

814 

815 

async def _handle_form_validation_errors(
    is_valid: bool,
    errors: list[str],
    service: FastBlocksValidationService,
    strict: bool,
) -> None:
    """React to the outcome of form validation.

    In strict mode an invalid result raises ``FastBlocksException`` with
    status 400. Otherwise, when there are errors and failure logging is
    enabled, a warning is logged; logger problems are suppressed.
    """
    must_reject = strict and not is_valid
    if must_reject:
        from .exceptions import ErrorCategory, FastBlocksException

        raise FastBlocksException(
            message=f"Form validation failed: {'; '.join(errors)}",
            category=ErrorCategory.VALIDATION,
            status_code=400,
        )

    if not errors or not service._config.log_validation_failures:
        return

    with suppress(Exception):
        logger = await depends.get("logger")
        if logger:
            logger.warning(f"Form validation errors: {errors}")

837 

838 

839def validate_form_input( 

840 schema: dict[str, t.Any] | None = None, 

841 strict: bool = True, 

842) -> t.Callable[ 

843 [t.Callable[..., t.Awaitable[t.Any]]], t.Callable[..., t.Awaitable[t.Any]] 

844]: 

845 """Decorator to validate and sanitize form input. 

846 

847 Usage: 

848 @validate_form_input(schema={"email": {"required": True, "type": str}}) 

849 async def handle_form(self, request, form_data): 

850 ... 

851 """ 

852 

853 def decorator( 

854 func: t.Callable[..., t.Awaitable[t.Any]], 

855 ) -> t.Callable[..., t.Awaitable[t.Any]]: 

856 @functools.wraps(func) 

857 async def wrapper(*args: t.Any, **kwargs: t.Any) -> t.Any: 

858 # Extract form data 

859 form_data = _extract_form_data(args, kwargs) 

860 

861 # Skip validation if no form data 

862 if not form_data: 

863 return await func(*args, **kwargs) 

864 

865 # Validate form data 

866 service = get_validation_service() 

867 is_valid, sanitized_data, errors = await service.validate_form_input( 

868 form_data=form_data, 

869 schema=schema, 

870 strict=strict, 

871 ) 

872 

873 # Handle validation errors 

874 await _handle_form_validation_errors(is_valid, errors, service, strict) 

875 

876 # Update with sanitized data 

877 args, kwargs = _update_form_data(args, kwargs, sanitized_data) 

878 

879 # Call original function 

880 return await func(*args, **kwargs) 

881 

882 return wrapper 

883 

884 return decorator 

885 

886 

887def _extract_request_data( 

888 args: tuple[t.Any, ...], kwargs: dict[str, t.Any] 

889) -> dict[str, t.Any]: 

890 """Extract request data from args or kwargs.""" 

891 raw_data: t.Any = kwargs.get("data") or (args[2] if len(args) > 2 else {}) 

892 return raw_data if isinstance(raw_data, dict) else {} 

893 

894 

895def _update_args_with_data( 

896 args: tuple[t.Any, ...], 

897 kwargs: dict[str, t.Any], 

898 validated_data: dict[str, t.Any], 

899) -> tuple[tuple[t.Any, ...], dict[str, t.Any]]: 

900 """Update args/kwargs with validated data.""" 

901 if "data" in kwargs: 

902 kwargs["data"] = validated_data 

903 elif len(args) > 2: 

904 args_list = list(args) 

905 args_list[2] = validated_data 

906 args = tuple(args_list) 

907 return args, kwargs 

908 

909 

async def _validate_request(
    service: FastBlocksValidationService,
    request_data: dict[str, t.Any],
    schema: t.Any,
) -> dict[str, t.Any]:
    """Run request validation and return the validated payload.

    Raises:
        FastBlocksException: With status 400 when the service reports the
            request as invalid.
    """
    is_valid, validated_data, errors = await service.validate_api_request(
        request_data=request_data,
        schema=schema,
    )

    if is_valid:
        return validated_data

    from .exceptions import ErrorCategory, FastBlocksException

    raise FastBlocksException(
        message=f"Request validation failed: {'; '.join(errors)}",
        category=ErrorCategory.VALIDATION,
        status_code=400,
    )

931 

932 

async def _validate_response(
    service: FastBlocksValidationService,
    response_data: dict[str, t.Any],
    schema: t.Any,
) -> dict[str, t.Any]:
    """Run response validation and return whatever payload the service yields.

    An invalid response is only logged at error level (logger problems are
    suppressed); it never raises, and the payload is returned either way.
    """
    is_valid, validated_response, errors = await service.validate_api_response(
        response_data=response_data,
        schema=schema,
    )

    if is_valid:
        return validated_response

    with suppress(Exception):
        logger = await depends.get("logger")
        if logger:
            logger.error(f"Response validation failed: {errors}")

    return validated_response

951 

952 

def validate_api_contract(
    request_schema: t.Any = None,
    response_schema: t.Any = None,
) -> t.Callable[
    [t.Callable[..., t.Awaitable[t.Any]]], t.Callable[..., t.Awaitable[t.Any]]
]:
    """Decorator to validate API request/response contracts.

    The request payload is validated only when ``request_schema`` is given
    and a non-empty dict payload is found (``data`` keyword or positional
    slot 2). The result is validated only when ``response_schema`` is given
    and the wrapped function returned a dict.

    Usage:
        @validate_api_contract(
            request_schema=UserCreateSchema,
            response_schema=UserSchema
        )
        async def create_user(self, request, data):
            ...
    """

    def decorator(
        func: t.Callable[..., t.Awaitable[t.Any]],
    ) -> t.Callable[..., t.Awaitable[t.Any]]:
        @functools.wraps(func)
        async def wrapper(*args: t.Any, **kwargs: t.Any) -> t.Any:
            service = get_validation_service()

            # Request side: look for a payload only when a schema was given.
            payload = _extract_request_data(args, kwargs) if request_schema else {}
            if payload:
                checked = await _validate_request(service, payload, request_schema)
                args, kwargs = _update_args_with_data(args, kwargs, checked)

            result = await func(*args, **kwargs)

            # Response side: non-dict results are returned untouched.
            if not (response_schema and isinstance(result, dict)):
                return result

            return await _validate_response(service, result, response_schema)

        return wrapper

    return decorator

998 

999 

async def register_fastblocks_validation() -> bool:
    """Register FastBlocks validation service with ACB.

    The shared service is stored in the dependency container under the key
    ``"fastblocks_validation"``.

    Returns:
        The service's ``available`` flag after registration; False when the
        ACB validation import failed or any step raised.
    """
    if not acb_validation_available:
        return False

    # Any exception below falls through to the final ``return False``.
    with suppress(Exception):
        service = get_validation_service()
        depends.set("fastblocks_validation", service)
        return service.available

    return False

1020 

1021 

1022__all__ = [ 

1023 "FastBlocksValidationService", 

1024 "ValidationConfig", 

1025 "ValidationType", 

1026 "get_validation_service", 

1027 "validate_template_context", 

1028 "validate_form_input", 

1029 "validate_api_contract", 

1030 "register_fastblocks_validation", 

1031 "acb_validation_available", 

1032]