from fastapi import APIRouter, HTTPException, Request, Response, Depends, Query
from fastapi.responses import HTMLResponse, RedirectResponse, JSONResponse
from fastapi import File, UploadFile, Form
from sse_starlette.sse import EventSourceResponse
from .models import MessageParts
from lib.providers.services import service, service_manager
from .services import init_chat_session, send_message_to_agent, subscribe_to_agent_messages, get_chat_history, run_task
from lib.templates import render
from lib.auth.auth import require_user
from lib.plugins import list_enabled
import nanoid
from lib.providers.commands import *
import asyncio
from lib.chatcontext import get_context, ChatContext
from typing import List
from lib.providers.services import service, service_manager
from lib.providers.commands import command_manager
from lib.utils.debug import debug_box
from lib.session_files import load_session_data, save_session_data
import os
import json
from lib.chatcontext import ChatContext
import shutil
from pydantic import BaseModel
from lib.auth.api_key import verify_api_key
from lib.providers.commands import command_manager
# Router that every endpoint in this module is registered on.
router = APIRouter()
# Global dictionary to store tasks
# (send_message stores each asyncio task it spawns here, keyed by a generated task id).
tasks = {}
[docs]
class CommandRequest(BaseModel):
    """JSON body accepted by the ``POST /cmd/{log_id}`` endpoint."""

    # Name of the command to run; the handler looks it up in
    # command_manager.functions.
    command: str
    # Keyword arguments forwarded to the command; optional, empty by default.
    args: dict = {}
[docs]
@router.post("/cmd/{log_id}")
async def execute_command_with_session(request: Request, log_id: str, cmd_request: CommandRequest):
    """
    Run a registered command inside the context of an existing session.

    Parameters:
    - log_id: The session log_id whose context is handed to the command
    - cmd_request: Body carrying the command name and its argument dict

    Returns:
    - {"status": "ok", "result": ...} on success, or
      {"status": "error", "message": ...} when the command itself raises

    Raises:
    - HTTPException 401 for a bad API key or a missing session user
    - HTTPException 404 when the session context or the command is unknown
    """
    # Resolve the caller: a Bearer API key takes precedence, otherwise the
    # user attached to request.state is used.
    bearer = request.headers.get("Authorization")
    if not (bearer and bearer.startswith("Bearer ")):
        if not hasattr(request.state, "user"):
            raise HTTPException(status_code=401, detail="Authentication required")
        username = request.state.user.username
    else:
        try:
            key_owner = await verify_api_key(bearer[7:])  # strip "Bearer "
            if not key_owner:
                raise HTTPException(status_code=401, detail="Invalid API key")
            username = key_owner['username']
        except HTTPException:
            raise
        except Exception:
            # Any verification failure is reported as an invalid key.
            raise HTTPException(status_code=401, detail="Invalid API key")

    # Load the session context; any failure is reported as "not found".
    try:
        context = await get_context(log_id, username)
    except Exception as err:
        raise HTTPException(status_code=404, detail=f"Session not found: {str(err)}")

    command_name = cmd_request.command
    if command_name not in command_manager.functions:
        raise HTTPException(status_code=404, detail=f"Command '{command_name}' not found")

    try:
        # Work on a copy so the request model is untouched; the session
        # context always overrides a caller-supplied 'context' key.
        call_kwargs = {**cmd_request.args, 'context': context}
        outcome = await command_manager.execute(command_name, **call_kwargs)
        return {"status": "ok", "result": outcome}
    except Exception as err:
        import traceback
        traceback.print_exc()
        return {"status": "error", "message": str(err)}
