Source code for mindroot.coreplugins.index.handlers.index_ops
import json
from datetime import datetime
from pathlib import Path
from fastapi.responses import JSONResponse
from fastapi import HTTPException
from ..models import IndexMetadata
from ..utils import ensure_index_structure
from lib.plugins import load_plugin_manifest
import shutil
[docs]
async def list_indices(INDEX_DIR: Path):
    """List all available indices.

    If ``INDEX_DIR`` is missing or empty it is first (re)created by copying
    the ``indices`` directory that sits two levels above this file.  Every
    sub-directory of ``INDEX_DIR`` that contains an ``index.json`` is then
    read, annotated with an ``installed`` flag taken from the plugin
    manifest, and returned.

    Args:
        INDEX_DIR: Root directory that holds one sub-directory per index.

    Returns:
        ``JSONResponse`` with ``{'success': True, 'data': [...]}`` where each
        entry is the parsed ``index.json`` plus a boolean ``installed`` key.

    Raises:
        HTTPException: status 500 carrying ``str(error)`` for any unexpected
            failure (filesystem errors, manifest problems, ...).
    """
    try:
        # Seed from the bundled defaults when there is nothing to list yet.
        if not INDEX_DIR.exists() or not any(INDEX_DIR.glob('*')):
            # copytree() requires that the destination does not exist, so an
            # empty directory has to be removed first.
            if INDEX_DIR.exists():
                shutil.rmtree(INDEX_DIR)
            default_indices_path = Path(__file__).parent.parent / 'indices'
            shutil.copytree(default_indices_path, INDEX_DIR)

        indices = []
        # Loaded lazily on the first readable index and then reused: the
        # manifest does not depend on which index is being processed.
        installed = None
        for index_dir in INDEX_DIR.iterdir():
            if not index_dir.is_dir():
                continue
            index_file = index_dir / 'index.json'
            if not index_file.exists():
                continue
            try:
                # JSON is UTF-8 by specification; do not rely on the locale.
                with open(index_file, 'r', encoding='utf-8') as f:
                    index_data = json.load(f)
                # Kept inside this try so a JSON decode error raised while
                # loading the manifest is still treated as "skip this entry",
                # exactly as it was when the call was made per index.
                if installed is None:
                    manifest = load_plugin_manifest()
                    installed = manifest.get('indices', {}).get('installed', {})
            except (json.JSONDecodeError, UnicodeDecodeError):
                continue  # Skip files that are not valid UTF-8 JSON
            if not isinstance(index_data, dict):
                # Valid JSON but not an object (e.g. a list or a string):
                # there is nowhere to attach 'installed', so skip it instead
                # of failing the whole listing.
                continue
            index_data['installed'] = index_dir.name in installed
            indices.append(index_data)
        return JSONResponse({'success': True, 'data': indices})
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e
[docs]
async def create_index(INDEX_DIR: Path, metadata: IndexMetadata):
    """Create a new index directory with metadata.

    Creates ``INDEX_DIR / metadata.name``, calls ``ensure_index_structure``
    on it and writes an ``index.json`` describing the new, empty index.

    Args:
        INDEX_DIR: Root directory that holds one sub-directory per index.
        metadata: Supplies ``name``, ``description``, ``version``, ``url``
            and ``trusted`` for the new index.

    Returns:
        ``JSONResponse`` with ``{'success': True, 'data': index_data}`` on
        success, or ``{'success': False, 'message': ...}`` when the name is
        not usable as a directory name or the index already exists.

    Raises:
        HTTPException: status 500 carrying ``str(error)`` for any unexpected
            failure.
    """
    try:
        name = metadata.name
        # The name becomes a directory directly under INDEX_DIR.  Anything
        # that is not a single path component ('', '.', '..', 'a/b', an
        # absolute path, ...) would create directories outside the index
        # root, so refuse it up front.
        if not name or name in ('.', '..') or Path(name).name != name:
            return JSONResponse({'success': False, 'message': 'Invalid index name'})

        index_dir = INDEX_DIR / name
        if index_dir.exists():
            return JSONResponse({'success': False, 'message': 'Index already exists'})

        # Create index directory structure
        index_dir.mkdir(parents=True)
        ensure_index_structure(index_dir)

        index_data = {
            'name': name,
            'description': metadata.description,
            'version': metadata.version,
            'url': metadata.url,
            'trusted': metadata.trusted,
            'created_at': datetime.now().isoformat(),
            'plugins': [],
            'agents': [],
        }
        # Explicit encoding so the file does not depend on the host locale.
        with open(index_dir / 'index.json', 'w', encoding='utf-8') as f:
            json.dump(index_data, f, indent=2)
        return JSONResponse({'success': True, 'data': index_data})
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e
[docs]
async def update_index(INDEX_DIR: Path, index_name: str, metadata: IndexMetadata):
    """Update index metadata, renaming the index directory if the name changed.

    Reads ``INDEX_DIR / index_name / index.json``, overwrites its ``name``,
    ``description``, ``version``, ``url`` and ``trusted`` fields from
    ``metadata``, stamps ``updated_at`` and writes the file back.  When
    ``metadata.name`` differs from ``index_name`` the directory is renamed
    to match.

    Args:
        INDEX_DIR: Root directory that holds one sub-directory per index.
        index_name: Current directory name of the index to update.
        metadata: New metadata values; ``metadata.name`` is the desired
            (possibly unchanged) index name.

    Returns:
        ``JSONResponse`` with ``{'success': True, 'data': index_data}`` on
        success, or ``{'success': False, 'message': ...}`` when the index
        does not exist, the new name is not usable as a directory name, or
        another index already uses the new name.

    Raises:
        HTTPException: status 500 carrying ``str(error)`` for any unexpected
            failure.
    """

    def _is_single_component(name) -> bool:
        # True only for a plain directory name: not '', '.', '..', and with
        # no separators or drive/root part that could leave INDEX_DIR.
        return bool(name) and name not in ('.', '..') and Path(name).name == name

    try:
        # A name that points outside INDEX_DIR cannot be one of our indices.
        if not _is_single_component(index_name):
            return JSONResponse({'success': False, 'message': 'Index not found'})

        index_dir = INDEX_DIR / index_name
        index_file = index_dir / 'index.json'
        if not index_file.exists():
            return JSONResponse({'success': False, 'message': 'Index not found'})

        # Validate the rename *before* touching index.json, so a refused
        # rename cannot leave the file claiming a name the directory lacks.
        renaming = metadata.name != index_name
        new_index_dir = None
        if renaming:
            if not _is_single_component(metadata.name):
                return JSONResponse({'success': False, 'message': 'Invalid index name'})
            new_index_dir = INDEX_DIR / metadata.name
            # samefile() lets a case-only rename through on case-insensitive
            # filesystems, where the "target" is the directory itself.
            if new_index_dir.exists() and not new_index_dir.samefile(index_dir):
                return JSONResponse({'success': False, 'message': 'Index already exists'})

        with open(index_file, 'r', encoding='utf-8') as f:
            index_data = json.load(f)

        index_data.update({
            'name': metadata.name,
            'description': metadata.description,
            'version': metadata.version,
            'url': metadata.url,
            'trusted': metadata.trusted,
            'updated_at': datetime.now().isoformat(),
        })

        with open(index_file, 'w', encoding='utf-8') as f:
            json.dump(index_data, f, indent=2)

        # If name changed, rename the directory
        if renaming:
            index_dir.rename(new_index_dir)

        return JSONResponse({'success': True, 'data': index_data})
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e