Source code for mindroot.lib.auth.auth
from fastapi import Request, HTTPException, Depends
from fastapi.responses import RedirectResponse
from typing import Optional, Union, Dict, Any
import logging
logger = logging.getLogger(__name__)
[docs]
async def get_current_user(request: Request) -> Optional[Any]:
"""
Get the current authenticated user from the request state.
Args:
request: The FastAPI request object
Returns:
The user object if authenticated, None otherwise
"""
if hasattr(request.state, "user"):
return request.state.user
return None
[docs]
async def require_user(request: Request, redirect_to_login: bool = False) -> Any:
"""
Dependency to require an authenticated user for a route.
Args:
request: The FastAPI request object
redirect_to_login: If True, redirects to /login when not authenticated
If False, raises an HTTPException with 401 status
Returns:
The user object if authenticated
Raises:
HTTPException: If the user is not authenticated (when redirect_to_login is False)
RedirectResponse: If the user is not authenticated (when redirect_to_login is True)
"""
user = await get_current_user(request)
if user is None:
logger.warning(f"Unauthorized access attempt to {request.url.path}")
if redirect_to_login:
return RedirectResponse(
url=f"/login?next={request.url.path}",
status_code=302
)
else:
raise HTTPException(
status_code=401,
detail="Not authenticated",
headers={"WWW-Authenticate": "Bearer"}
)
return user
[docs]
async def require_admin(request: Request, redirect_to_login: bool = False) -> Any:
"""
Dependency to require an authenticated admin user for a route.
Args:
request: The FastAPI request object
redirect_to_login: If True, redirects to /login when not authenticated
If False, raises an HTTPException with 401/403 status
Returns:
The user object if authenticated and has admin role
Raises:
HTTPException: If the user is not authenticated or lacks admin privileges
RedirectResponse: If the user is not authenticated (when redirect_to_login is True)
"""
user = await require_user(request, redirect_to_login)
# Check if user is already a RedirectResponse (from require_user)
if isinstance(user, RedirectResponse):
return user
if not hasattr(user, "roles") or "admin" not in user.roles:
logger.warning(f"Unauthorized admin access attempt by {user.username} to {request.url.path}")
if redirect_to_login:
return RedirectResponse(
url=f"/login?next={request.url.path}",
status_code=302
)
else:
raise HTTPException(
status_code=403,
detail="Not authorized. Admin privileges required."
)
return user