Source code for mindroot.coreplugins.index.handlers.agent_ops
import json
from datetime import datetime
from pathlib import Path
from fastapi.responses import JSONResponse
from fastapi import HTTPException
from ..models import AgentEntry
from ..utils import load_agent_data
[docs]
async def add_agent(INDEX_DIR: Path, index_name: str, agent: AgentEntry):
"""Add an agent to an index"""
try:
index_dir = INDEX_DIR / index_name
index_file = index_dir / 'index.json'
if not index_file.exists():
return JSONResponse({'success': False, 'message': 'Index not found'})
try:
agent_data = await load_agent_data(agent.name)
except FileNotFoundError as e:
return JSONResponse({'success': False, 'message': str(e)})
except ValueError as e:
return JSONResponse({'success': False, 'message': str(e)})
agent_data['added_at'] = datetime.now().isoformat()
with open(index_file, 'r') as f:
index_data = json.load(f)
if any(a['name'] == agent.name for a in index_data['agents']):
return JSONResponse({'success': False, 'message': 'Agent already in index'})
# If agent has a persona, copy persona files
if 'persona' in agent_data and isinstance(agent_data['persona'], dict):
persona_name = agent_data['persona'].get('name')
if persona_name:
persona_source = Path('personas/local') / persona_name
if not persona_source.exists():
persona_source = Path('personas/shared') / persona_name
if persona_source.exists():
persona_target = index_dir / 'personas' / persona_name
if persona_target.exists():
shutil.rmtree(persona_target)
persona_target.parent.mkdir(parents=True, exist_ok=True)
shutil.copytree(persona_source, persona_target)
index_data['agents'].append(agent_data)
with open(index_file, 'w') as f:
json.dump(index_data, f, indent=2)
return JSONResponse({'success': True, 'data': index_data})
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
[docs]
async def remove_agent(INDEX_DIR: Path, index_name: str, agent_name: str):
"""Remove an agent from an index"""
try:
index_dir = INDEX_DIR / index_name
index_file = index_dir / 'index.json'
if not index_file.exists():
return JSONResponse({'success': False, 'message': 'Index not found'})
with open(index_file, 'r') as f:
index_data = json.load(f)
# Get agent data before removal to handle persona cleanup
agent_data = next((a for a in index_data['agents'] if a['name'] == agent_name), None)
if agent_data and 'persona' in agent_data:
persona_name = agent_data['persona'].get('name')
if persona_name:
persona_dir = index_dir / 'personas' / persona_name
if persona_dir.exists():
# Check if persona is used by other agents before removing
other_agents_with_persona = [a for a in index_data['agents']
if a['name'] != agent_name
and a.get('persona', {}).get('name') == persona_name]
if not other_agents_with_persona:
shutil.rmtree(persona_dir)
# Remove from index data
index_data['agents'] = [a for a in index_data['agents'] if a['name'] != agent_name]
with open(index_file, 'w') as f:
json.dump(index_data, f, indent=2)
return JSONResponse({'success': True, 'data': index_data})
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))