[docs]
@router.get("/cmd/{log_id}")
async def execute_command_get(request: Request, log_id: str, command: str = Query(...), args: str = Query("{}")):
    """
    Execute a command using GET request (useful for data-table retrieve-url).

    Parameters:
    - log_id: The session log_id to use for context
    - command: The command name to execute (query param)
    - args: JSON-encoded object of arguments (query param, default: {})

    Returns:
    - Direct command result (for use with data-table component)

    Raises:
    - HTTPException 401 for a bad API key or a missing session user
    - HTTPException 404 when the session context or the command is unknown
    - HTTPException 400 when args is not valid JSON or is not a JSON object
    - HTTPException 500 when the command itself raises
    """
    # Handle authentication: Bearer API key first, session user otherwise.
    auth_header = request.headers.get("Authorization")
    if auth_header and auth_header.startswith("Bearer "):
        api_key = auth_header[7:]  # strip the "Bearer " prefix
        try:
            user_data = await verify_api_key(api_key)
            if not user_data:
                raise HTTPException(status_code=401, detail="Invalid API key")
            username = user_data['username']
        except HTTPException:
            raise
        except Exception:
            raise HTTPException(status_code=401, detail="Invalid API key")
    else:
        if not hasattr(request.state, "user"):
            raise HTTPException(status_code=401, detail="Authentication required")
        username = request.state.user.username

    # Get the context for this session
    try:
        context = await get_context(log_id, username)
    except Exception as e:
        raise HTTPException(status_code=404, detail=f"Session not found: {str(e)}")

    # Check if command exists
    if command not in command_manager.functions:
        raise HTTPException(status_code=404, detail=f"Command '{command}' not found")

    # Parse args from JSON
    try:
        parsed_args = json.loads(args)
    except json.JSONDecodeError:
        raise HTTPException(status_code=400, detail="Invalid JSON in args parameter")
    # Valid JSON is not necessarily an object ("[]", "1", "null", ...). The
    # arguments are expanded as keyword arguments below, so anything but a
    # dict is a client error, not a server failure.
    if not isinstance(parsed_args, dict):
        raise HTTPException(status_code=400, detail="args parameter must be a JSON object")

    # Execute the command
    try:
        parsed_args['context'] = context
        result = await command_manager.execute(command, **parsed_args)
        # Return result directly for data-table compatibility
        return result
    except Exception as e:
        import traceback
        traceback.print_exc()
        raise HTTPException(status_code=500, detail=str(e))
[docs]
@router.get("/session/{agent_name}/{log_id}/cmd")
async def execute_command_session_get(request: Request, agent_name: str, log_id: str, command: str = Query(...), args: str = Query("{}")):
    """
    Execute a command using GET request from session URL (for relative URLs in components).

    This is a thin alias for execute_command_get so that components served
    under /session/{agent_name}/{log_id}/ can use a relative "cmd" URL.

    Parameters:
    - agent_name: The agent name (from URL path); not used by the handler
    - log_id: The session log_id to use for context
    - command: The command name to execute (query param)
    - args: JSON-encoded arguments (query param, default: {})

    Returns:
    - Direct command result (for use with data-table component)

    Raises:
    - Whatever execute_command_get raises (401 / 404 / 400 / 500)
    """
    # execute_command_get performs the Bearer-key / session-user
    # authentication as its first step and raises the same 401 errors, so it
    # is not repeated here (repeating it verified the API key twice per
    # request and produced a username that was never used).
    return await execute_command_get(
        request=request,
        log_id=log_id,
        command=command,
        args=args
    )
[docs]
@router.post("/chat/{log_id}/{task_id}/cancel")
async def cancel_chat(request: Request, log_id: str, task_id: str):
    """
    Flag the conversation attached to a session as finished.

    No asyncio task is cancelled here: the handler only sets
    ``finished_conversation`` to True in the session context's data and
    reports success.

    Parameters:
    - log_id: The session log_id whose context is flagged
    - task_id: The task identifier from the URL (only printed)

    Returns:
    - JSON with status "ok" and a confirmation message

    Raises:
    - HTTPException 401 when the request carries no authenticated user
    """
    debug_box("cancel_chat")
    print("Trying to cancel task", task_id)
    # Fail with a clean 401 instead of an AttributeError (HTTP 500) when no
    # user is attached to the request, matching the other handlers here.
    if not hasattr(request.state, "user"):
        raise HTTPException(status_code=401, detail="Authentication required")
    user = request.state.user.username
    context = await get_context(log_id, user)
    debug_box(str(context))
    context.data['finished_conversation'] = True
    return {"status": "ok", "message": "Task cancelled successfully"}
