Coverage for src / fastapi_authly / core / security.py: 100%
31 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-20 11:54 +0800
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-20 11:54 +0800
1from datetime import datetime, timedelta, timezone
2from typing import Any, Dict, Optional
4from jose import JWTError, jwt
5from passlib.context import CryptContext
7from ..interfaces import PasswordHasher, TokenService
class BcryptPasswordHasher(PasswordHasher):
    """Default password hasher backed by a passlib ``CryptContext``.

    Note: despite the class name, the context is configured with the
    ``pbkdf2_sha256`` scheme, not bcrypt (see ``__init__``).
    """

    def __init__(self) -> None:
        """Build the hashing context used by ``hash`` and ``verify``."""
        # pbkdf2_sha256 is chosen over bcrypt to sidestep platform-specific
        # bcrypt backend issues.
        context = CryptContext(schemes=["pbkdf2_sha256"], deprecated="auto")
        self._ctx = context

    def hash(self, password: str) -> str:
        """Return the hash the context produces for ``password``."""
        digest = self._ctx.hash(password)
        return digest

    def verify(self, password: str, hashed: str) -> bool:
        """Return the context's verdict on ``password`` against ``hashed``."""
        is_match = self._ctx.verify(password, hashed)
        return is_match
class JWTTokenService(TokenService):
    """Default JWT token service using python-jose.

    Issues access and refresh tokens that carry ``exp``, ``sub`` and
    ``type`` claims, and decodes them with an optional check on ``type``.
    """

    def __init__(
        self,
        *,
        secret_key: str,
        algorithm: str,
        access_token_expire_minutes: int,
        refresh_token_expire_days: int,
    ) -> None:
        """Store the signing configuration and token lifetimes.

        Args:
            secret_key: Key passed to ``jwt.encode`` / ``jwt.decode``.
            algorithm: Signing algorithm name passed to python-jose.
            access_token_expire_minutes: Access-token lifetime in minutes.
            refresh_token_expire_days: Refresh-token lifetime in days.
        """
        self.secret_key = secret_key
        self.algorithm = algorithm
        self.access_token_expire = timedelta(minutes=access_token_expire_minutes)
        self.refresh_token_expire = timedelta(days=refresh_token_expire_days)

    def _create_token(self, subject: str | Any, expires_delta: timedelta, token_type: str) -> str:
        """Encode a signed token for ``subject``.

        Args:
            subject: Token subject; stored in the ``sub`` claim as ``str(subject)``.
            expires_delta: Lifetime added to the current time for ``exp``.
            token_type: Value stored in the ``type`` claim.

        Returns:
            The encoded JWT string.
        """
        # Use an aware UTC timestamp: a naive local-time datetime would be
        # interpreted as UTC when converted to the numeric ``exp`` claim,
        # shifting the expiry by the host's UTC offset.
        expire = datetime.now(timezone.utc) + expires_delta
        to_encode: Dict[str, Any] = {
            "exp": expire,
            "sub": str(subject),
            "type": token_type,
        }
        return jwt.encode(to_encode, self.secret_key, algorithm=self.algorithm)

    def create_access_token(self, subject: str | Any) -> str:
        """Return a token of type ``"access"`` with the access-token lifetime."""
        return self._create_token(subject, self.access_token_expire, token_type="access")

    def create_refresh_token(self, subject: str | Any) -> str:
        """Return a token of type ``"refresh"`` with the refresh-token lifetime."""
        return self._create_token(subject, self.refresh_token_expire, token_type="refresh")

    def decode_token(self, token: str, verify_type: Optional[str] = None) -> Dict[str, Any]:
        """Decode ``token`` and optionally check its ``type`` claim.

        Args:
            token: Encoded JWT string.
            verify_type: When truthy, the payload's ``type`` claim must equal
                this value. ``None`` (or an empty string) skips the check.

        Returns:
            The decoded claims dictionary.

        Raises:
            JWTError: If ``verify_type`` is given and does not match the
                token's ``type`` claim. Exceptions raised by ``jwt.decode``
                itself propagate unchanged.
        """
        payload = jwt.decode(token, self.secret_key, algorithms=[self.algorithm])
        if verify_type and payload.get("type") != verify_type:
            raise JWTError("Invalid token type")
        return payload