Source code for mindroot.coreplugins.user_service.admin_init

import os
import random
import string
from typing import Tuple, Optional
from pathlib import Path
import json
from datetime import datetime
from lib.providers.hooks import hook
from .models import UserAuth, UserCreate
from .role_service import has_role
from .mod import create_user
from lib.providers.services import service
from rich.console import Console
console = Console()
USER_DATA_ROOT = 'data/users'
created_admin = {}

[docs] @hook() async def startup(app, context): admin_user, admin_pass = await initialize_admin(USER_DATA_ROOT, app) if admin_user: created_admin['username'] = admin_user created_admin['password'] = admin_pass console.print(f"Created admin user: {created_admin['username']} password: {created_admin['password']}", style='yellow on dark_blue')
[docs] def generate_random_credentials(prefix: str='admin', length: int=8) -> Tuple[str, str]: """Generate random admin username and password.""" chars = string.ascii_letters + string.digits random_suffix = ''.join(random.choices(chars, k=length)) random_pass = ''.join(random.choices(chars + '!@#$%^&*', k=16)) return (f'{prefix}{random_suffix}', random_pass)
[docs] async def check_for_admin(user_data_root: str) -> bool: """Check if any admin user exists.""" if not os.path.exists(user_data_root): return False for username in os.listdir(user_data_root): auth_file = os.path.join(user_data_root, username, 'auth.json') if os.path.exists(auth_file): with open(auth_file, 'r') as f: try: auth_data = UserAuth(**json.load(f)) if 'admin' in auth_data.roles: return True except: continue return False
[docs] @service() async def initialize_admin(user_data_root: str, app) -> Tuple[Optional[str], Optional[str]]: """Check for and create admin user if needed. Returns tuple of (username, password) if created, (None, None) if admin exists. This should be called during system startup to ensure at least one admin exists. The admin user will have 'admin', 'verified', and 'user' roles. """ args = app.state.cmd_args username = None password = None admin_user = None admin_pass = None if args.admin_user: admin_user = args.admin_user admin_pass = args.admin_password if admin_user and admin_pass: username = admin_user password = admin_pass else: if await check_for_admin(user_data_root): return (None, None) env_user = os.environ.get('ADMIN_USER') env_pass = os.environ.get('ADMIN_PASS') username = env_user password = env_pass if not (username and password): username, password = generate_random_credentials() username = 'admin' user_data = UserCreate(username=username, password=password, email=os.environ.get('ADMIN_EMAIL', 'admin@admin.com')) try: await create_user(user_data, roles=['admin', 'verified'], skip_verification=True) except Exception as e: if 'exists' in str(e): pass else: raise e return (username, password)