[docs]
@router.get("/context1/{log_id}")
async def context1(request: Request, log_id: str):
    """
    Load the context of a session and print it to stdout.

    Parameters:
    - log_id: The session log_id whose context is loaded

    Returns:
    - The literal string "ok"

    Raises:
    - HTTPException 401 when the request carries no authenticated user
    """
    # Fail with a clean 401 instead of an AttributeError (HTTP 500) when no
    # user is attached to the request.
    if not hasattr(request.state, "user"):
        raise HTTPException(status_code=401, detail="Authentication required")
    user = request.state.user.username
    context = await get_context(log_id, user)
    print(context)
    return "ok"
# need to serve persona images from ./personas/local/[persona_path]/avatar.png
[docs]
@router.get("/chat/personas/{persona_path:path}/avatar.png")
async def get_persona_avatar(persona_path: str):
    """
    Serve the avatar image of a persona.

    Lookup order:
    - registry personas ("registry/owner/name"): redirect to
      /assets/<hash> when persona.json lists an "avatar" asset hash,
      otherwise personas/<persona_path>/avatar.png
    - anything else: personas/local/<persona_path>/avatar.png, then
      personas/registry/<persona_path>/avatar.png

    Parameters:
    - persona_path: Persona location taken from the URL (may contain slashes)

    Returns:
    - A redirect to the deduplicated asset, or the PNG bytes

    Raises:
    - HTTPException 404 when the path contains a ".." segment or no avatar
      file exists
    """
    # persona_path comes straight from the URL and is interpolated into
    # filesystem paths below; refuse parent-directory segments so a request
    # cannot step outside the personas/ tree.
    if ".." in persona_path.replace("\\", "/").split("/"):
        raise HTTPException(status_code=404, detail="File not found")

    if persona_path.startswith("registry/"):
        # Check if this is a registry persona with deduplicated assets
        persona_json_path = f"personas/{persona_path}/persona.json"
        if os.path.exists(persona_json_path):
            try:
                with open(persona_json_path, "r") as f:
                    persona_data = json.load(f)
                # Check if persona has asset hashes (deduplicated storage)
                asset_hashes = persona_data.get("asset_hashes", {})
                if "avatar" in asset_hashes:
                    # Redirect to deduplicated asset endpoint
                    return RedirectResponse(f"/assets/{asset_hashes['avatar']}")
            except Exception as e:
                # Best effort: fall through to the plain file lookup.
                print(f"Error checking for deduplicated assets: {e}")
        # Handle registry personas: registry/owner/name
        file_path = f"personas/{persona_path}/avatar.png"
    else:
        # Legacy support: check local first, then shared
        file_path = f"personas/local/{persona_path}/avatar.png"
        if not os.path.exists(file_path):
            file_path = f"personas/registry/{persona_path}/avatar.png"

    if not os.path.exists(file_path):
        # Previously this returned HTTP 200 with a JSON body containing the
        # resolved absolute path; report a real 404 and keep server paths
        # out of the response.
        raise HTTPException(status_code=404, detail="File not found")

    with open(file_path, "rb") as f:
        image_bytes = f.read()
    return Response(
        content=image_bytes,
        media_type="image/png",
        headers={
            "Cache-Control": "max-age=3600",
            "Content-Disposition": "inline; filename=avatar.png"
        }
    )
