Source code for mindroot.coreplugins.chat.router

from fastapi import APIRouter, HTTPException, Request, Response, Depends, Query
from fastapi.responses import HTMLResponse, RedirectResponse, JSONResponse
from fastapi import File, UploadFile, Form
from sse_starlette.sse import EventSourceResponse
from .models import MessageParts
from lib.providers.services import service, service_manager
from .services import init_chat_session, send_message_to_agent, subscribe_to_agent_messages, get_chat_history, run_task
from lib.templates import render
from lib.auth.auth import require_user
from lib.plugins import list_enabled
import nanoid
from lib.providers.commands import *
import asyncio
from lib.chatcontext import get_context, ChatContext
from typing import List
from lib.providers.services import service, service_manager
from lib.providers.commands import command_manager
from lib.utils.debug import debug_box
from lib.session_files import load_session_data, save_session_data
import os
import json
from lib.chatcontext import ChatContext
import shutil
from pydantic import BaseModel
from lib.auth.api_key import verify_api_key
from lib.providers.commands import command_manager

router = APIRouter()

# Global dictionary to store tasks
tasks = {}

[docs] class CommandRequest(BaseModel): command: str args: dict = {}
[docs] @router.post("/cmd/{log_id}") async def execute_command_with_session(request: Request, log_id: str, cmd_request: CommandRequest): """ Execute a command using an existing session's context. Parameters: - log_id: The session log_id to use for context - command: The command name to execute - args: Dictionary of arguments to pass to the command Returns: - JSON with command result or error """ # Handle authentication auth_header = request.headers.get("Authorization") if auth_header and auth_header.startswith("Bearer "): api_key = auth_header[7:] try: user_data = await verify_api_key(api_key) if not user_data: raise HTTPException(status_code=401, detail="Invalid API key") username = user_data['username'] except HTTPException: raise except Exception as e: raise HTTPException(status_code=401, detail="Invalid API key") else: if not hasattr(request.state, "user"): raise HTTPException(status_code=401, detail="Authentication required") username = request.state.user.username # Get the context for this session try: context = await get_context(log_id, username) except Exception as e: raise HTTPException(status_code=404, detail=f"Session not found: {str(e)}") # Check if command exists if cmd_request.command not in command_manager.functions: raise HTTPException(status_code=404, detail=f"Command '{cmd_request.command}' not found") # Execute the command try: # Add context to args args = cmd_request.args.copy() args['context'] = context result = await command_manager.execute(cmd_request.command, **args) return {"status": "ok", "result": result} except Exception as e: import traceback traceback.print_exc() return {"status": "error", "message": str(e)}
[docs] @router.get("/cmd/{log_id}") async def execute_command_get(request: Request, log_id: str, command: str = Query(...), args: str = Query("{}")): """ Execute a command using GET request (useful for data-table retrieve-url). Parameters: - log_id: The session log_id to use for context - command: The command name to execute (query param) - args: JSON-encoded arguments (query param, default: {}) Returns: - Direct command result (for use with data-table component) """ # Handle authentication auth_header = request.headers.get("Authorization") if auth_header and auth_header.startswith("Bearer "): api_key = auth_header[7:] try: user_data = await verify_api_key(api_key) if not user_data: raise HTTPException(status_code=401, detail="Invalid API key") username = user_data['username'] except HTTPException: raise except Exception as e: raise HTTPException(status_code=401, detail="Invalid API key") else: if not hasattr(request.state, "user"): raise HTTPException(status_code=401, detail="Authentication required") username = request.state.user.username # Get the context for this session try: context = await get_context(log_id, username) except Exception as e: raise HTTPException(status_code=404, detail=f"Session not found: {str(e)}") # Check if command exists if command not in command_manager.functions: raise HTTPException(status_code=404, detail=f"Command '{command}' not found") # Parse args from JSON try: parsed_args = json.loads(args) except json.JSONDecodeError: raise HTTPException(status_code=400, detail="Invalid JSON in args parameter") # Execute the command try: parsed_args['context'] = context result = await command_manager.execute(command, **parsed_args) # Return result directly for data-table compatibility return result except Exception as e: import traceback traceback.print_exc() raise HTTPException(status_code=500, detail=str(e))
[docs] @router.get("/session/{agent_name}/{log_id}/cmd") async def execute_command_session_get(request: Request, agent_name: str, log_id: str, command: str = Query(...), args: str = Query("{}")): """ Execute a command using GET request from session URL (for relative URLs in components). Parameters: - agent_name: The agent name (from URL path) - log_id: The session log_id to use for context - command: The command name to execute (query param) - args: JSON-encoded arguments (query param, default: {}) Returns: - Direct command result (for use with data-table component) """ # Handle authentication auth_header = request.headers.get("Authorization") if auth_header and auth_header.startswith("Bearer "): api_key = auth_header[7:] try: user_data = await verify_api_key(api_key) if not user_data: raise HTTPException(status_code=401, detail="Invalid API key") username = user_data['username'] except HTTPException: raise except Exception as e: raise HTTPException(status_code=401, detail="Invalid API key") else: if not hasattr(request.state, "user"): raise HTTPException(status_code=401, detail="Authentication required") username = request.state.user.username # Forward to the existing GET handler logic return await execute_command_get( request=request, log_id=log_id, command=command, args=args )
[docs] @router.post("/chat/{log_id}/{task_id}/cancel") async def cancel_chat(request: Request, log_id: str, task_id: str): debug_box("cancel_chat") print("Trying to cancel task", task_id) user = request.state.user.username context = await get_context(log_id, user) debug_box(str(context)) context.data['finished_conversation'] = True #if task_id in tasks: # task = tasks[task_id] # await asyncio.sleep(0.75) # task.cancel() # del tasks[task_id] return {"status": "ok", "message": "Task cancelled successfully"}
#else: # raise HTTPException(status_code=404, detail="Task not found")
[docs] @router.get("/context1/{log_id}") async def context1(request: Request, log_id: str): user = request.state.user.username context = await get_context(log_id, user) print(context) return "ok"
# need to serve persona images from ./personas/local/[persona_path]/avatar.png
[docs] @router.get("/chat/personas/{persona_path:path}/avatar.png") async def get_persona_avatar(persona_path: str): # Check if this is a registry persona with deduplicated assets if persona_path.startswith("registry/"): persona_json_path = f"personas/{persona_path}/persona.json" if os.path.exists(persona_json_path): try: with open(persona_json_path, "r") as f: persona_data = json.load(f) # Check if persona has asset hashes (deduplicated storage) asset_hashes = persona_data.get("asset_hashes", {}) if "avatar" in asset_hashes: # Redirect to deduplicated asset endpoint return RedirectResponse(f"/assets/{asset_hashes['avatar']}") except Exception as e: print(f"Error checking for deduplicated assets: {e}") # Handle registry personas: registry/owner/name if persona_path.startswith('registry/'): file_path = f"personas/{persona_path}/avatar.png" else: # Legacy support: check local first, then shared file_path = f"personas/local/{persona_path}/avatar.png" if not os.path.exists(file_path): file_path = f"personas/registry/{persona_path}/avatar.png" if not os.path.exists(file_path): resolved = os.path.realpath(file_path) return {"error": "File not found: " + resolved} with open(file_path, "rb") as f: image_bytes = f.read() return Response( content=image_bytes, media_type="image/png", headers={ "Cache-Control": "max-age=3600", "Content-Disposition": "inline; filename=avatar.png" } )
[docs] @router.get("/chat/personas/{persona_path:path}/faceref.png") async def get_persona_faceref(persona_path: str): # Check if this is a registry persona with deduplicated assets if persona_path.startswith("registry/"): persona_json_path = f"personas/{persona_path}/persona.json" if os.path.exists(persona_json_path): try: with open(persona_json_path, "r") as f: persona_data = json.load(f) # Check if persona has asset hashes (deduplicated storage) asset_hashes = persona_data.get("asset_hashes", {}) if "faceref" in asset_hashes: # Redirect to deduplicated asset endpoint return RedirectResponse(f"/assets/{asset_hashes['faceref']}") except Exception as e: print(f"Error checking for deduplicated assets: {e}") # Handle registry personas: registry/owner/name if persona_path.startswith('registry/'): file_path = f"personas/{persona_path}/faceref.png" else: # Legacy support: check local first, then shared file_path = f"personas/local/{persona_path}/faceref.png" if not os.path.exists(file_path): file_path = f"personas/registry/{persona_path}/faceref.png" if not os.path.exists(file_path): # Fallback to avatar if faceref doesn't exist return RedirectResponse(f"/chat/personas/{persona_path}/avatar.png") with open(file_path, "rb") as f: image_bytes = f.read() return Response( content=image_bytes, media_type="image/png", headers={ "Cache-Control": "max-age=3600", "Content-Disposition": "inline; filename=faceref.png" } )
[docs] @router.get("/chat/{log_id}/events") async def chat_events(log_id: str): return EventSourceResponse(await subscribe_to_agent_messages(log_id))
[docs] @router.post("/chat/{log_id}/send") async def send_message(request: Request, log_id: str, message_parts: List[MessageParts] ): # Check for API key in Authorization header (Bearer token) auth_header = request.headers.get("Authorization") if auth_header and auth_header.startswith("Bearer "): api_key = auth_header[7:] # Remove "Bearer " prefix try: user_data = await verify_api_key(api_key) if not user_data: raise HTTPException(status_code=401, detail="Invalid API key") # Create a mock user object for API key users class MockUser: def __init__(self, username): self.username = username user = MockUser(user_data['username']) except HTTPException: raise except Exception as e: raise HTTPException(status_code=401, detail="Invalid API key") else: # Use regular session authentication if not hasattr(request.state, "user"): raise HTTPException(status_code=401, detail="Authentication required") user = request.state.user debug_box("send_message") context = await get_context(log_id, user.username) debug_box(str(context)) #context = ChatContext(command_manager, service_manager, user=user.user) task = asyncio.create_task(send_message_to_agent(log_id, message_parts, context=context, user=user)) #task = asyncio.create_task(send_message_to_agent(log_id, message_parts, user=user)) task_id = nanoid.generate() tasks[task_id] = task return {"status": "ok", "task_id": task_id}
[docs] @router.get("/agent/{agent_name}", response_class=HTMLResponse) async def get_chat_html(request: Request, agent_name: str, api_key: str = Query(None), embed: bool = Query(False)): # Handle API key authentication if provided if api_key: try: user_data = await verify_api_key(api_key) if not user_data: raise HTTPException(status_code=401, detail="Invalid API key") # Create a mock user object for API key users class MockUser: def __init__(self, username): self.username = username user = MockUser(user_data['username']) except Exception as e: raise HTTPException(status_code=401, detail="Invalid API key") else: # Use regular authentication if not hasattr(request.state, "user"): return RedirectResponse("/login") user = request.state.user log_id = nanoid.generate() plugins = list_enabled() print("Init chat with user", user) print("User type:", type(user)) print(f"Init chat with {agent_name}") await init_chat_session(user, agent_name, log_id) if hasattr(request.state, "access_token"): debug_box("Access token found in request state, saving to session file") access_token = request.state.access_token await save_session_data(log_id, "access_token", access_token) print("..") debug_box("Access token saved to session file") else: debug_box("No access token found in request state") # Initialize session data from query parameters # Skip standard parameters that are used for other purposes skip_params = {'api_key', 'embed'} session_params = {k: v for k, v in request.query_params.items() if k not in skip_params} if session_params: # Initialize session data with query parameters context = await get_context(log_id, user.username if hasattr(user, 'username') else user['username']) if 'session' not in context.data: context.data['session'] = {} # Add all query parameters to session data context.data['session'].update(session_params) # Always add server working directory context.data['session']['server_working_directory'] = os.getcwd() await context.save_context_data() print(f"Initialized session data from query params for {log_id}: {session_params}") # If embed mode is requested, redirect to embed session if embed: return RedirectResponse(f"/session/{agent_name}/{log_id}?embed=true") # Regular redirect return RedirectResponse(f"/session/{agent_name}/{log_id}")
[docs] @router.get("/makesession/{agent_name}") async def make_session(request: Request, agent_name: str): """ Create a new chat session for the specified agent. Returns a redirect to the chat session page. """ if not hasattr(request.state, "user"): return RedirectResponse("/login") user = request.state.user log_id = nanoid.generate() await init_chat_session(user, agent_name, log_id) return JSONResponse({ "log_id": log_id })
[docs] @router.get("/history/{agent_name}/{log_id}") async def chat_history(request: Request, agent_name: str, log_id: str): user = request.state.user.username history = await get_chat_history(agent_name, log_id, user) if history is None or len(history) == 0: try: print("trying to load from system session") history = await get_chat_history(agent_name, log_id, "system") except Exception as e: print("Error loading from system session:", e) history = [] pass return history
[docs] @router.get("/session/{agent_name}/{log_id}") async def chat_session_redirect(request: Request, agent_name: str, log_id: str): """Redirect to trailing slash version for proper relative URL resolution.""" # Check if there are query params to preserve query_string = str(request.query_params) if query_string: return RedirectResponse(f"/session/{agent_name}/{log_id}/?{query_string}") return RedirectResponse(f"/session/{agent_name}/{log_id}/")
[docs] @router.get("/session/{agent_name}/{log_id}/") async def chat_session(request: Request, agent_name: str, log_id: str, embed: bool = Query(False)): # Check authentication (API key or regular user) plugins = list_enabled() if not hasattr(request.state, "user"): return RedirectResponse("/login") user = request.state.user agent = await service_manager.get_agent_data(agent_name) persona = agent['persona']['name'] print("persona is:", persona) auth_token = None try: auth_token = await load_session_data(log_id, "access_token") except: pass chat_data = {"log_id": log_id, "agent_name": agent_name, "user": user, "persona": persona } if auth_token is not None: chat_data["access_token"] = auth_token # Add embed mode flag if embed: chat_data["embed_mode"] = True html = await render('chat', chat_data) return HTMLResponse(html) # use starlette staticfiles to mount ./imgs app.mount("/published", StaticFiles(directory=str(published_dir)), name="published_indices")
[docs] class TaskRequest(BaseModel): instructions: str
[docs] @router.post("/task/{agent_name}") async def run_task_route(request: Request, agent_name: str, task_request: TaskRequest = None, api_key: str = Query(None)): """ Run a task for an agent with the given instructions. This endpoint allows programmatic interaction with agents without a full chat session. Parameters: - agent_name: The name of the agent to run the task - instructions: The instructions/prompt to send to the agent Returns: - JSON with results and log_id for tracking """ # Handle API key authentication if provided if api_key: try: user_data = await verify_api_key(api_key) if not user_data: raise HTTPException(status_code=401, detail="Invalid API key") # Create a mock user object for API key users class MockUser: def __init__(self, username): self.username = username user_obj = MockUser(user_data['username']) user = user_obj.username except HTTPException: raise except Exception as e: raise HTTPException(status_code=401, detail="Invalid API key") else: # Use regular session authentication if not hasattr(request.state, "user"): raise HTTPException(status_code=401, detail="Authentication required") user = request.state.user.username instructions = None if task_request is not None: instructions = task_request.instructions if not instructions: return {"status": "error", "message": "No instructions provided"} task_result, full_results, log_id = await run_task(instructions=instructions, agent_name=agent_name, user=user) # Initialize session data from query parameters after task creation # Skip standard parameters that are used for other purposes skip_params = {'api_key'} session_params = {k: v for k, v in request.query_params.items() if k not in skip_params} if session_params and log_id: try: context = await get_context(log_id, user) if 'session' not in context.data: context.data['session'] = {} # Add all query parameters to session data context.data['session'].update(session_params) # Always add server working directory context.data['session']['server_working_directory'] = os.getcwd() await context.save_context_data() print(f"Updated task session data from query params for {log_id}: {session_params}") except Exception as e: print(f"Error updating TUI session data: {e}") print(task_result) print(full_results) print(log_id) return {"status": "ok", "results": task_result, "full_results": full_results, "log_id": log_id}
[docs] @router.post("/chat/{log_id}/upload") async def upload_file(request: Request, log_id: str, file: UploadFile = File(...)): """ Upload a file and store it in a user-specific directory. Returns the file path that can be used in messages. """ user = request.state.user.username # Create user uploads directory if it doesn't exist user_upload_dir = f"data/users/{user}/uploads/{log_id}" os.makedirs(user_upload_dir, exist_ok=True) # Generate a safe filename to prevent path traversal filename = os.path.basename(file.filename) file_path = os.path.join(user_upload_dir, filename) # Save the file with open(file_path, "wb") as buffer: shutil.copyfileobj(file.file, buffer) # Return the file information return { "status": "ok", "filename": filename, "path": file_path, "mime_type": file.content_type }
from lib.chatlog import count_tokens_for_log_id
[docs] @router.get("/chat/{log_id}/tokens") async def get_token_count(request: Request, log_id: str): """ Get token counts for a chat log identified by log_id, including any delegated tasks. Parameters: - log_id: The log ID to count tokens for Returns: - JSON with token counts or error message if log not found """ token_counts = await count_tokens_for_log_id(log_id) if token_counts is None: return {"status": "error", "message": f"Chat log with ID {log_id} not found"} return {"status": "ok", "token_counts": token_counts}
[docs] @router.get("/chat/del_session/{log_id}") async def delete_chat_session(request: Request, log_id: str, user=Depends(require_user)): """ Delete a chat session by log_id, including chat logs, context files, and all child sessions. Parameters: - log_id: The log ID of the session to delete Returns: - JSON with success status and message """ try: # Try to determine the agent name from the context file first agent_name = "unknown" context_dir = os.environ.get('CHATCONTEXT_DIR', 'data/context') context_file_path = f"{context_dir}/{user.username}/context_{log_id}.json" if os.path.exists(context_file_path): try: with open(context_file_path, 'r') as f: context_data = json.load(f) agent_name = context_data.get('agent_name', 'unknown') print(f"Found agent name '{agent_name}' from context file for log_id {log_id}") except Exception as e: print(f"Error reading context file {context_file_path}: {e}") # If we still don't have the agent name, try to find the chatlog file if agent_name == "unknown": from lib.chatlog import find_chatlog_file chatlog_path = find_chatlog_file(log_id) if chatlog_path: # Extract agent from path: data/chat/{user}/{agent}/chatlog_{log_id}.json path_parts = chatlog_path.split(os.sep) if len(path_parts) >= 3: agent_name = path_parts[-2] # Agent is the second-to-last part print(f"Found agent name '{agent_name}' from chatlog file path for log_id {log_id}") await ChatContext.delete_session_by_id(log_id=log_id, user=user.username, agent=agent_name, cascade=True) return {"status": "ok", "message": f"Chat session {log_id} deleted successfully"} except Exception as e: print(f"Error deleting chat session {log_id}: {e}") raise HTTPException(status_code=500, detail=f"Error deleting chat session: {str(e)}")
[docs] @router.get("/chat/{log_id}/tokens") async def get_token_count_alt(request: Request, log_id: str): """ Alternative token count endpoint using token_counter module. Parameters: - log_id: The log ID to count tokens for Returns: - JSON with token counts or error message if log not found """ from lib.token_counter import count_tokens_for_log_id token_counts = await count_tokens_for_log_id(log_id) if token_counts is None: return {"status": "error", "message": f"Chat log with ID {log_id} not found"} return {"status": "ok", "token_counts": token_counts}
# Include widget routes try: from .widget_routes import router as widget_router router.include_router(widget_router) except ImportError as e: print(f"Warning: Could not load widget routes: {e}")