Metadata-Version: 2.4
Name: fastapi-eventbus
Version: 0.1.0
Summary: High-availability event-driven extension for FastAPI
Project-URL: Homepage, https://github.com/xjaqil/fastapi-eventbus
Project-URL: Documentation, https://github.com/xjaqil/fastapi-eventbus#readme
Project-URL: Repository, https://github.com/xjaqil/fastapi-eventbus
Project-URL: Issues, https://github.com/xjaqil/fastapi-eventbus/issues
Author: xjaqil
License-Expression: MIT
License-File: LICENSE
Keywords: async,event-bus,event-driven,fastapi,pubsub
Classifier: Development Status :: 3 - Alpha
Classifier: Framework :: FastAPI
Classifier: Intended Audience :: Developers
Classifier: License :: OSI Approved :: MIT License
Classifier: Programming Language :: Python :: 3
Classifier: Programming Language :: Python :: 3.10
Classifier: Programming Language :: Python :: 3.11
Classifier: Programming Language :: Python :: 3.12
Classifier: Programming Language :: Python :: 3.13
Classifier: Typing :: Typed
Requires-Python: >=3.10
Requires-Dist: fastapi>=0.100.0
Requires-Dist: pydantic-settings>=2.0
Requires-Dist: pydantic>=2.0
Provides-Extra: all
Requires-Dist: aiokafka>=0.9; extra == 'all'
Requires-Dist: redis[hiredis]>=5.0; extra == 'all'
Provides-Extra: dev
Requires-Dist: httpx>=0.27; extra == 'dev'
Requires-Dist: mypy>=1.10; extra == 'dev'
Requires-Dist: pytest-asyncio>=0.23; extra == 'dev'
Requires-Dist: pytest-cov>=5.0; extra == 'dev'
Requires-Dist: pytest>=8.0; extra == 'dev'
Requires-Dist: ruff>=0.4; extra == 'dev'
Provides-Extra: kafka
Requires-Dist: aiokafka>=0.9; extra == 'kafka'
Provides-Extra: redis
Requires-Dist: redis[hiredis]>=5.0; extra == 'redis'
Description-Content-Type: text/markdown

# fastapi-eventbus

高可用事件驱动 pub/sub 扩展，专为 FastAPI 设计。