[docs]
@router.get("/chat/personas/{persona_path:path}/faceref.png")
async def get_persona_faceref(persona_path: str):
    """
    Serve the face-reference image of a persona, falling back to its avatar.

    Lookup order:
    - registry personas ("registry/owner/name"): redirect to
      /assets/<hash> when persona.json lists a "faceref" asset hash,
      otherwise personas/<persona_path>/faceref.png
    - anything else: personas/local/<persona_path>/faceref.png, then
      personas/registry/<persona_path>/faceref.png
    - when no faceref file exists: redirect to the persona's avatar URL

    Parameters:
    - persona_path: Persona location taken from the URL (may contain slashes)

    Returns:
    - A redirect (deduplicated asset or avatar fallback), or the PNG bytes

    Raises:
    - HTTPException 404 when the path contains a ".." segment
    """
    # persona_path comes straight from the URL and is interpolated into
    # filesystem paths below; refuse parent-directory segments so a request
    # cannot step outside the personas/ tree.
    if ".." in persona_path.replace("\\", "/").split("/"):
        raise HTTPException(status_code=404, detail="File not found")

    if persona_path.startswith("registry/"):
        # Check if this is a registry persona with deduplicated assets
        persona_json_path = f"personas/{persona_path}/persona.json"
        if os.path.exists(persona_json_path):
            try:
                with open(persona_json_path, "r") as f:
                    persona_data = json.load(f)
                # Check if persona has asset hashes (deduplicated storage)
                asset_hashes = persona_data.get("asset_hashes", {})
                if "faceref" in asset_hashes:
                    # Redirect to deduplicated asset endpoint
                    return RedirectResponse(f"/assets/{asset_hashes['faceref']}")
            except Exception as e:
                # Best effort: fall through to the plain file lookup.
                print(f"Error checking for deduplicated assets: {e}")
        # Handle registry personas: registry/owner/name
        file_path = f"personas/{persona_path}/faceref.png"
    else:
        # Legacy support: check local first, then shared
        file_path = f"personas/local/{persona_path}/faceref.png"
        if not os.path.exists(file_path):
            file_path = f"personas/registry/{persona_path}/faceref.png"

    if not os.path.exists(file_path):
        # Fallback to avatar if faceref doesn't exist
        return RedirectResponse(f"/chat/personas/{persona_path}/avatar.png")

    with open(file_path, "rb") as f:
        image_bytes = f.read()
    return Response(
        content=image_bytes,
        media_type="image/png",
        headers={
            "Cache-Control": "max-age=3600",
            "Content-Disposition": "inline; filename=faceref.png"
        }
    )
[docs]
@router.get("/chat/{log_id}/events")
async def chat_events(log_id: str):
    """
    Open an event stream for a chat session.

    Parameters:
    - log_id: The session log_id to subscribe to

    Returns:
    - An EventSourceResponse wrapping whatever
      subscribe_to_agent_messages(log_id) produces
    """
    message_source = await subscribe_to_agent_messages(log_id)
    return EventSourceResponse(message_source)
[docs]
@router.post("/chat/{log_id}/send")
async def send_message(request: Request, log_id: str, message_parts: List[MessageParts] ):
    """
    Queue a user message for the agent of a session and return immediately.

    The message is handed to send_message_to_agent in a background asyncio
    task; the task is tracked in the module-level ``tasks`` dict while it
    runs.

    Parameters:
    - log_id: The session log_id the message belongs to
    - message_parts: The parts making up the message (request body)

    Returns:
    - JSON with status "ok" and the generated task_id

    Raises:
    - HTTPException 401 for a bad API key or a missing session user
    """
    # Check for API key in Authorization header (Bearer token)
    auth_header = request.headers.get("Authorization")
    if auth_header and auth_header.startswith("Bearer "):
        api_key = auth_header[7:]  # Remove "Bearer " prefix
        try:
            user_data = await verify_api_key(api_key)
            if not user_data:
                raise HTTPException(status_code=401, detail="Invalid API key")

            # Create a mock user object for API key users
            class MockUser:
                def __init__(self, username):
                    self.username = username

            user = MockUser(user_data['username'])
        except HTTPException:
            raise
        except Exception:
            raise HTTPException(status_code=401, detail="Invalid API key")
    else:
        # Use regular session authentication
        if not hasattr(request.state, "user"):
            raise HTTPException(status_code=401, detail="Authentication required")
        user = request.state.user

    debug_box("send_message")
    context = await get_context(log_id, user.username)
    debug_box(str(context))

    task_id = nanoid.generate()
    task = asyncio.create_task(send_message_to_agent(log_id, message_parts, context=context, user=user))
    # Keep a strong reference while the task runs (the event loop itself only
    # holds weak references), then drop it on completion so the global dict
    # does not grow by one entry per message for the life of the process.
    tasks[task_id] = task
    task.add_done_callback(lambda _finished: tasks.pop(task_id, None))
    return {"status": "ok", "task_id": task_id}
