Metadata-Version: 2.4
Name: by-framework
Version: 0.1.12
Summary: Distributed Agent scheduling framework
License-File: LICENSE
Requires-Python: >=3.12
Requires-Dist: dill>=0.4.1
Requires-Dist: httpx>=0.28.1
Requires-Dist: redis>=7.0.0
Requires-Dist: typing-extensions>=4.0.0
Provides-Extra: dev
Requires-Dist: isort>=5.13.0; extra == 'dev'
Requires-Dist: pre-commit>=4.0.0; extra == 'dev'
Requires-Dist: pyink>=24.0.0; extra == 'dev'
Requires-Dist: pylint>=3.0.0; extra == 'dev'
Requires-Dist: pytest-asyncio>=0.21.0; extra == 'dev'
Requires-Dist: pytest>=8.0.0; extra == 'dev'
Requires-Dist: ruff>=0.3.0; extra == 'dev'
Description-Content-Type: text/markdown

# 🚀 by-framework-python

<div align="center">

[![Version](https://img.shields.io/badge/version-0.1.12-blue.svg)](pyproject.toml)
[![Python](https://img.shields.io/badge/python-3.12+-yellow.svg)](pyproject.toml)
[![Redis](https://img.shields.io/badge/redis-7.0+-red.svg)](pyproject.toml)
[![License](https://img.shields.io/badge/license-Apache_2.0-green.svg)](LICENSE)

</div>

<div align="center">

[**English**](README.md) | [**中文**](README_zh.md)

</div>

---

**by-framework** is a distributed Agent scheduling framework built on Redis Streams. It provides worker orchestration, session-scoped runtime state, and plugin-based agent capability registration for AI agent systems.

---

## Table of Contents

- [✨ Key Features](#-key-features)
- [🏗️ Architecture](#️-architecture)
- [📦 Installation](#-installation)
- [🚀 Quick Start](#-quick-start)
- [💡 Deep Dive](#-deep-dive)
- [🔌 Plugin System](#-plugin-system)
- [📡 Sending Tasks](#-sending-tasks)
- [🧪 Examples](#-examples)
- [🛠️ Configuration Reference](#️-configuration-reference)
- [📚 API Reference](#-api-reference)
- [🧩 Advanced Capabilities](#-advanced-capabilities)
- [🚀 Deployment Guide](#-deployment-guide)

---

## ✨ Key Features

- ⚡ **Native Async**: Built on Python `asyncio`, perfectly suited for I/O-intensive Agent tasks.
- 🧩 **Highly Plugin-Based**: Plugin system with agent config registration for tools, prompts, skills, and lifecycle hooks.
- 📊 **State Management**: Complete `AgentContext` support for easy streaming output, state transitions, and artifact handling.
- 🔄 **Decoupled Architecture**: Separates control streams from data streams for scalable Worker orchestration.
- 🎯 **Agent Type Routing**: Workers declare supported `agent_types` via `get_agent_types()` for routing and liveness checks.

---

## 🏗️ Architecture

The system uses an event-driven, highly decoupled design:

```
┌─────────────┐       ┌──────────────┐       ┌──────────────┐
│   Client    │──────▶│  Redis Input │──────▶│   Gateway    │
│ (Gateway)   │       │     MQ       │       │   Worker     │
└─────────────┘       └──────────────┘       └──────┬───────┘
       ▲                                            │
       │                                            │
       │                                            ▼
┌─────────────┐       ┌──────────────┐       ┌──────────────┐
│  Consumer   │◀──────│  Redis Data  │◀──────│   Business   │
│ / Backend   │       │   Streams    │       │   Logic      │
└─────────────┘       └──────────────┘       └──────────────┘
```

### Core Components

- **Access Layer**: `GatewayClient` publishes control commands to Redis Input MQ.
- **Scheduling Layer**: Uses Redis Stream for competitive consumption and routing among Worker clusters.
- **Execution Layer**: `GatewayWorker` actively pulls tasks and executes business logic in isolated workspaces.
- **Output Layer**: Data is asynchronously pushed to session-scoped data streams for downstream consumers.

### Worker Routing Semantics

There are three layers of routing semantics:

- **membership**: a Worker declares which `agent_types` it supports. This is a static relationship, updated only at startup and graceful shutdown.
- **online / heartbeat**: whether the Worker can currently accept tasks. Only online Workers are considered valid send targets.
- **worker_id lock**: prevents duplicate startup of the same `worker_id`. This is an instance-level mutex, not part of agent-type routing.

The production main path uses the agent-type stream:

- The client writes to `byai_gateway:ctrl:agent_type:{agent_type}`
- Multiple Workers under the same agent type consume competitively via a Redis consumer group
- Before sending, the client only checks that at least one online Worker exists for the agent type
- A specific Worker is not pre-selected before sending

The debug or direct-control path uses the worker stream:

- When `target_worker_id` is explicitly provided, messages are written to `byai_gateway:ctrl:worker:{worker_id}`
- This path is only for debugging, direct dispatch, or worker-level control commands
- The client explicitly checks that the worker is online before sending

### Data Flow

```
User Request
    ↓
Gateway (write to control stream)
    ↓
Worker (consume control stream, process task)
    ↓
Redis Stream (write to session data stream)
    ↓
Backend or consumer (read session data stream)
    ↓
Frontend (render real-time AI response)
```

---

## 📦 Installation

### Prerequisites

- Python 3.12+
- Redis 7.0+ (for message queue)

### Install via pip

```bash
# Basic installation
pip install by-framework

# Optional: enable Langfuse tracing integration
pip install by-framework-trace-langfuse

# Development mode installation
pip install -e ".[dev]"
```

### Install via uv (Recommended)

```bash
# Clone the project and install all dependencies
cd by-framework-python
uv sync

# Optional: add Langfuse integration package to another project
uv add by-framework-trace-langfuse
```

`by-framework` no longer bundles Langfuse inside the main package. If
`by-framework-trace-langfuse` is installed and the Langfuse environment
variables are set, worker startup will automatically discover the provider,
register its plugin, and reuse the framework trace ids.

If you want to add another tracing vendor, see
[docs/trace_provider_development.md](docs/trace_provider_development.md).

### Workspace Development

This repository uses `uv workspace` to manage local package development.

Common commands:

```bash
# Sync workspace dependencies in repository root
uv sync

# Run package tests
uv run pytest tests
```

## 🚀 Quick Start

### 1. Create a Simple Agent Worker

Create `my_agent.py`:

```python
import asyncio
from by_framework import GatewayWorker, AgentContext, run_worker

class MyAssistant(GatewayWorker):
    def get_agent_types(self):
        # Declare the Agent types this Worker can handle
        return ["weather_agent", "chat_agent"]

    async def process_command(self, command, context: AgentContext):
        # Send streaming text chunks
        await context.emit_chunk("Processing your request...\n")

        # Simulate time-consuming operation
        await asyncio.sleep(0.5)

        # Update task state
        await context.emit_state("thinking")

        # Read request content from the command payload
        user_input = (
            command.content if isinstance(command.content, str) else str(command.content)
        )

        # Send thinking process
        await context.emit_chunk(f"I received: {user_input}\n")
        await asyncio.sleep(0.3)

        # Send final result
        await context.emit_chunk("This is my reply!")

        return {
            "status": "success",
            "message": "Task completed",
            "data": {"answer": "The weather is sunny today"}
        }

if __name__ == "__main__":
    run_worker(
        worker_class=MyAssistant,
        worker_id="worker-01",
        redis_host="localhost",
        redis_port=6379,
    )
```

### 2. Start Redis

```bash
docker run -d -p 6379:6379 redis:7-alpine
```

### 3. Start Worker

```bash
cd by-framework-python
uv run python my_agent.py
```

### 4. Send a Test Task

Create `send_task.py`:

```python
import asyncio

from by_framework import ByaiGatewayClient, WorkerRegistry, close_redis, init_redis


async def send_task():
    # Initialize shared Redis client and registry
    redis = init_redis(host="localhost", port=6379)
    registry = WorkerRegistry(redis_client=redis)

    # ByaiGatewayClient accepts redis_client / registry
    client = ByaiGatewayClient(redis_client=redis, registry=registry)

    response = await client.send_message(
        target_agent_type="weather_agent",
        session_id="session-001",
        content="How's the weather in Beijing today?",
    )

    if response.success:
        print(f"Task sent, message ID: {response.message_id}")
    else:
        print(f"Send failed: {response.error}")

    await close_redis()


asyncio.run(send_task())
```

Run:

```bash
uv run python send_task.py
```

---

## 💡 Deep Dive

### GatewayWorker Base Class

`GatewayWorker` is the base class for all custom Workers. You need to implement the following methods:

| Method | Required | Description |
|--------|----------|-------------|
| `get_agent_types()` | Yes | Returns list of Agent types this Worker can handle |
| `process_command(command, context)` | Yes | Handle specific business logic |

### AgentContext

`AgentContext` provides the ability to interact with the runtime environment:

```python
from by_framework import AgentContext, ArtifactEvent


async def process_command(self, command, context: AgentContext):
    # 1. Send streaming output
    await context.emit_chunk("Processing...")

    # 2. Send artifacts/structured data
    await context.emit_artifact(ArtifactEvent(url="https://example.com/result.json"))

    # 3. Get message ID and session ID
    msg_id = context.message_id
    session_id = context.session_id

    # 4. Call other Agents (supports suspending current task waiting for reply)
    result = await context.call_agent(
        target_agent_type="translator_agent",
        content="Hello",
        wait_for_reply=True
    )
```

### Commands and Message Protocol

#### AskAgentCommand (Task Command)

```python
from by_framework.core.protocol.commands import AskAgentCommand
from by_framework.core.protocol.message_header import MessageHeader

command = AskAgentCommand(
    header=MessageHeader(
        message_id="msg_123",
        session_id="sess_456",
        target_agent_type="weather_agent"
    ),
    content="Query Beijing weather",
    extra_payload={
        "location": "Beijing"
    }
)
```

#### Common EventType Values

| Event Type | Description |
|------------|-------------|
| `answerDelta` | Incremental answer content |
| `reasoningLogDelta` | Reasoning or intermediate log output |
| `appStreamResponse` | Marks stream completion |
| `taskCreate` | Task creation related event |
| `taskStop` | Task termination related event |

---

## 🔌 Plugin System

Plugins are the foundation of By-Framework's extensibility. You can register tools, prompt templates, etc. through plugins.

### Plugin Directory Structure

```
my_plugins/
├── weather_plugin.py
├── calculator_plugin.py
└── custom_hooks.py
```

### Writing a Plugin

Create `my_cool_plugin.py`:

```python
from by_framework import AgentConfig, AgentContext, Plugin, PluginBuildContext, PluginManifest
from typing import Any

class WeatherPlugin(Plugin):
    def __init__(self):
        super().__init__(PluginManifest(
            plugin_id="weather_plugin",
            version="1.0.0",
        ))

    async def register_agent_configs(self, build_context: PluginBuildContext) -> list[AgentConfig]:
        # Plugin registers capabilities by returning a list of AgentConfig
        config = AgentConfig(
            agent_id="weather_assistant",
            tools={
                "get_current_weather": self._get_weather,
                "get_forecast": self._get_forecast
            },
            prompts={
                "system_prompt": "You are a weather assistant..."
            }
        )
        return [config]

    async def _get_weather(self, city: str) -> dict[str, Any]:
        """Get current weather"""
        # In real projects, this would call a real weather API
        return {
            "city": city,
            "temperature": 25,
            "condition": "Sunny",
            "humidity": 60
        }

    async def _get_forecast(self, city: str, days: int = 3) -> list[dict]:
        """Get weather forecast"""
        return [
            {"day": 1, "high": 28, "low": 18, "condition": "Sunny"},
            {"day": 2, "high": 26, "low": 16, "condition": "Cloudy"},
            {"day": 3, "high": 24, "low": 14, "condition": "Overcast"}
        ][:days]

    # Plugin lifecycle hooks
    async def on_task_start(self, context: AgentContext):
        """Called when task starts"""
        print(f"Task {context.message_id} started")

    async def on_task_complete(self, context: AgentContext, result: Any):
        """Called when task completes successfully"""
        print(f"Task {context.message_id} completed")

    async def on_task_error(self, context: AgentContext, error: Exception):
        """Called when task encounters an error"""
        print(f"Task {context.message_id} error: {error}")
```

### Using Plugins

Method 1: Pass via plugin_list parameter

```python
from by_framework import run_worker
from my_cool_plugin import WeatherPlugin

run_worker(
    worker_class=MyAssistant,
    worker_id="worker-01",
    plugin_list=[WeatherPlugin()]
)
```

Method 2: Configure via plugin_configurator callback

```python
from by_framework import run_worker
from my_cool_plugin import WeatherPlugin

def configure_plugins(registry):
    registry.register_bundle(WeatherPlugin())

run_worker(
    worker_class=MyAssistant,
    worker_id="worker-01",
    plugin_configurator=configure_plugins
)
```

Method 3: Load plugin modules from a directory

```python
run_worker(
    worker_class=MyAssistant,
    plugin_dir="./my_plugins"  # Scans .py files in this directory once at startup
)
```

---

## 📡 Sending Tasks

### Using ByaiGatewayClient

`ByaiGatewayClient` is a wrapper around `GatewayClient` that uses the shared Byai codec for message serialization by default and supports higher-level message protocols.

```python
import asyncio

from by_framework import ByaiGatewayClient, WorkerRegistry, close_redis, init_redis


async def main():
    redis = init_redis(host="localhost", port=6379)
    registry = WorkerRegistry(redis_client=redis)
    client = ByaiGatewayClient(redis_client=redis, registry=registry)

    response = await client.send_message(
        target_agent_type="weather_agent",
        session_id="session_123",
        user_code="user_123",
        content="Query today's weather in Beijing",
    )

    if response.success:
        print(f"Task sent, message ID: {response.message_id}")
    else:
        print(f"Send failed: {response.error}")

    await close_redis()


asyncio.run(main())
```

### Sending Path Explanation

`GatewayClient.send_message(...)` has two modes:

- Default agent-type mode: writes to the agent-type stream based on `target_agent_type`, and verifies that an online worker exists when `require_online_worker=True`.
- Direct worker mode: when `target_worker_id` is provided, writes directly to the worker stream; suitable for debugging or direct control.

This means:

- `response.target_worker_id` may be empty in agent-type mode, because the actual worker is only determined when a consumer of that agent type reads the message.
- For canceling a task that has already started executing, the execution registry fills in the `worker_id` once the worker actually begins processing.

---

## 🧪 Examples

### Example 1: Basic Streaming Output

```python
import asyncio

from by_framework import AgentContext, GatewayWorker


class StreamingAgent(GatewayWorker):
    def get_agent_types(self):
        return ["streaming_demo"]

    async def process_command(self, command, context: AgentContext):
        text = "This is a sample text for streaming output."

        for char in text:
            await context.emit_chunk(char)
            await asyncio.sleep(0.05)

        return {"status": "done"}
```

### Example 2: Registering Plugin Capabilities

Tools, prompts, and skills are registered through the plugin mechanism and exposed through `AgentConfig`.

```python
from by_framework import AgentConfig, AgentContext, GatewayWorker, Plugin, PluginBuildContext, PluginManifest


class CalculatorPlugin(Plugin):
    def __init__(self):
        super().__init__(PluginManifest(plugin_id="calculator"))

    async def register_agent_configs(
        self, build_context: PluginBuildContext
    ) -> list[AgentConfig]:
        return [
            AgentConfig(
                agent_id="tool_demo",
                tools={"calculate": self.calculate},
            )
        ]

    async def calculate(self, a: float, b: float, op: str) -> float:
        if op == "+":
            return a + b
        if op == "-":
            return a - b
        if op == "*":
            return a * b
        if op == "/":
            return a / b if b != 0 else 0
        return 0


class ToolAgent(GatewayWorker):
    def get_agent_types(self):
        return ["tool_demo"]

    async def process_command(self, command, context: AgentContext):
        config = context.get_agent_config("tool_demo")
        await context.emit_chunk(
            f"Registered tools: {list(config.tools.keys()) if config else []}"
        )
        return {"status": "success"}
```

## 🧩 Advanced Capabilities

### User-in-the-Loop Flows

Workers can suspend execution and wait for user input through `context.ask_user(...)`. The follow-up reply is delivered back to the worker as a `ResumeCommand`.

```python
from by_framework import AgentContext, AskUserEvent, GatewayWorker, ResumeCommand


class ApprovalAgent(GatewayWorker):
    def get_agent_types(self):
        return ["approval_agent"]

    async def process_command(self, command, context: AgentContext):
        if isinstance(command, ResumeCommand):
            await context.emit_chunk(f"User replied: {command.content}")
            return {"status": "completed"}

        return await context.ask_user(
            AskUserEvent(prompt="Please confirm the deployment window.")
        )
```

### Scatter-Gather Dispatch

`dispatch_group(...)` can enqueue multiple subtasks under one task group, and `collect_group_results(...)` can gather the callback payloads when they finish.

```python
tasks = [
    {"target_agent_type": "researcher", "content": "Collect references"},
    {"target_agent_type": "writer", "content": "Draft the summary"},
]

group = await context.dispatch_group(tasks, wait_for_reply=True)
results = await context.collect_group_results(group["task_group_id"])
```

### Byai Typed Worker Layer

If your business payloads use BaiYing message objects, `ByaiWorker` and `ByaiAgentContext` provide typed content decoding/encoding on top of the generic worker runtime.

### Service Discovery Utilities

The repository also ships Redis-backed service discovery and a discovery-aware HTTP client:

- `ServiceRegistry` for service registration and heartbeat
- `DiscoveryClient` for cached service lookup and load balancing
- `DiscoveryHttpClient` for node-switching HTTP retries on discovered instances

These utilities live in [src/by_framework/core/discovery.py](src/by_framework/core/discovery.py) and [src/by_framework/util/discovery_http_client.py](src/by_framework/util/discovery_http_client.py).

---

## 🛠️ Configuration Reference

### run_worker Function Parameters

The `run_worker` function supports rich configuration options:

| Parameter | Type | Description | Default |
| :--- | :--- | :--- | :--- |
| `worker_class` | `Type[GatewayWorker]` | **Required**. Business Worker class. | - |
| `worker_id` | `str` | Unique identifier for Worker instance. | `"worker-1"` |
| `redis_host` | `str` | Redis server address. | `"localhost"` |
| `redis_port` | `int` | Redis port. | `6379` |
| `redis_db` | `int` | Redis database number. | `0` |
| `redis_password` | `str` | Redis password (optional). | `None` |
| `redis_username` | `str` | Redis username (optional). | `None` |
| `workspace_dir` | `str` | Local working directory for task execution. | `"/tmp/gateway-workspace"` |
| `consumer_group` | `str` | Redis consumer group name. | `"agent_engines"` |
| `max_concurrency` | `int` | Maximum concurrent tasks per Worker. | `50` |
| `fetch_count` | `int` | Number of messages to batch fetch from Redis each time. | `10` |
| `redis_max_connections` | `int` | Maximum Redis connections. | `max_concurrency + 10` |
| `plugin_list` | `List[Plugin]` | Explicitly passed plugin list. | `None` |
| `plugin_configurator` | `Callable` | Plugin configuration callback function. | `None` |
| `plugin_hook_timeout_seconds` | `float` | Default timeout for plugin hooks. | `None` |
| `plugin_log_hook_stats_on_shutdown` | `bool` | Whether to log plugin hook stats on shutdown. | `True` |
| `plugin_dir` | `str` | Directory scanned once at startup for `.py` plugin modules. | `None` |

### Environment Variables

| Environment Variable | Description | Default |
|---------|------|-------|
| `BYAI_WORKER_CONCURRENCY` | Maximum concurrency | `50` |
| `BYAI_WORKER_FETCH_COUNT` | Batch fetch count | `10` |
| `BYAI_REDIS_MAX_CONNECTIONS` | Redis max connections | `max_concurrency + 10` |

---

## 📚 API Reference

### GatewayWorker

```python
class GatewayWorker:
    def get_agent_types(self) -> List[str]:
        """Return list of Agent types this Worker can handle"""
        pass

    async def process_command(self, command, context: AgentContext) -> Any:
        """Process command and return result"""
        pass
```

### AgentContext

```python
class AgentContext:
    session_id: str
    trace_id: str
    current_agent_id: str
    message_id: str
    parent_message_id: str

    async def emit_chunk(self, event: Union[StreamChunkEvent, str], event_type: Optional[str] = None):
        """Send text chunk or streaming event"""

    async def emit_state(self, event: Union[StateChangeEvent, str], event_type: Optional[str] = None):
        """Send state update"""

    async def emit_artifact(self, event: Union[ArtifactEvent, str], event_type: Optional[str] = None):
        """Send artifact/attachment"""

    async def ask_user(self, event: Union[AskUserEvent, str]) -> dict:
        """Send waiting for input request to user"""

    async def call_agent(self, target_agent_type: str, content: object, **kwargs) -> dict:
        """Call other Agent"""

    async def dispatch_group(self, tasks: list[dict], **kwargs) -> dict:
        """Dispatch task group"""

    async def get_active_workers(self) -> Dict[str, Any]:
        """Get all active workers in cluster"""
```

### GatewayClient / ByaiGatewayClient

```python
class GatewayClient:
    async def send_message(
        self,
        target_agent_type: str,
        session_id: str,
        content: Any,
        user_code: str = "",
        action_type: str = "ASK_AGENT",
        metadata: Optional[dict] = None,
        target_worker_id: Optional[str] = None,
        require_online_worker: bool = True,
    ) -> SendMessageResponse:
        """Send message, return response object"""

    async def cancel_task(self, message_id: str, session_id: str, reason: str = "") -> CancelTaskResponse:
        """Cancel specified task"""
```

## 🚀 Deployment Guide

### Single Machine Deployment

1. **Prepare Environment**

```bash
# Install dependencies
cd by-framework-python
uv sync
```

2. **Start Redis**

```bash
docker run -d --name gateway-redis \
  -p 6379:6379 \
  --restart unless-stopped \
  redis:7-alpine
```

3. **Start Worker**

```bash
uv run python -m by_framework \
  --worker-class my_agent.MyAgent \
  --worker-id worker-01 \
  --redis-host localhost
```

### Multi-Worker Deployment

To scale horizontally, run multiple worker processes with different `worker_id` values while sharing the same Redis instance and `target_agent_type` streams.

### Production Environment Recommendations

1. **Use Connection Pool**

```python
run_worker(
    worker_class=MyAgent,
    redis_max_connections=50
)
```

2. **Configure Monitoring**

```python
import logging

from by_framework.common.logger import setup_logging

setup_logging(level=logging.INFO, use_json=True)
```

### FAQ

**Q: How to ensure tasks are not lost?**

A: Redis Streams provides a persistence mechanism. Workers use `XACK` to acknowledge that message processing is complete; unacknowledged messages are redelivered.

**Q: How to implement Worker load balancing?**

A: Multiple Workers connect to the same Redis Stream, and Redis automatically distributes load among the consumers in the consumer group.
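The delivery and redelivery guarantees described in these answers can be modeled in miniature: each message in a consumer group is delivered to exactly one consumer and stays pending until acknowledged. This in-memory toy model is only for intuition; real workers rely on Redis `XREADGROUP`/`XACK` semantics:

```python
from collections import deque


class MiniConsumerGroup:
    """Toy model of a Redis consumer group: one delivery per message;
    unacked messages can be reclaimed by another consumer."""

    def __init__(self, messages):
        self.queue = deque(messages)
        self.pending: dict[str, str] = {}  # message -> consumer holding it

    def read(self, consumer: str):
        """Deliver the next message to one consumer (like XREADGROUP)."""
        if not self.queue:
            return None
        msg = self.queue.popleft()
        self.pending[msg] = consumer
        return msg

    def ack(self, msg: str):
        """Acknowledge completion (like XACK): drop from pending."""
        self.pending.pop(msg, None)

    def reclaim(self, msg: str, new_consumer: str):
        """Move a stalled consumer's pending message (like XAUTOCLAIM)."""
        if msg in self.pending:
            self.pending[msg] = new_consumer


group = MiniConsumerGroup(["task-1", "task-2"])
a = group.read("worker-a")    # task-1 goes to worker-a
b = group.read("worker-b")    # task-2 goes to worker-b
group.ack(b)                  # worker-b finishes and acknowledges
group.reclaim(a, "worker-b")  # worker-a stalls; task-1 is reclaimed
print(group.pending)          # → {'task-1': 'worker-b'}
```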

---

## 🗺️ Roadmap

- [ ] **Observability Dashboard**: Integrated UI for monitoring worker health and task streams.
- [ ] **Advanced Sandbox**: WASM-based execution environment for enhanced isolation.
- [ ] **Long-term Memory**: Native support for vector-database backed session memory.
- [ ] **Native LangGraph Integration**: Enhanced adapter for complex stateful multi-agent flows.

## 🤝 Contributing

Issues and Pull Requests are welcome! Please check our [CONTRIBUTING.md](CONTRIBUTING.md) for details.

---

## 📄 License

This project is licensed under Apache 2.0 License - see [LICENSE](LICENSE) file for details.

---

Maintained by **byai team**.

Questions or suggestions? Feel free to contact us!
