Source code for mindroot.coreplugins.user_service.password_reset_service
from typing import Optional
from lib.providers.services import service
from .models import UserAuth, PasswordResetToken
import bcrypt
import json
import os
import secrets
from datetime import datetime, timedelta
import logging
# Directory (relative to the process working directory) that contains one
# sub-directory per user; each user's auth and reset files live inside it.
USER_DATA_ROOT = "data/users"

# How long a newly issued password reset token stays valid, in hours.
RESET_TOKEN_VALIDITY_HOURS = 1

# Module-level logger named after this module.
logger = logging.getLogger(__name__)
[docs]
@service()
async def initiate_password_reset(username: str, is_admin_reset: bool = False, token: Optional[str] = None, context=None) -> str:
    """Create a password reset token for *username* and persist it.

    The token is written to ``password_reset.json`` inside the user's
    directory under ``USER_DATA_ROOT``, together with its expiry time
    (``RESET_TOKEN_VALIDITY_HOURS`` from now, naive UTC, ISO-8601) and the
    ``is_admin_reset`` flag. Any previous reset file for the user is
    overwritten.

    Args:
        username: Name of the user's directory under ``USER_DATA_ROOT``.
            Must be a single path component.
        is_admin_reset: Stored with the token; when the token is later
            redeemed this flag decides whether the 'admin' role is granted.
        token: Token to store. When ``None`` a random URL-safe token is
            generated with ``secrets.token_urlsafe(32)``.
        context: Not used by this function.
            NOTE(review): presumably supplied by the service framework - confirm.

    Returns:
        The reset token that was stored.

    Raises:
        ValueError: If *username* is empty, is not a plain directory name,
            or no directory exists for it ("User not found").
        OSError: If the reset file cannot be written.
    """
    # Reject anything that is not a single path component. Without this an
    # empty name would target USER_DATA_ROOT itself and names such as
    # "../x" would write the token file outside the user data tree.
    if (
        not username
        or username in (os.curdir, os.pardir)
        or os.path.basename(username) != username
    ):
        logger.error("Rejected password reset for invalid username: %r", username)
        raise ValueError("User not found")

    user_dir = os.path.join(USER_DATA_ROOT, username)
    logger.info("Initiating password reset for user: %s", username)

    if not os.path.isdir(user_dir):
        logger.error(
            "User directory not found: %s (absolute: %s)",
            user_dir,
            os.path.abspath(user_dir),
        )
        raise ValueError("User not found")

    if token is None:
        token = secrets.token_urlsafe(32)

    # Naive UTC on purpose: the expiry check that consumes this file compares
    # against datetime.utcnow(), so both sides must stay timezone-naive.
    expires_at = datetime.utcnow() + timedelta(hours=RESET_TOKEN_VALIDITY_HOURS)

    reset_data = PasswordResetToken(
        token=token,
        expires_at=expires_at.isoformat(),
        is_admin_reset=is_admin_reset,
    )

    reset_file_path = os.path.join(user_dir, "password_reset.json")
    # A failed write raises from open()/json.dump(), so no read-back check is
    # needed. The token itself is a credential and is never written to the log.
    with open(reset_file_path, 'w', encoding='utf-8') as f:
        json.dump(reset_data.dict(), f, indent=2)

    logger.info(
        "Reset token stored for %s at %s, expires at: %s",
        username,
        reset_file_path,
        expires_at.isoformat(),
    )
    return token
[docs]
def _reset_tokens_match(stored_token, candidates) -> bool:
    """Return True if *stored_token* equals any candidate, using constant-time comparison."""
    if not isinstance(stored_token, str):
        return False
    stored = stored_token.encode("utf-8", "surrogatepass")
    matched = False
    for candidate in candidates:
        # Every candidate is compared (no early exit) so timing does not
        # reveal which form of the token matched.
        if secrets.compare_digest(stored, candidate.encode("utf-8", "surrogatepass")):
            matched = True
    return matched


def _store_new_password(auth_file_path: str, new_password: str, grant_admin: bool) -> bool:
    """Write a fresh bcrypt hash into the auth file; return True if the 'admin' role was added."""
    with open(auth_file_path, 'r+', encoding='utf-8') as auth_file:
        auth_data = UserAuth(**json.load(auth_file))
        auth_data.password_hash = bcrypt.hashpw(new_password.encode(), bcrypt.gensalt()).decode()

        admin_added = False
        if grant_admin and 'admin' not in auth_data.roles:
            auth_data.roles.append('admin')
            admin_added = True

        # Serialise fully before touching the file so a serialisation error
        # cannot leave a half-written auth file behind.
        serialized = json.dumps(auth_data.dict(), indent=2, default=str)
        auth_file.seek(0)
        auth_file.write(serialized)
        auth_file.truncate()
    return admin_added