[docs]
@router.get("/agent/{agent_name}", response_class=HTMLResponse)
async def get_chat_html(request: Request, agent_name: str, api_key: str = Query(None), embed: bool = Query(False)):
    """
    Start a new chat session with an agent and redirect to its session page.

    Parameters:
    - agent_name: The agent to chat with (from URL path)
    - api_key: Optional API key (query param) used instead of the session user
    - embed: When true, the redirect target carries ?embed=true

    Every other query parameter is copied into the new session's
    ``context.data['session']``.

    Returns:
    - A redirect to /session/{agent_name}/{log_id}, or to /login when there
      is neither an API key nor a session user

    Raises:
    - HTTPException 401 when the API key cannot be verified
    """
    if not api_key:
        # Use regular authentication
        if not hasattr(request.state, "user"):
            return RedirectResponse("/login")
        user = request.state.user
    else:
        # Handle API key authentication
        try:
            key_owner = await verify_api_key(api_key)
            if not key_owner:
                raise HTTPException(status_code=401, detail="Invalid API key")

            # Stand-in exposing only .username, for API key users
            class MockUser:
                def __init__(self, username):
                    self.username = username

            user = MockUser(key_owner['username'])
        except Exception:
            raise HTTPException(status_code=401, detail="Invalid API key")

    log_id = nanoid.generate()
    list_enabled()  # result is not used by this handler; call kept as-is
    print("Init chat with user", user)
    print("User type:", type(user))
    print(f"Init chat with {agent_name}")
    await init_chat_session(user, agent_name, log_id)

    # Persist the access token (when the request carries one) with the session.
    if not hasattr(request.state, "access_token"):
        debug_box("No access token found in request state")
    else:
        debug_box("Access token found in request state, saving to session file")
        await save_session_data(log_id, "access_token", request.state.access_token)
        print("..")
        debug_box("Access token saved to session file")

    # Seed session data from the query string, leaving out the parameters
    # this handler consumes itself.
    session_params = {}
    for key, value in request.query_params.items():
        if key not in ('api_key', 'embed'):
            session_params[key] = value

    if session_params:
        username = user.username if hasattr(user, 'username') else user['username']
        context = await get_context(log_id, username)
        if 'session' not in context.data:
            context.data['session'] = {}
        context.data['session'].update(session_params)
        # The server working directory is stored alongside the parameters.
        context.data['session']['server_working_directory'] = os.getcwd()
        await context.save_context_data()
        print(f"Initialized session data from query params for {log_id}: {session_params}")

    target = f"/session/{agent_name}/{log_id}"
    if embed:
        # Embed mode is carried over to the session page.
        target += "?embed=true"
    return RedirectResponse(target)
[docs]
@router.get("/makesession/{agent_name}")
async def make_session(request: Request, agent_name: str):
    """
    Create a new chat session for the specified agent.

    Returns a JSON object carrying the new session's log_id, or a redirect
    to /login when no user is attached to the request.
    """
    if hasattr(request.state, "user"):
        log_id = nanoid.generate()
        await init_chat_session(request.state.user, agent_name, log_id)
        return JSONResponse({ "log_id": log_id })
    return RedirectResponse("/login")
