Source code for mindroot.coreplugins.events.router
from fastapi import APIRouter, Depends, HTTPException, Form, Response, Request, Query
from lib.route_decorators import public_routes, public_route
from sse_starlette.sse import EventSourceResponse
from typing import List
import asyncio
import aiohttp
import json
router = APIRouter()
[docs]
async def create_sse_client(url: str, access_token: str, queue: asyncio.Queue, conv_id: str):
    """Forward an upstream SSE stream into ``queue``, tagged with a conversation ID.

    Opens ``url`` with ``access_token`` sent as an ``access_token`` cookie and
    reads the response body line by line. Each ``data:`` line is put on
    ``queue`` as a pre-formatted SSE message string, prefixed with the most
    recent ``event:`` name if one was seen since the last forwarded message.

    When the payload parses as a JSON object, a ``conversation_id`` key set to
    ``conv_id`` is added and the object is re-serialized. Payloads that are not
    valid JSON, or that are valid JSON but not an object, are forwarded as-is.

    Args:
        url: Address of the upstream SSE endpoint.
        access_token: Token sent upstream in the ``Cookie`` header.
        queue: Destination queue receiving the formatted SSE message strings.
        conv_id: Value stored under ``conversation_id`` in each JSON object payload.

    Returns:
        None, once the upstream response body is exhausted.
    """
    headers = {
        'Cookie': f'access_token={access_token}'
    }
    # aiohttp's default ClientTimeout limits the *total* request time to five
    # minutes. An event stream is expected to stay open much longer, so the
    # total limit is disabled; otherwise the stream is aborted mid-flight and
    # this forwarder dies without the consumer noticing.
    timeout = aiohttp.ClientTimeout(total=None)
    async with aiohttp.ClientSession(timeout=timeout) as session:
        async with session.get(url, headers=headers) as response:
            current_event = None
            async for line in response.content:
                if not line:
                    continue
                try:
                    decoded_line = line.decode().strip()
                except UnicodeDecodeError as e:
                    # Skip undecodable lines instead of aborting the stream.
                    print(f"Error processing SSE line: {e}")
                    continue

                # Handle event line: remember the name for the next data line.
                if decoded_line.startswith('event:'):
                    current_event = decoded_line.split(':', 1)[1].strip()
                    continue

                # Anything that is not a data line (blank separators, comments,
                # id/retry fields) is ignored.
                if not decoded_line.startswith('data:'):
                    continue

                current_data = decoded_line.split(':', 1)[1].strip()
                try:
                    data = json.loads(current_data)
                except ValueError as e:
                    # Not JSON: forward the raw payload untouched.
                    print(f"Error processing data JSON: {e}")
                else:
                    # Only JSON objects can carry the extra key; arrays and
                    # scalars are forwarded unchanged.
                    if isinstance(data, dict):
                        data['conversation_id'] = conv_id
                        current_data = json.dumps(data)

                # Reconstruct SSE message.
                # NOTE(review): every data line is emitted as its own message,
                # so a multi-line upstream event would be split - confirm that
                # upstream only ever sends single-line payloads.
                if current_event:
                    await queue.put(f"event: {current_event}\ndata: {current_data}\n\n")
                else:
                    await queue.put(f"data: {current_data}\n\n")
                current_event = None
[docs]
@router.get("/events/multi")
async def multiplexed_events(
    request: Request,
    conversation_ids: List[str] = Query(None)
):
    """Merge the event streams of several conversations into one SSE response.

    For every ID in ``conversation_ids`` an internal SSE client is started
    against ``<base_url>/chat/<conv_id>/events``, authenticated with the
    caller's ``access_token`` cookie. All clients write into one shared queue,
    and the response streams whatever arrives on that queue.

    Args:
        request: Incoming request; supplies the cookie and the base URL.
        conversation_ids: Repeated ``conversation_ids`` query parameter.

    Returns:
        An ``EventSourceResponse`` streaming the aggregated messages.

    Raises:
        HTTPException: 400 when no conversation IDs are given, 401 when the
            ``access_token`` cookie is missing.
    """
    # Local import keeps this block self-contained (stdlib only).
    from urllib.parse import quote

    if not conversation_ids:
        raise HTTPException(status_code=400, detail="conversation_ids parameter is required")

    # Get access token from cookie
    access_token = request.cookies.get('access_token')
    if not access_token:
        raise HTTPException(status_code=401, detail="Access token required")

    # Create queue for aggregated messages
    main_queue = asyncio.Queue()

    # Get base URL for chat events
    base_url = str(request.base_url).rstrip('/')

    async def event_generator():
        # Start the upstream clients only once the response is actually being
        # streamed, so no tasks are left running if the generator is never
        # iterated.
        tasks = [
            asyncio.create_task(
                create_sse_client(
                    # conv_id comes straight from the query string; encode it
                    # so it cannot inject extra path segments or a query
                    # string into the internal request.
                    f"{base_url}/chat/{quote(conv_id, safe='')}/events",
                    access_token,
                    main_queue,
                    conv_id,
                )
            )
            for conv_id in conversation_ids
        ]
        try:
            while True:
                message = await main_queue.get()
                if message:
                    yield message
        finally:
            # Runs on cancellation, on generator close and on any other error,
            # so the upstream connections are always torn down.
            for task in tasks:
                task.cancel()

    return EventSourceResponse(event_generator())