Source code for mindroot.lib.streamcmd
import asyncio
import sys
from typing import Optional, Callable, Any, List, Dict, AsyncGenerator
COLORS = {'reset': '\x1b[0m', 'green': '\x1b[32m', 'yellow': '\x1b[33m', 'blue': '\x1b[34m', 'red': '\x1b[31m'}
[docs]
async def read_stream(stream: asyncio.StreamReader, callback: Callable[[str], Any]):
"""Read from stream line by line and call the callback for each line."""
while True:
line = await stream.readline()
if not line:
break
callback(line.decode('utf-8', errors='replace'))
[docs]
async def run_command_with_streaming(cmd: List[str], stdout_callback: Callable[[str], Any], stderr_callback: Callable[[str], Any], cwd: Optional[str]=None, env: Optional[dict]=None) -> int:
"""Run a command asynchronously and stream its output.
Args:
cmd: Command to run as a list of strings
stdout_callback: Callback for stdout lines
stderr_callback: Callback for stderr lines
cwd: Working directory for the command
env: Environment variables for the command
Returns:
Exit code of the command
"""
try:
process = await asyncio.create_subprocess_exec(*cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, cwd=cwd, env=env)
stdout_task = asyncio.create_task(read_stream(process.stdout, stdout_callback))
stderr_task = asyncio.create_task(read_stream(process.stderr, stderr_callback))
await asyncio.gather(stdout_task, stderr_task)
exit_code = await process.wait()
return exit_code
except Exception as e:
return 1
[docs]
async def stream_command_as_events(cmd: List[str], cwd: Optional[str]=None, env: Optional[dict]=None) -> AsyncGenerator[Dict[str, str], None]:
"""Run a command and yield its output as events.
Args:
cmd: Command to run as a list of strings
cwd: Working directory for the command
env: Environment variables for the command
Yields:
Events with type and data
"""
yield {'event': 'message', 'data': f"Running command: {' '.join(cmd)}"}
output_queue = asyncio.Queue()
def stdout_callback(line):
if line.strip():
output_queue.put_nowait(('message', line.strip()))
def stderr_callback(line):
if line.strip():
if 'WARNING:' in line or 'DEPRECATION:' in line or 'A new release of pip is available' in line:
output_queue.put_nowait(('warning', line.strip()))
else:
output_queue.put_nowait(('warning', line.strip()))
run_task = asyncio.create_task(run_command_with_streaming(cmd, stdout_callback, stderr_callback, cwd, env))
while not run_task.done() or not output_queue.empty():
try:
event_type, data = await asyncio.wait_for(output_queue.get(), timeout=0.1)
yield {'event': event_type, 'data': data}
except asyncio.TimeoutError:
await asyncio.sleep(0.01)
exit_code = await run_task
if exit_code == 0:
yield {'event': 'complete', 'data': 'Command completed successfully'}
else:
yield {'event': 'error', 'data': f'Command failed with exit code {exit_code}'}