[docs]
@router.get("/history/{agent_name}/{log_id}")
async def chat_history(request: Request, agent_name: str, log_id: str):
    """
    Return the chat history of a session.

    The history is looked up for the requesting user first; when that yields
    nothing, the lookup is retried for the "system" user.

    Parameters:
    - agent_name: The agent the session belongs to
    - log_id: The session log_id

    Returns:
    - The history as returned by get_chat_history, or [] when the
      "system" fallback raises

    Raises:
    - HTTPException 401 when the request carries no authenticated user
    """
    # Fail with a clean 401 instead of an AttributeError (HTTP 500) when no
    # user is attached to the request.
    if not hasattr(request.state, "user"):
        raise HTTPException(status_code=401, detail="Authentication required")
    user = request.state.user.username
    history = await get_chat_history(agent_name, log_id, user)
    if not history:
        try:
            print("trying to load from system session")
            history = await get_chat_history(agent_name, log_id, "system")
        except Exception as e:
            # Best effort: an unreadable system session means "no history".
            print("Error loading from system session:", e)
            history = []
    return history
[docs]
@router.get("/session/{agent_name}/{log_id}")
async def chat_session_redirect(request: Request, agent_name: str, log_id: str):
    """Redirect to the trailing-slash URL (keeping the query string) so relative URLs resolve."""
    target = f"/session/{agent_name}/{log_id}/"
    query_string = str(request.query_params)
    if query_string:
        # Carry the original query parameters over to the redirect target.
        target = f"{target}?{query_string}"
    return RedirectResponse(target)
[docs]
@router.get("/session/{agent_name}/{log_id}/")
async def chat_session(request: Request, agent_name: str, log_id: str, embed: bool = Query(False)):
    """
    Render the chat page for an existing session.

    Parameters:
    - agent_name: The agent whose persona name is passed to the template
    - log_id: The session log_id
    - embed: When true, the template data carries embed_mode=True

    Returns:
    - The rendered 'chat' template as HTML, or a redirect to /login when no
      user is attached to the request
    """
    plugins = list_enabled()
    if not hasattr(request.state, "user"):
        return RedirectResponse("/login")
    user = request.state.user
    agent = await service_manager.get_agent_data(agent_name)
    persona = agent['persona']['name']
    print("persona is:", persona)

    auth_token = None
    try:
        auth_token = await load_session_data(log_id, "access_token")
    except Exception:
        # Best effort: a session without a stored token is rendered without
        # one. Catch Exception rather than using a bare except so that task
        # cancellation (asyncio.CancelledError) and KeyboardInterrupt still
        # propagate.
        pass

    chat_data = {"log_id": log_id, "agent_name": agent_name, "user": user, "persona": persona }
    if auth_token is not None:
        chat_data["access_token"] = auth_token
    # Add embed mode flag
    if embed:
        chat_data["embed_mode"] = True
    html = await render('chat', chat_data)
    return HTMLResponse(html)
# use starlette staticfiles to mount ./imgs
# NOTE(review): `app`, `StaticFiles` and `published_dir` are neither defined nor
# explicitly imported in the visible file. Unless the wildcard import from
# lib.providers.commands supplies them, this statement raises NameError -- confirm.
# NOTE(review): the comment above mentions ./imgs but the mount point is
# "/published" -- confirm which is intended.
app.mount("/published", StaticFiles(directory=str(published_dir)), name="published_indices")
[docs]
class TaskRequest(BaseModel):
    """JSON body accepted by the ``POST /task/{agent_name}`` endpoint."""

    # The instructions/prompt handed to the agent.
    instructions: str
