Coverage for fastblocks / adapters / auth / basic.py: 8%

66 statements  

« prev     ^ index     » next       coverage.py v7.12.0, created at 2025-11-26 03:58 -0800

1"""Basic Authentication Adapter for FastBlocks. 

2 

3Provides HTTP Basic Authentication with session management for FastBlocks applications. 

4Includes secure credential validation, session middleware integration, and user model support. 

5 

6Features: 

7- HTTP Basic Authentication with base64 credential encoding 

8- Session-based authentication state management 

9- Configurable secret key for session security 

10- HTTPS-only cookies in production environments 

11- Integration with Starlette authentication middleware 

12- Custom user model support for extended user data 

13 

14Requirements: 

15- starlette>=0.47.1 

16- pydantic>=2.11.7 

17 

18Usage: 

19```python 

20from acb.depends import Inject, depends 

21from acb.adapters import import_adapter 

22 

23auth = depends.get("auth") 

24 

25Auth = import_adapter("auth") 

26 

27await auth.init()  # returns None; the session middleware is stored on auth.middlewares

28``` 

29 

30Author: lesleslie <les@wedgwoodwebworks.com> 

31Created: 2025-01-12 

32""" 

33 

34import base64 

35import binascii 

36import typing as t 

37from uuid import UUID 

38 

39from acb.adapters import AdapterStatus 

40from pydantic import UUID4, EmailStr, SecretStr 

41from starlette.authentication import AuthCredentials, AuthenticationError, SimpleUser 

42from starlette.middleware import Middleware 

43from starlette.middleware.sessions import SessionMiddleware 

44from starlette.requests import Request 

45from fastblocks.htmx import HtmxRequest 

46 

47from ._base import AuthBase, AuthBaseSettings 

48 

49 

class AuthSettings(AuthBaseSettings):
    """Settings for the basic auth adapter.

    Adds no fields or overrides of its own; everything is inherited from
    ``AuthBaseSettings``.
    """

52 

53 

class CurrentUser:
    """Placeholder describing the current-user interface.

    Every method and property here raises ``NotImplementedError``; the class
    only fixes the names and signatures.
    """

    def has_role(self, _: str) -> str:
        """Not implemented here; always raises ``NotImplementedError``."""
        raise NotImplementedError()

    def set_role(self, _: str) -> str | bool | None:
        """Not implemented here; always raises ``NotImplementedError``."""
        raise NotImplementedError()

    @property
    def identity(self) -> UUID4 | str | int:
        """Not implemented here; reading it raises ``NotImplementedError``."""
        raise NotImplementedError()

    @property
    def display_name(self) -> str:
        """Not implemented here; reading it raises ``NotImplementedError``."""
        raise NotImplementedError()

    @property
    def email(self) -> EmailStr | None:
        """Not implemented here; reading it raises ``NotImplementedError``."""
        raise NotImplementedError()

    def is_authenticated(
        self,
        request: HtmxRequest | None = None,
        config: t.Any = None,
    ) -> bool | int | str:
        """Not implemented here; always raises ``NotImplementedError``."""
        raise NotImplementedError()

79 

80 

class Auth(AuthBase):
    """HTTP Basic authentication adapter using Starlette's session middleware.

    ``authenticate`` parses the ``Authorization`` header, ``init`` builds the
    session middleware list, and ``login``/``logout`` are not implemented in
    this class.
    """

    secret_key: SecretStr

    @staticmethod
    async def authenticate(request: HtmxRequest | Request) -> bool:
        """Parse HTTP Basic credentials from *request* and record the user.

        Args:
            request: Object whose ``headers`` and ``state`` attributes are
                read; both are looked up with ``getattr`` so either may be
                absent.

        Returns:
            ``False`` when there is no ``Authorization`` header or its scheme
            is not ``basic``; ``True`` once the header has been decoded.

        Raises:
            AuthenticationError: If the header does not split into exactly
                two whitespace-separated parts, or the payload is not valid
                base64-encoded ASCII.

        NOTE(review): only the username is extracted here. The password part
        is discarded and nothing is compared against a user store in this
        method, so any well-formed Basic header yields ``True``.
        """
        headers = getattr(request, "headers", {})
        if "Authorization" not in headers:
            return False
        auth = headers["Authorization"]
        try:
            # Unpacking raises ValueError unless there are exactly two parts.
            scheme, credentials = auth.split()
            if scheme.lower() != "basic":
                return False
            decoded = base64.b64decode(credentials).decode("ascii")
        except (ValueError, UnicodeDecodeError, binascii.Error) as exc:
            msg = "Invalid basic auth credentials"
            # Chain explicitly so the decoding failure is kept as the cause.
            raise AuthenticationError(msg) from exc
        username, _, _ = decoded.partition(":")
        state = getattr(request, "state", None)
        if state:
            state.auth_credentials = (
                AuthCredentials(["authenticated"]),
                SimpleUser(username),
            )
        return True

    def __init__(
        self,
        secret_key: SecretStr | None = None,
        user_model: t.Any | None = None,
    ) -> None:
        """Resolve the session secret and initialise the adapter.

        Args:
            secret_key: Secret used to sign the session cookie. When ``None``
                it is looked up as ``auth.secret_key`` on a fresh ``Config``
                and then, if still missing, as ``self.config.app.secret_key``.
            user_model: Optional user model, stored on ``self.user_model``.

        Raises:
            ValueError: If no secret can be resolved, or the resolved secret
                is empty.
        """
        if secret_key is None:
            from acb.config import Config

            config = Config()
            config.init()
            secret_key = config.get("auth.secret_key")
            if secret_key is None:
                secret_key = (
                    getattr(self.config.app, "secret_key", None)
                    if hasattr(self, "config")
                    else None
                )
            if secret_key is None:
                raise ValueError(
                    "secret_key must be provided either directly or via config"
                )

        # A config lookup may hand back a plain string; ``init`` calls
        # ``get_secret_value()`` on the stored key, so wrap it up front.
        if isinstance(secret_key, str):
            secret_key = SecretStr(secret_key)

        super().__init__(secret_key, user_model)
        self.secret_key = secret_key
        if not self.secret_key:
            raise ValueError("secret_key must be provided via config or parameter")
        self.name = "basic"
        self.user_model = user_model

    async def init(self) -> None:
        """Build the session middleware list and store it on ``self.middlewares``.

        The cookie is named ``"<token_id>_admin"`` and is marked HTTPS-only
        when ``self.config.deployed`` is truthy.
        """
        self.middlewares = [
            Middleware(
                SessionMiddleware,
                secret_key=self.secret_key.get_secret_value(),
                session_cookie=f"{self.token_id}_admin",
                https_only=bool(self.config.deployed),
            ),
        ]

    async def login(self, request: HtmxRequest) -> bool:
        """Not implemented for this adapter; always raises ``NotImplementedError``."""
        raise NotImplementedError

    async def logout(self, request: HtmxRequest) -> bool:
        """Not implemented for this adapter; always raises ``NotImplementedError``."""
        raise NotImplementedError

151 

152 

# Module-level metadata: the adapter's status marker and its fixed UUID.
MODULE_STATUS = AdapterStatus.STABLE
MODULE_ID = UUID(hex="01937d86-5f3b-7c4d-9e0f-2345678901bc")