Source code for mindroot.coreplugins.admin.server_router

from fastapi import APIRouter, Response
import sys
import os
import psutil
import subprocess
import asyncio
import tempfile
router = APIRouter()

[docs] def get_start_method(): try: current_process = psutil.Process() cmdline = current_process.cmdline() cmdline_str = ' '.join(cmdline).lower() if 'mindroot' in cmdline_str: if 'PM2_HOME' in os.environ or any(('pm2' in p.name().lower() for p in current_process.parents())): return 'pm2' return 'mindroot' return 'unknown' except Exception as e: return 'unknown'
[docs] def spawn_restart(): try: current_process = psutil.Process() original_cmd = current_process.cmdline() with tempfile.NamedTemporaryFile(mode='w', suffix='.py', delete=False) as f: f.write(f'\n#!/usr/bin/env python3\n# Temporary restart script for xingen\nimport time\nimport subprocess\nimport sys\nimport os\n\ntime.sleep(2.5) # Wait for old server to fully shutdown\n\ntry:\n cmd = {original_cmd!r}\n print("Restarting mindroot with command:", cmd)\n subprocess.run(cmd, check=True)\nexcept Exception as e:\n print(f"Error restarting mindroot: {{e}}")\n sys.exit(1)\nfinally:\n # Clean up this temporary script\n try:\n os.unlink(__file__)\n except:\n pass\n') script_path = f.name os.chmod(script_path, 493) subprocess.Popen([sys.executable, script_path], start_new_session=True, stdout=None, stderr=None) return True except Exception as e: return False
[docs] async def delayed_exit(): await asyncio.sleep(0.5) sys.exit(0)
[docs] @router.post('/restart') async def restart_server(): try: method = get_start_method() if method == 'pm2': message = 'Server stopping - PM2 will automatically restart it' elif method == 'mindroot': if spawn_restart(): message = 'Server stopping - restart process initiated' else: return {'success': False, 'message': 'Failed to initiate restart process', 'method': method} else: return {'success': False, 'message': 'Unknown start method - please restart server manually', 'method': method} asyncio.create_task(delayed_exit()) return {'success': True, 'message': message, 'method': method} except Exception as e: return {'success': False, 'message': f'Restart failed: {str(e)}', 'method': method if 'method' in locals() else 'unknown'}
[docs] @router.post('/stop') async def stop_server(): try: method = get_start_method() if method == 'pm2': message = 'Server stopping - PM2 will automatically restart it unless stopped through PM2' else: message = 'Server stopping - manual restart will be required' asyncio.create_task(delayed_exit()) return {'success': True, 'message': message} except Exception as e: return {'success': False, 'message': f'Stop failed: {str(e)}'}
[docs] @router.get('/ping') async def ping(): """Simple endpoint to check if server is running""" return {'status': 'ok'}