[docs]
@router.post("/task/{agent_name}")
async def run_task_route(request: Request, agent_name: str, task_request: TaskRequest = None, api_key: str = Query(None)):
    """
    Run a single task on an agent without going through a chat session page.

    Parameters:
    - agent_name: The name of the agent to run the task
    - task_request: Body carrying the instructions/prompt for the agent
    - api_key: Optional API key (query param) used instead of the session user

    Every query parameter other than api_key is stored in the task
    session's ``context.data['session']`` after the task has run.

    Returns:
    - JSON with status, results, full_results and the log_id of the task,
      or a status "error" object when no instructions were supplied

    Raises:
    - HTTPException 401 for a bad API key or a missing session user
    """
    if not api_key:
        # Use regular session authentication
        if not hasattr(request.state, "user"):
            raise HTTPException(status_code=401, detail="Authentication required")
        user = request.state.user.username
    else:
        # Handle API key authentication; only the username is needed here.
        try:
            key_owner = await verify_api_key(api_key)
            if not key_owner:
                raise HTTPException(status_code=401, detail="Invalid API key")
            user = key_owner['username']
        except HTTPException:
            raise
        except Exception:
            raise HTTPException(status_code=401, detail="Invalid API key")

    instructions = task_request.instructions if task_request is not None else None
    if not instructions:
        return {"status": "error", "message": "No instructions provided"}

    task_result, full_results, log_id = await run_task(instructions=instructions, agent_name=agent_name, user=user)

    # Copy the remaining query parameters into the session of the finished
    # task; api_key is consumed by this handler and left out.
    session_params = {
        name: value
        for name, value in request.query_params.items()
        if name != 'api_key'
    }
    if session_params and log_id:
        try:
            context = await get_context(log_id, user)
            if 'session' not in context.data:
                context.data['session'] = {}
            context.data['session'].update(session_params)
            # The server working directory is stored alongside the parameters.
            context.data['session']['server_working_directory'] = os.getcwd()
            await context.save_context_data()
            print(f"Updated task session data from query params for {log_id}: {session_params}")
        except Exception as err:
            # Best effort: the task result is still returned.
            print(f"Error updating TUI session data: {err}")

    print(task_result)
    print(full_results)
    print(log_id)
    return {"status": "ok", "results": task_result, "full_results": full_results, "log_id": log_id}
[docs]
@router.post("/chat/{log_id}/upload")
async def upload_file(request: Request, log_id: str, file: UploadFile = File(...)):
    """
    Upload a file and store it in a user-specific directory.

    The file is written to data/users/{user}/uploads/{log_id}/{filename},
    where filename is the base name of the uploaded file's name.

    Parameters:
    - log_id: The session log_id the upload belongs to
    - file: The uploaded file (multipart form field)

    Returns:
    - JSON with status, the stored filename, its path (usable in messages)
      and the reported MIME type

    Raises:
    - HTTPException 401 when the request carries no authenticated user
    - HTTPException 400 when the filename or log_id cannot be used as a
      path component
    """
    # Fail with a clean 401 instead of an AttributeError (HTTP 500) when no
    # user is attached to the request.
    if not hasattr(request.state, "user"):
        raise HTTPException(status_code=401, detail="Authentication required")
    user = request.state.user.username

    # Generate a safe filename to prevent path traversal. The client may send
    # no filename at all, or one whose base name is empty / "." / "..";
    # none of those names a regular file, so reject them up front instead of
    # failing later inside open().
    filename = os.path.basename(file.filename or "")
    if filename in ("", ".", ".."):
        raise HTTPException(status_code=400, detail="Invalid filename")
    # log_id becomes a directory name; "." / ".." would place the upload
    # outside the per-session directory.
    if log_id in (".", ".."):
        raise HTTPException(status_code=400, detail="Invalid log_id")

    # Create user uploads directory if it doesn't exist
    user_upload_dir = f"data/users/{user}/uploads/{log_id}"
    os.makedirs(user_upload_dir, exist_ok=True)
    file_path = os.path.join(user_upload_dir, filename)

    # Save the file
    with open(file_path, "wb") as buffer:
        shutil.copyfileobj(file.file, buffer)

    # Return the file information
    return {
        "status": "ok",
        "filename": filename,
        "path": file_path,
        "mime_type": file.content_type
    }
