Source code for mindroot.coreplugins.index.handlers.publish
import os
import json
import shutil
import zipfile
from datetime import datetime
from pathlib import Path
from fastapi import HTTPException, UploadFile
from fastapi.responses import JSONResponse
from ..utils import install_persona
[docs]
async def publish_index(INDEX_DIR: Path, PUBLISHED_DIR: Path, index_name: str):
"""Publish an index by creating a zip file containing index.json and persona directories."""
try:
index_dir = INDEX_DIR / index_name
if not index_dir.exists() or not index_dir.is_dir():
raise HTTPException(status_code=404, detail="Index directory not found")
# Create a timestamp for the zip file name
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
zip_filename = f"{index_name}-{timestamp}.zip"
zip_path = PUBLISHED_DIR / zip_filename
with zipfile.ZipFile(zip_path, 'w', zipfile.ZIP_DEFLATED) as zipf:
# Add index.json
index_json_path = index_dir / 'index.json'
if index_json_path.exists():
zipf.write(index_json_path, 'index.json')
# Add persona directories if they exist
personas_dir = index_dir / 'personas'
if personas_dir.exists():
for persona_dir in personas_dir.iterdir():
if persona_dir.is_dir():
# Add all files from the persona directory
for root, _, files in os.walk(persona_dir):
for file in files:
file_path = Path(root) / file
# Create relative path for the zip file
arc_name = f"personas/{persona_dir.name}/{file}"
zipf.write(file_path, arc_name)
# Return URL path that can be used with the static file handler
zip_url = f"/published/{zip_filename}"
return JSONResponse({
'success': True,
'message': 'Index published successfully',
'zip_file': zip_url
})
except Exception as e:
raise HTTPException(status_code=500, detail=f"Failed to publish index: {str(e)}")
[docs]
async def install_index_from_zip(INDEX_DIR: Path, file: UploadFile):
"""Install an index from a zip file."""
if not file.filename.endswith('.zip'):
raise HTTPException(status_code=400, detail="File must be a zip archive")
# Create temporary directory for processing
temp_dir = Path("temp_index_install")
temp_dir.mkdir(exist_ok=True)
try:
# Save uploaded file
zip_path = temp_dir / file.filename
with open(zip_path, 'wb') as f:
shutil.copyfileobj(file.file, f)
# Extract zip contents
with zipfile.ZipFile(zip_path, 'r') as zip_ref:
zip_ref.extractall(temp_dir)
# Look for index.json in the extracted contents
index_files = list(temp_dir.rglob('index.json'))
if not index_files:
raise HTTPException(status_code=400, detail="No index.json found in zip file")
# Read the index metadata
with open(index_files[0], 'r') as f:
index_data = json.load(f)
index_name = index_data.get('name')
if not index_name:
raise HTTPException(status_code=400, detail="Invalid index.json: missing name field")
# Process agents and their personas
index_root = index_files[0].parent
personas_dir = index_root / 'personas'
if personas_dir.exists():
for persona_dir in personas_dir.iterdir():
if persona_dir.is_dir():
persona_name = persona_dir.name
# Install persona to the correct location
install_persona(persona_dir, persona_name)
# Move index to final location
target_dir = INDEX_DIR / index_name
if target_dir.exists():
shutil.rmtree(target_dir)
# Create new index directory with index.json
target_dir.mkdir(parents=True)
shutil.copy2(index_files[0], target_dir / 'index.json')
# Copy personas directory if it exists
if personas_dir.exists():
shutil.copytree(personas_dir, target_dir / 'personas', dirs_exist_ok=True)
return JSONResponse({
'success': True,
'message': f"Index '{index_name}' installed successfully"
})
except Exception as e:
raise HTTPException(status_code=500, detail=f"Failed to install index: {str(e)}")
finally:
# Clean up temporary directory
if temp_dir.exists():
shutil.rmtree(temp_dir)