[![Python](https://img.shields.io/badge/python-3.10%2B-blue)](https://python.org)
[![License](https://img.shields.io/badge/license-MIT-green)](LICENSE)
[![Status](https://img.shields.io/badge/status-alpha-orange)]()

## 特性

- 可插拔后端：内存同步、Redis、Kafka
- 装饰器注册事件处理器（`@on_event`）
- FastAPI 依赖注入集成
- 生命周期感知的启动/关闭
- 指数退避重试 + 死信队列（DLQ）
- 请求级 Correlation ID 中间件
- 后端健康检查
- 可插拔 Metrics 采集（支持对接 Prometheus/StatsD/Datadog）
- 可插拔 Tracing（支持对接 OpenTelemetry/Jaeger）
- Kafka Consumer Rebalance 安全处理
- 完整类型标注（PEP 561）

## 安装

```bash
pip install fastapi-eventbus

# Redis 支持
pip install fastapi-eventbus[redis]

# Kafka 支持
pip install fastapi-eventbus[kafka]

# 全部安装
pip install fastapi-eventbus[all]

# 开发环境
pip install -e ".[all,dev]"
```

## 快速开始

```python
from fastapi import FastAPI, Depends
from fastapi_eventbus import (
    BaseEvent, EventBus, EventBusMiddleware,
    create_lifespan, get_event_bus, on_event,
)


# 1. 定义事件
class UserCreated(BaseEvent):
    topic: str = "user.created"
    user_id: int
    username: str


# 2. 注册处理器
@on_event("user.created")
async def handle_user_created(event: BaseEvent) -> None:
    print(f"新用户: {event}")


# 3. 创建应用
bus = EventBus()
app = FastAPI(lifespan=create_lifespan(bus))
app.add_middleware(EventBusMiddleware)


# 4. 发布事件
@app.post("/users")
async def create_user(bus: EventBus = Depends(get_event_bus)):
    event = UserCreated(user_id=1, username="alice")
    await bus.publish(event)
    return {"status": "ok"}
```

## 配置

通过环境变量配置（前缀 `EVENTBUS_`）：

| 变量 | 默认值 | 说明 |
|---|---|---|
| `EVENTBUS_BACKEND` | `sync` | 后端类型：`sync` / `redis` / `kafka` |
| `EVENTBUS_MAX_RETRIES` | `3` | 最大重试次数 |
| `EVENTBUS_RETRY_DELAY` | `1.0` | 初始重试延迟（秒） |
| `EVENTBUS_RETRY_BACKOFF_FACTOR` | `2.0` | 退避倍数 |
| `EVENTBUS_RETRY_MAX_DELAY` | `60.0` | 最大重试延迟（秒） |
| `EVENTBUS_DLQ_ENABLED` | `false` | 启用死信队列 |
| `EVENTBUS_DLQ_TOPIC_SUFFIX` | `.dlq` | DLQ topic 后缀 |
| `EVENTBUS_REDIS_URL` | `redis://localhost:6379/0` | Redis 连接地址 |
| `EVENTBUS_REDIS_MAX_CONNECTIONS` | `10` | Redis 最大连接数 |
| `EVENTBUS_KAFKA_BOOTSTRAP_SERVERS` | `localhost:9092` | Kafka broker 地址 |
| `EVENTBUS_KAFKA_GROUP_ID` | `fastapi-eventbus` | Kafka 消费组 ID |

也可以通过代码直接配置：

```python
from fastapi_eventbus.config import EventBusSettings

settings = EventBusSettings(
    backend="redis",
    redis_url="redis://my-redis:6379/1",
    max_retries=5,
    dlq_enabled=True,
)
bus = EventBus(settings=settings)
```

## 后端

### 内存同步后端（默认）

零依赖，适合开发和测试：

```python
bus = EventBus()  # 默认使用 sync 后端
```

### Redis 后端

```bash
pip install fastapi-eventbus[redis]
```

```python
# 通过环境变量
# EVENTBUS_BACKEND=redis
# EVENTBUS_REDIS_URL=redis://localhost:6379/0

# 或代码配置
from fastapi_eventbus.backends.redis import RedisBackend

backend = RedisBackend(url="redis://localhost:6379/0")
bus = EventBus(backend=backend)
```

### Kafka 后端

```bash
pip install fastapi-eventbus[kafka]
```

```python
from fastapi_eventbus.backends.kafka import KafkaBackend

backend = KafkaBackend(
    bootstrap_servers="localhost:9092",
    group_id="my-service",
    auto_offset_reset="earliest",
    session_timeout_ms=30000,
    enable_auto_commit=False,  # 推荐：手动提交 offset
)
bus = EventBus(backend=backend)
```

Kafka 后端特性：
- Rebalance 安全：rebalance 期间自动暂停事件处理，完成后恢复
- 手动 offset 提交：每条消息处理成功后才提交，保证 at-least-once 语义
- 异常自动重连：consumer 异常时 5 秒退避后重连
- 批量发布：`publish_many()` 使用 batch send + flush

## 生命周期管理

### 简单模式

```python
bus = EventBus()
app = FastAPI(lifespan=create_lifespan(bus))
```

### 组合模式

与你自己的 lifespan 逻辑组合：

```python
from contextlib import asynccontextmanager
from fastapi_eventbus import EventBus, eventbus_lifespan

bus = EventBus()

@asynccontextmanager
async def lifespan(app):
    # 你的启动逻辑
    async with eventbus_lifespan(bus, app):
        yield
    # 你的关闭逻辑

app = FastAPI(lifespan=lifespan)
```

## 事件处理器

### 装饰器注册

```python
from fastapi_eventbus import on_event, BaseEvent

@on_event("order.created")
async def handle_order(event: BaseEvent) -> None:
    print(f"订单创建: {event.event_id}")

# 同步处理器也支持（会自动在线程池中执行）
@on_event("order.created")
def sync_handler(event: BaseEvent) -> None:
    print("同步处理")
```

### 运行时动态注册

```python
async def my_handler(event: BaseEvent) -> None:
    pass

await bus.registry.register("some.topic", my_handler)
await bus.subscribe("some.topic")
```

## 重试与死信队列

```python
from fastapi_eventbus import EventBus, RetryPolicy

policy = RetryPolicy(
    max_retries=5,
    delay=0.5,
    backoff_factor=2.0,
    max_delay=30.0,
)

bus = EventBus(
    retry_policy=policy,
    settings=EventBusSettings(dlq_enabled=True),
)

# 查看 DLQ 中的失败事件
failed = bus.dlq.peek(limit=20)
for f in failed:
    print(f"事件 {f.event.event_id} 失败 {f.attempts} 次: {f.error}")
```

## Metrics 采集

内置可插拔的 metrics 采集协议，零开销默认关闭。

### 内存采集器（测试/调试）

```python
from fastapi_eventbus import EventBus, InMemoryMetricsCollector

metrics = InMemoryMetricsCollector()
bus = EventBus(metrics=metrics)

# 使用后查看指标
print(metrics.snapshot())
# {'published': {'user.created': 10}, 'dispatched': {'user.created': 10}, ...}
```

### 自定义采集器（对接 Prometheus 等）

实现 `MetricsCollector` 协议即可：

```python
from fastapi_eventbus import MetricsCollector

class PrometheusCollector:
    def __init__(self):
        from prometheus_client import Counter, Histogram
        self.published = Counter("eventbus_published_total", "Events published", ["topic"])
        self.dispatched = Counter("eventbus_dispatched_total", "Events dispatched", ["topic"])
        self.errors = Counter("eventbus_errors_total", "Dispatch errors", ["topic"])
        self.retries = Counter("eventbus_retries_total", "Retries", ["topic"])
        self.dlq = Counter("eventbus_dlq_total", "DLQ events", ["topic"])
        self.pub_duration = Histogram("eventbus_publish_seconds", "Publish duration", ["topic"])
        self.disp_duration = Histogram("eventbus_dispatch_seconds", "Dispatch duration", ["topic"])

    def inc_published(self, topic: str) -> None:
        self.published.labels(topic=topic).inc()

    def inc_dispatched(self, topic: str) -> None:
        self.dispatched.labels(topic=topic).inc()

    def inc_dispatch_error(self, topic: str) -> None:
        self.errors.labels(topic=topic).inc()

    def inc_retry(self, topic: str, attempt: int) -> None:
        self.retries.labels(topic=topic).inc()

    def inc_dlq(self, topic: str) -> None:
        self.dlq.labels(topic=topic).inc()

    def observe_publish_duration(self, topic: str, duration: float) -> None:
        self.pub_duration.labels(topic=topic).observe(duration)

    def observe_dispatch_duration(self, topic: str, duration: float) -> None:
        self.disp_duration.labels(topic=topic).observe(duration)

bus = EventBus(metrics=PrometheusCollector())
```

### 采集的指标

| 指标 | 说明 |
|---|---|
| `inc_published` | 事件发布计数（按 topic） |
| `inc_dispatched` | 事件成功分发计数 |
| `inc_dispatch_error` | 分发错误计数 |
| `inc_retry` | 重试计数（含 attempt 序号） |
| `inc_dlq` | 进入死信队列计数 |
| `observe_publish_duration` | 发布耗时 |
| `observe_dispatch_duration` | 分发耗时 |

## Tracing 追踪

内置轻量级 span 追踪，可对接 OpenTelemetry/Jaeger/Zipkin。

### LoggingTracer（开发调试）

```python
from fastapi_eventbus import EventBus, LoggingTracer

tracer = LoggingTracer()
bus = EventBus(tracer=tracer)

# 每次 publish/dispatch 都会自动创建 span 并输出日志：
# SPAN START publish [a1b2c3d4] topic=user.created event=xxx
# SPAN END publish [a1b2c3d4] status=ok duration=1.23ms

# 查看所有 span
for span in tracer.spans:
    print(span.to_dict())
```

### 自定义 Tracer（对接 OpenTelemetry）

```python
from fastapi_eventbus import EventTracer, EventSpan
from opentelemetry import trace

otel_tracer = trace.get_tracer("fastapi-eventbus")

class OTelTracer:
    def start_span(self, operation, topic="", event_id="", parent_span=None):
        otel_span = otel_tracer.start_span(
            operation,
            attributes={"topic": topic, "event_id": event_id},
        )
        span = EventSpan(operation=operation, topic=topic, event_id=event_id)
        span.attributes["otel_span"] = str(id(otel_span))
        self._otel_spans = getattr(self, "_otel_spans", {})
        self._otel_spans[span.span_id] = otel_span
        return span

    def finish_span(self, span):
        otel_span = getattr(self, "_otel_spans", {}).pop(span.span_id, None)
        if otel_span:
            if span.status.value == "error":
                otel_span.set_status(trace.StatusCode.ERROR, span.error_message)
            otel_span.end()

bus = EventBus(tracer=OTelTracer())
```

### Span 数据结构

每个 span 包含：

```python
{
    "span_id": "a1b2c3d4e5f6g7h8",
    "trace_id": "correlation-id-from-middleware",
    "parent_span_id": "",
    "operation": "publish",       # publish | dispatch
    "topic": "user.created",
    "event_id": "abc123",
    "status": "ok",               # ok | error
    "duration_ms": 1.23,
    "attributes": {},
    "error_message": "",
}
```

## 中间件

```python
from fastapi_eventbus import EventBusMiddleware, get_correlation_id

app.add_middleware(EventBusMiddleware)

@app.get("/test")
async def test():
    cid = get_correlation_id()  # 当前请求的 correlation ID
    return {"correlation_id": cid}
```

- 自动为每个请求生成 Correlation ID（或从 `X-Correlation-ID` 请求头读取）
- 响应头自动附带 `X-Correlation-ID`
- Tracing 的 `trace_id` 自动关联 Correlation ID

## 健康检查

```python
from fastapi import Depends
from fastapi_eventbus import EventBus, get_event_bus

@app.get("/health")
async def health(bus: EventBus = Depends(get_event_bus)):
    results = await bus.health.check()
    healthy = all(r.status.value == "healthy" for r in results)
    return {
        "status": "healthy" if healthy else "unhealthy",
        "backends": [
            {"name": r.backend, "status": r.status.value, "detail": r.detail}
            for r in results
        ],
    }
```

## 开发

```bash
# 克隆并安装
git clone https://github.com/xjaqil/fastapi-eventbus.git
cd fastapi-eventbus
python3 -m venv .venv
source .venv/bin/activate
pip install -e ".[all,dev]"

# 运行测试
pytest

# 代码检查
ruff check .
ruff format .

# 类型检查
mypy src/
```

## 项目结构

```
src/fastapi_eventbus/
├── __init__.py          # 公共 API
├── typing.py            # 类型定义
├── py.typed             # PEP 561 标记
├── core/                # 核心抽象（BaseEvent, Handler, Publisher, Subscriber, 异常）
├── backends/            # 后端实现（sync, redis, kafka）
├── middleware/          # 请求级事件追踪中间件
├── ext/                 # FastAPI 集成（EventBus, 依赖注入, 生命周期）
├── registry/            # Handler 注册表 + @on_event 装饰器
├── retry/               # 重试策略 + 死信队列
├── config/              # Pydantic Settings 配置
├── metrics/             # 可插拔 Metrics 采集
├── tracing/             # 可插拔 Tracing 追踪
└── health/              # 后端健康检查
```

## License

MIT
