from fastapi import APIRouter, HTTPException, Request
from pydantic import BaseModel
from sse_starlette.sse import EventSourceResponse
import traceback
import os
import json
from typing import List, Optional
from lib.plugins import load_plugin_manifest, update_plugin_manifest, plugin_install, save_plugin_manifest, plugin_update, toggle_plugin_state, get_plugin_path
from lib.plugins.installation import download_github_files
from lib.streamcmd import stream_command_as_events
import asyncio
import httpx
router = APIRouter()
[docs]
class DirectoryRequest(BaseModel):
directory: str
[docs]
class PluginRequest(BaseModel):
plugin: str
[docs]
class GitHubPluginRequest(BaseModel):
plugin: str
url: Optional[str] = None
github_url: Optional[str] = None
[docs]
class TogglePluginRequest(BaseModel):
plugin: str
enabled: bool
[docs]
class InstallFromIndexRequest(BaseModel):
plugin: str
index_name: str
[docs]
class StreamInstallRequest(BaseModel):
plugin: str
source: str
source_path: str = None
import sys, os, shlex
[docs]
@router.post('/stream-install-plugin', response_class=EventSourceResponse)
async def stream_install_plugin(request: StreamInstallRequest):
"""Stream the installation process of a plugin using SSE (POST method)."""
if request.source == 'github_direct' or request.source == 'github':
if request.source_path.startswith('https://'):
cmd = [sys.executable, '-m', 'pip', 'install', request.source_path, '-v', '--no-cache-dir']
else:
cmd = [sys.executable, '-m', 'pip', 'install', '-e', request.source_path, '-v', '--no-cache-dir']
elif request.source == 'local':
cmd = [sys.executable, '-m', 'pip', 'install', '-e', request.source_path, '-v', '--no-cache-dir']
elif request.source == 'pypi':
cmd = [sys.executable, '-m', 'pip', 'install', request.plugin, '-v', '--no-cache-dir']
else:
return {'success': False, 'message': 'Invalid source'}
if request.source == 'github':
try:
parts = request.source_path.split(':')
repo_path = parts[0]
tag = parts[1] if len(parts) > 1 else None
async def stream_github_install():
yield {'event': 'message', 'data': f'Downloading GitHub repository {repo_path}...'}
try:
plugin_dir, _, plugin_info = download_github_files(repo_path, tag)
cmd = [sys.executable, '-m', 'pip', 'install', '-e', plugin_dir, '-v', '--no-cache-dir']
async for event in stream_command_as_events(cmd):
yield event
update_plugin_manifest(plugin_info['name'], 'github', os.path.abspath(plugin_dir), remote_source=repo_path, version=plugin_info.get('version', '0.0.1'), metadata=plugin_info)
except Exception as e:
yield {'event': 'error', 'data': f'Error installing from GitHub: {str(e)}'}
return EventSourceResponse(stream_github_install())
except Exception as e:
return {'success': False, 'message': f'Error setting up GitHub installation: {str(e)}'}
return EventSourceResponse(stream_command_as_events(cmd))
[docs]
@router.get('/stream-install-plugin', response_class=EventSourceResponse)
async def stream_install_plugin_get(request: Request):
"""Stream the installation process of a plugin using SSE (GET method)."""
plugin = request.query_params.get('plugin', '')
source = request.query_params.get('source', '')
source_path = request.query_params.get('source_path', '')
if source == 'github':
cmd = [sys.executable, '-m', 'pip', 'install', '-e', source_path, '-v', '--no-cache-dir']
message = f'Installing {plugin} from GitHub repository {source_path}...'
elif source == 'local':
cmd = [sys.executable, '-m', 'pip', 'install', '-e', source_path, '-v', '--no-cache-dir']
message = f'Installing from local path: {source_path}'
elif source == 'pypi':
cmd = [sys.executable, '-m', 'pip', 'install', plugin, '-v', '--no-cache-dir']
message = f'Installing from PyPI: {plugin}'
else:
return {'success': False, 'message': 'Invalid source'}
tag = None
if source == 'github':
try:
parts = source_path.split(':')
repo_path = parts[0]
tag = parts[1] if len(parts) > 1 else None
async def stream_github_install():
yield {'event': 'message', 'data': f'Downloading GitHub repository {repo_path}...'}
repo_path_ = repo_path
tag_ = tag
try:
if source_path.startswith('https://'):
repo_path_ = source_path
tag_ = None
parts = repo_path_.split('/')
if len(parts) >= 5:
repo_path_ = f'{parts[3]}/{parts[4]}'
plugin_dir, _, plugin_info = download_github_files(repo_path_, tag_)
cmd = [sys.executable, '-m', 'pip', 'install', '-e', plugin_dir, '-v', '--no-cache-dir']
async for event in stream_command_as_events(cmd):
yield event
update_plugin_manifest(plugin_info['name'], 'github', os.path.abspath(plugin_dir), remote_source=repo_path_, version=plugin_info.get('version', '0.0.1'), metadata=plugin_info)
except Exception as e:
trace = traceback.format_exc()
yield {'event': 'error', 'data': f'Error installing from GitHub: {str(e)} \n\n{trace}'}
return EventSourceResponse(stream_github_install())
except Exception as e:
trace = traceback.format_exc()
return {'success': False, 'message': f'Error installing from GitHub: {str(e)}\n\n{trace}'}
return EventSourceResponse(stream_command_as_events(cmd))
[docs]
@router.get('/get-all-plugins')
async def get_all_plugins():
try:
manifest = load_plugin_manifest()
plugins = []
for plugin_name, plugin_info in manifest['plugins']['core'].items():
plugins.append({'name': plugin_name, 'category': 'core', 'enabled': plugin_info['enabled'], 'source': 'core', 'remote_source': plugin_name, 'version': '1.0.0', 'description': plugin_info.get('metadata', {}).get('description', '')})
for plugin_name, plugin_info in manifest['plugins']['installed'].items():
plugins.append({'name': plugin_name, 'category': 'installed', 'enabled': plugin_info['enabled'], 'source': plugin_info['source'], 'remote_source': plugin_info.get('remote_source', plugin_info.get('github_url')), 'source_path': plugin_info.get('source_path'), 'version': plugin_info.get('version', '0.0.1'), 'description': plugin_info.get('metadata', {}).get('description', ''), 'index_source': plugin_info.get('metadata', {}).get('index_source')})
return {'success': True, 'data': plugins}
except Exception as e:
trace = traceback.format_exc()
return {'success': False, 'message': f'Error fetching plugins: {str(e)}\n\n{trace}'}
[docs]
@router.post('/scan-directory')
async def scan_directory(request: DirectoryRequest):
try:
directory = request.directory
if not os.path.isdir(directory):
return {'success': False, 'message': 'Invalid directory path'}
discovered_plugins = discover_plugins(directory)
manifest = load_plugin_manifest()
addable_count = 0
for plugin_name, plugin_info in discovered_plugins.items():
has_github = plugin_info.get('github_url') or plugin_info.get('remote_source') or plugin_info.get('metadata', {}).get('github_url')
if has_github:
addable_count += 1
for plugin_name, plugin_info in discovered_plugins.items():
plugin_info['source'] = 'local'
plugin_info['metadata'] = plugin_info.get('metadata', {}) or {'description': plugin_info.get('description', ''), 'install_date': plugin_info.get('install_date', ''), 'commands': plugin_info.get('commands', []), 'services': plugin_info.get('services', [])}
manifest['plugins']['installed'][plugin_name] = plugin_info
plugins_list = [{'name': name, 'description': info.get('metadata', {}).get('description', info.get('description', ''))} for name, info in discovered_plugins.items()]
save_plugin_manifest(manifest)
response = {'success': True, 'message': f'Scanned {len(discovered_plugins)} plugins in {directory}', 'plugins': plugins_list, 'addable_to_index': addable_count}
if addable_count < len(discovered_plugins):
response['warning'] = f'{len(discovered_plugins) - addable_count} plugins missing GitHub info and cannot be added to indices'
return response
except Exception as e:
trace = traceback.format_exc()
return {'success': False, 'message': f'Error during scan: {str(e)}\n\n{trace}'}
[docs]
@router.post('/install-local-plugin')
async def install_local_plugin(request: PluginRequest):
try:
plugin_name = request.plugin
plugin_path = get_plugin_path(plugin_name)
if not plugin_path:
return {'success': False, 'message': 'Plugin path not found'}
success = await plugin_install(plugin_name, source='local', source_path=plugin_path)
if success:
return {'success': True, 'message': f'Plugin {plugin_name} installed successfully'}
else:
return {'success': False, 'message': 'Installation failed'}
except Exception as e:
trace = traceback.format_exc()
return {'success': False, 'message': f'Error installing plugin: {str(e)}\n\n{trace}'}
[docs]
@router.post('/install-x-github-plugin')
async def install_github_plugin(request: GitHubPluginRequest):
try:
url = request.url or request.github_url
success = await plugin_install('test', source='github', source_path=url)
if success:
return {'success': True, 'message': 'Plugin installed successfully from GitHub'}
else:
return {'success': False, 'message': 'Installation failed'}
except Exception as e:
trace = traceback.format_exc()
return {'success': False, 'message': f'Error installing from GitHub: {str(e)}\n\n{trace}'}
[docs]
@router.post('/install-from-index')
async def install_from_index(request: InstallFromIndexRequest):
try:
index_path = os.path.join('indices', f'{request.index_name}.json')
if not os.path.exists(index_path):
return {'success': False, 'message': 'Index not found'}
with open(index_path, 'r') as f:
index_data = json.load(f)
plugin_data = None
for plugin in index_data.get('plugins', []):
if plugin['name'] == request.plugin:
plugin_data = plugin
break
if not plugin_data:
return {'success': False, 'message': 'Plugin not found in index'}
if plugin_data.get('github_url'):
success = await plugin_install(request.plugin, source='github', source_path=plugin_data['github_url'])
elif plugin_data.get('source_path'):
success = await plugin_install(request.plugin, source='local', source_path=plugin_data['source_path'])
else:
return {'success': False, 'message': 'No valid installation source in index'}
if success:
manifest = load_plugin_manifest()
if request.plugin in manifest['plugins']['installed']:
manifest['plugins']['installed'][request.plugin]['metadata']['index_source'] = request.index_name
save_plugin_manifest(manifest)
return {'success': True, 'message': f'Plugin {request.plugin} installed successfully from index'}
else:
return {'success': False, 'message': 'Installation failed'}
except Exception as e:
trace = traceback.format_exc()
return {'success': False, 'message': f'Error installing from index: {str(e)}\n\n{trace}'}
[docs]
@router.post('/toggle-plugin')
async def toggle_plugin(request: TogglePluginRequest):
try:
success = toggle_plugin_state(request.plugin, request.enabled)
if success:
return {'success': True, 'message': f"Plugin {request.plugin} {('enabled' if request.enabled else 'disabled')} successfully"}
else:
return {'success': False, 'message': 'Failed to toggle plugin state'}
except Exception as e:
trace = traceback.format_exc()
return {'success': False, 'message': f'Error toggling plugin: {str(e)}\n\n{trace}'}
[docs]
def discover_plugins(directory):
discovered = {}
for item in os.listdir(directory):
item_path = os.path.join(directory, item)
plugin_info_path = os.path.join(item_path, 'plugin_info.json')
if os.path.isfile(plugin_info_path):
try:
with open(plugin_info_path, 'r') as f:
plugin_info = json.load(f)
plugin_info['enabled'] = False
plugin_info['source_path'] = item_path
discovered[plugin_info['name']] = plugin_info
except json.JSONDecodeError:
continue
return discovered
[docs]
async def publish_plugin_from_github(repo: str, registry_token: str, registry_url: str):
"""
Fetches plugin_info.json from a GitHub repo and publishes it to the registry.
"""
plugin_info = None
for branch in ['main', 'master']:
url = f'https://raw.githubusercontent.com/{repo}/{branch}/plugin_info.json'
async with httpx.AsyncClient() as client:
try:
response = await client.get(url)
if response.status_code == 200:
plugin_info = response.json()
break
except httpx.RequestError as e:
continue
if not plugin_info:
raise Exception(f"Could not find or access plugin_info.json in repo {repo} on 'main' or 'master' branch.")
publish_data = {'title': plugin_info.get('name'), 'description': plugin_info.get('description', ''), 'category': 'plugin', 'content_type': 'mindroot_plugin', 'version': plugin_info.get('version', '0.1.0'), 'github_url': f'https://github.com/{repo}', 'pypi_module': plugin_info.get('pypi_module'), 'commands': plugin_info.get('commands', []), 'services': plugin_info.get('services', []), 'tags': plugin_info.get('tags', ['plugin']), 'dependencies': plugin_info.get('dependencies', []), 'data': {'plugin_info': plugin_info, 'installation': {'type': 'github', 'source_path': repo}}}
publish_url = f'{registry_url}/publish'
headers = {'Authorization': f'Bearer {registry_token}', 'Content-Type': 'application/json'}
async with httpx.AsyncClient() as client:
response = await client.post(publish_url, json=publish_data, headers=headers)
if response.status_code >= 400:
try:
error_detail = response.json().get('detail', response.text)
except:
error_detail = response.text
raise Exception(f'Failed to publish to registry: {response.status_code} - {error_detail}')
return response.json()