Source code for mindroot.coreplugins.user_service.role_service
import os
import json
from typing import List
from lib.providers.services import service
from .models import UserAuth
[docs]
@service()
async def has_role(username: str, role: str, user_data_root: str) -> bool:
"""Check if user has specified role"""
auth_file = os.path.join(user_data_root, username, "auth.json")
if not os.path.exists(auth_file):
return False
with open(auth_file, 'r') as f:
try:
auth_data = UserAuth(**json.load(f))
return role in auth_data.roles
except:
return False
[docs]
@service()
async def add_role(username: str, role: str, user_data_root: str) -> bool:
"""Add a role to a user. Should be called only from admin context."""
auth_file = os.path.join(user_data_root, username, "auth.json")
if not os.path.exists(auth_file):
return False
with open(auth_file, 'r') as f:
auth_data = UserAuth(**json.load(f))
# Add the role if it doesn't exist
if role not in auth_data.roles:
auth_data.roles.add(role)
with open(auth_file, 'w') as f:
json.dump(auth_data.dict(), f, indent=2, default=str)
return True
[docs]
@service()
async def remove_role(username: str, role: str, user_data_root: str) -> bool:
"""Remove a role from a user. Should be called only from admin context."""
if role == "user":
raise ValueError("Cannot remove 'user' role")
auth_file = os.path.join(user_data_root, username, "auth.json")
if not os.path.exists(auth_file):
return False
with open(auth_file, 'r') as f:
auth_data = UserAuth(**json.load(f))
# Remove the role if it exists
if role in auth_data.roles:
auth_data.roles.remove(role)
with open(auth_file, 'w') as f:
json.dump(auth_data.dict(), f, indent=2, default=str)
return True
[docs]
@service()
async def get_user_roles(username: str, user_data_root: str) -> List[str]:
"""Get all roles for a user"""
auth_file = os.path.join(user_data_root, username, "auth.json")
if not os.path.exists(auth_file):
return set()
with open(auth_file, 'r') as f:
try:
auth_data = UserAuth(**json.load(f))
return auth_data.roles
except:
return ["user"]