@service()
async def reset_password_with_token(token: str, new_password: str, context=None) -> bool:
    """Set a new password for the user who owns the given reset token.

    Every directory under ``USER_DATA_ROOT`` is scanned for a
    ``password_reset.json`` whose stored token equals *token* (either as
    given or after URL-unquoting). On a match the expiry is checked, the
    user's ``auth.json`` gets a new bcrypt hash (plus the 'admin' role when
    the reset was flagged as an admin reset), and the reset file is deleted.

    Token values are credentials and are never written to the log.

    Args:
        token: Reset token presented by the caller.
        new_password: Plain-text password to hash and store.
        context: Not used by this function.
            NOTE(review): presumably supplied by the service framework - confirm.

    Returns:
        True when the password was updated.

    Raises:
        ValueError: If the user data directory is missing or unreadable, the
            token is empty, unknown or expired, or the auth file could not be
            updated.
        FileNotFoundError: If the matching user has no ``auth.json``.
    """
    # Function-scope import: urllib is not among this module's top-level imports.
    import urllib.parse

    logger.info("=== PASSWORD RESET ATTEMPT ===")

    if not os.path.exists(USER_DATA_ROOT):
        logger.error(
            "USER_DATA_ROOT directory does not exist: %s (absolute: %s)",
            USER_DATA_ROOT,
            os.path.abspath(USER_DATA_ROOT),
        )
        raise ValueError("User data directory not found")

    try:
        user_dirs = os.listdir(USER_DATA_ROOT)
    except OSError as e:
        logger.error("Error listing USER_DATA_ROOT: %s", e)
        raise ValueError("Error accessing user data") from e

    # An empty (or None) token must never match anything, including a
    # reset file that happens to hold an empty token.
    if not token:
        logger.warning("Password reset attempted with an empty token")
        raise ValueError("Invalid password reset token.")

    # Accept the token as given and in URL-decoded form, in case the caller
    # passed it through still percent-encoded.
    candidates = (token, urllib.parse.unquote(token))

    users_found = 0
    tokens_checked = 0

    for username in user_dirs:
        user_dir = os.path.join(USER_DATA_ROOT, username)
        if not os.path.isdir(user_dir):
            logger.debug("Skipping %s - not a directory", username)
            continue
        users_found += 1

        reset_file_path = os.path.join(user_dir, "password_reset.json")
        if not os.path.exists(reset_file_path):
            continue

        try:
            with open(reset_file_path, 'r', encoding='utf-8') as f:
                reset_data = PasswordResetToken(**json.load(f))
        except (OSError, TypeError, ValueError) as e:
            # json.JSONDecodeError and decoding errors are ValueError subclasses.
            logger.error("Error reading reset file for %s: %s", username, e)
            continue

        tokens_checked += 1
        if not _reset_tokens_match(reset_data.token, candidates):
            continue

        logger.info("Token match found for user %s", username)

        # Parse the expiry separately from the expiry decision so a malformed
        # date and an expired token are told apart by code path, not by
        # inspecting an exception message.
        try:
            expires_at = datetime.fromisoformat(reset_data.expires_at)
        except (TypeError, ValueError) as e:
            logger.error("Error parsing expiration date for %s: %s", username, e)
            continue

        # Normalise a timezone-aware expiry to naive UTC; comparing an aware
        # value with the naive utcnow() below would raise TypeError.
        offset = expires_at.utcoffset()
        if offset is not None:
            expires_at = (expires_at - offset).replace(tzinfo=None)

        if expires_at < datetime.utcnow():
            logger.warning("Token expired for user %s, removing reset file", username)
            os.remove(reset_file_path)
            raise ValueError("Password reset token has expired.")

        auth_file_path = os.path.join(user_dir, "auth.json")
        if not os.path.exists(auth_file_path):
            logger.error("Auth file not found for user %s: %s", username, auth_file_path)
            os.remove(reset_file_path)
            raise FileNotFoundError("User auth file not found.")

        try:
            admin_added = _store_new_password(
                auth_file_path, new_password, bool(reset_data.is_admin_reset)
            )
            if admin_added:
                logger.info("Added admin role to user %s", username)
            # The token is single-use: drop it once the password is stored.
            os.remove(reset_file_path)
        except Exception as e:
            logger.error("Error updating auth file for user %s: %s", username, e)
            raise ValueError(f"Error updating user authentication: {e}") from e

        logger.info("Password reset completed successfully for user %s", username)
        return True

    logger.warning(
        "Token not found after checking %d users and %d tokens (searched in: %s)",
        users_found,
        tokens_checked,
        os.path.abspath(USER_DATA_ROOT),
    )
    raise ValueError("Invalid password reset token.")