from lib.chatlog import count_tokens_for_log_id
[docs]
@router.get("/chat/{log_id}/tokens")
async def get_token_count(request: Request, log_id: str):
    """
    Report the token counts of a chat log.

    Parameters:
    - log_id: The log ID to count tokens for

    Returns:
    - {"status": "ok", "token_counts": ...} with whatever
      count_tokens_for_log_id returns, or a status "error" object when it
      returns None
    """
    counts = await count_tokens_for_log_id(log_id)
    if counts is not None:
        return {"status": "ok", "token_counts": counts}
    return {"status": "error", "message": f"Chat log with ID {log_id} not found"}
[docs]
@router.get("/chat/del_session/{log_id}")
async def delete_chat_session(request: Request, log_id: str, user=Depends(require_user)):
    """
    Delete a chat session, cascading to its child sessions.

    The agent name needed for the deletion is taken from the session's
    context file when available, otherwise from the directory holding the
    chat log file; it stays "unknown" when neither lookup succeeds.

    Parameters:
    - log_id: The log ID of the session to delete
    - user: The authenticated user, injected by require_user

    Returns:
    - JSON with success status and message

    Raises:
    - HTTPException 500 when anything in the deletion fails
    """
    try:
        agent_name = "unknown"

        # First source: the per-user context file of the session.
        context_root = os.environ.get('CHATCONTEXT_DIR', 'data/context')
        context_file_path = f"{context_root}/{user.username}/context_{log_id}.json"
        if os.path.exists(context_file_path):
            try:
                with open(context_file_path, 'r') as handle:
                    stored_context = json.load(handle)
                agent_name = stored_context.get('agent_name', 'unknown')
                print(f"Found agent name '{agent_name}' from context file for log_id {log_id}")
            except Exception as read_error:
                print(f"Error reading context file {context_file_path}: {read_error}")

        # Second source: the location of the chat log file itself.
        if agent_name == "unknown":
            from lib.chatlog import find_chatlog_file
            chatlog_path = find_chatlog_file(log_id)
            if chatlog_path:
                # Path layout: data/chat/{user}/{agent}/chatlog_{log_id}.json
                segments = chatlog_path.split(os.sep)
                if len(segments) >= 3:
                    agent_name = segments[-2]  # agent is the second-to-last part
                    print(f"Found agent name '{agent_name}' from chatlog file path for log_id {log_id}")

        await ChatContext.delete_session_by_id(log_id=log_id, user=user.username, agent=agent_name, cascade=True)
        return {"status": "ok", "message": f"Chat session {log_id} deleted successfully"}
    except Exception as failure:
        print(f"Error deleting chat session {log_id}: {failure}")
        raise HTTPException(status_code=500, detail=f"Error deleting chat session: {str(failure)}")
[docs]
# The original registration reused GET /chat/{log_id}/tokens, which
# get_token_count already owns. Routes are matched in registration order, so
# this handler could never be reached. It is given its own path so the
# alternative implementation is actually callable.
@router.get("/chat/{log_id}/tokens/alt")
async def get_token_count_alt(request: Request, log_id: str):
    """
    Alternative token count endpoint using token_counter module.

    Parameters:
    - log_id: The log ID to count tokens for

    Returns:
    - JSON with token counts or error message if log not found
    """
    # Imported locally so it does not replace the module-level
    # count_tokens_for_log_id (from lib.chatlog) used by get_token_count.
    from lib.token_counter import count_tokens_for_log_id
    token_counts = await count_tokens_for_log_id(log_id)
    if token_counts is None:
        return {"status": "error", "message": f"Chat log with ID {log_id} not found"}
    return {"status": "ok", "token_counts": token_counts}
# Attach the widget routes to this router when the widget_routes module can
# be imported; otherwise only print a warning and carry on without them.
try:
    from .widget_routes import router as widget_router
    router.include_router(widget_router)
except ImportError as import_error:
    print(f"Warning: Could not load widget routes: {import_error}")