Metadata-Version: 2.4
Name: agent-queue
Version: 1.0.2
Summary: A private task queue system for agents using Redis + Celery with gRPC interface (Client Package)
Author-email: NightSailingShip <149816081+NightSailingShip@users.noreply.github.com>
License-Expression: MIT
Project-URL: Homepage, https://github.com/NightSailingShip/queues
Project-URL: Documentation, https://github.com/NightSailingShip/queues#readme
Project-URL: Repository, https://github.com/NightSailingShip/queues
Project-URL: Issues, https://github.com/NightSailingShip/queues/issues
Keywords: task-queue,celery,redis,grpc,agent,distributed,client
Classifier: Development Status :: 4 - Beta
Classifier: Intended Audience :: Developers
Classifier: Programming Language :: Python :: 3
Classifier: Programming Language :: Python :: 3.12
Classifier: Topic :: Software Development :: Libraries :: Python Modules
Classifier: Topic :: System :: Distributed Computing
Requires-Python: >=3.12
Description-Content-Type: text/markdown
License-File: LICENSE
Requires-Dist: grpcio>=1.60.0
Requires-Dist: grpcio-tools>=1.60.0
Requires-Dist: protobuf>=4.25.0
Requires-Dist: pyyaml>=6.0.0
Provides-Extra: dev
Requires-Dist: pytest>=7.4.0; extra == "dev"
Requires-Dist: build>=1.0.0; extra == "dev"
Requires-Dist: twine>=4.0.0; extra == "dev"
Dynamic: license-file

# Agent Queue 客户端使用指南

Agent Queue 是一个基于 Redis + Celery 的私有化任务队列系统，通过 gRPC 接口提供服务。本文档重点介绍客户端接口的使用方法。

## 目录

- [快速开始](#快速开始)
- [安装](#安装)
- [连接服务](#连接服务)
- [API 接口](#api-接口)
  - [队列管理接口](#队列管理接口)
  - [任务操作接口](#任务操作接口)
  - [任务查询接口](#任务查询接口)
- [数据类型](#数据类型)
- [完整示例](#完整示例)

## 快速开始

```python
import grpc
import json
from client import (
    QueueServiceStub,
    CreateQueueRequest, SubmitTaskRequest, GetTaskRequest,
    UpdateTaskStatusRequest, DATA_PROCESSING, COMPLETED
)

# 1. 连接到 gRPC 服务器
channel = grpc.insecure_channel("localhost:50051")
stub = QueueServiceStub(channel)

# 2. 创建队列
agent_id = "my_agent_001"
create_response = stub.CreateQueue(CreateQueueRequest(agent_id=agent_id))

# 3. 提交任务
submit_response = stub.SubmitTask(SubmitTaskRequest(
    agent_id=agent_id,
    type=DATA_PROCESSING,
    payload=json.dumps({"data": "test"})
))
task_id = submit_response.task_id

# 4. 获取任务
get_response = stub.GetTask(GetTaskRequest(agent_id=agent_id, timeout=5))
task = get_response.task

# 5. 更新任务状态
stub.UpdateTaskStatus(UpdateTaskStatusRequest(
    task_id=task_id,
    agent_id=agent_id,
    status=COMPLETED,
    result=json.dumps({"result": "success"})
))
```

## 安装

### 方式一：从源码安装

```bash
# 克隆仓库
git clone <repository-url>
cd queues

# 安装依赖
pip install -r requirements.txt

# 生成 gRPC 代码
python scripts/generate_grpc.py

# 安装包
pip install -e .
```

### 方式二：从 PyPI 安装（推荐）

```bash
# 使用 pip 安装（会自动安装依赖）
pip install agent-queue

# 或使用 Poetry 安装（会自动安装依赖）
poetry add agent-queue
```

**说明**：
- ✅ 两种方式都会自动安装所有必需的依赖
- ✅ Poetry 完全支持，依赖会自动添加到 `pyproject.toml` 和 `poetry.lock`
- ✅ 依赖包括：`grpcio`, `grpcio-tools`, `protobuf`, `pyyaml`

### 依赖要求

- Python >= 3.12
- grpcio >= 1.60.0
- protobuf >= 4.25.0

## 连接服务

```python
import grpc
from client import QueueServiceStub

# 创建 gRPC 通道
channel = grpc.insecure_channel("localhost:50051")  # 默认端口 50051

# 创建服务客户端
stub = QueueServiceStub(channel)

# 使用环境变量配置（推荐）
import os
grpc_host = os.getenv("GRPC_HOST", "localhost")
grpc_port = int(os.getenv("GRPC_PORT", "50051"))
channel = grpc.insecure_channel(f"{grpc_host}:{grpc_port}")
stub = QueueServiceStub(channel)
```

## API 接口

所有接口都通过 `agent_id` 路由到对应的私有化任务队列。

### 队列管理接口

#### 1. CreateQueue - 创建队列

创建指定 agent 的私有任务队列。

**请求参数：**
- `agent_id` (string): Agent ID

**响应字段：**
- `success` (bool): 是否成功
- `agent_id` (string): Agent ID
- `message` (string): 消息

**使用示例：**

```python
from client import CreateQueueRequest

request = CreateQueueRequest(agent_id="agent_001")
response = stub.CreateQueue(request)

if response.success:
    print(f"队列创建成功: {response.agent_id}")
else:
    print(f"创建失败: {response.message}")
```

#### 2. QueueExists - 检查队列是否存在

检查指定 agent 的队列是否存在。

**请求参数：**
- `agent_id` (string): Agent ID

**响应字段：**
- `exists` (bool): 队列是否存在
- `message` (string): 消息

**使用示例：**

```python
from client import QueueExistsRequest

request = QueueExistsRequest(agent_id="agent_001")
response = stub.QueueExists(request)

if response.exists:
    print("队列存在")
else:
    print("队列不存在")
```

#### 3. GetQueueInfo - 获取队列信息

获取队列的统计信息。

**请求参数：**
- `agent_id` (string): Agent ID

**响应字段：**
- `success` (bool): 是否成功
- `agent_id` (string): Agent ID
- `pending_count` (int32): 待处理任务数
- `processing_count` (int32): 处理中任务数
- `completed_count` (int32): 已完成任务数
- `failed_count` (int32): 失败任务数
- `total_count` (int32): 总任务数
- `message` (string): 消息

**使用示例：**

```python
from client import GetQueueInfoRequest

request = GetQueueInfoRequest(agent_id="agent_001")
response = stub.GetQueueInfo(request)

if response.success:
    print(f"待处理: {response.pending_count}")
    print(f"处理中: {response.processing_count}")
    print(f"已完成: {response.completed_count}")
    print(f"失败: {response.failed_count}")
    print(f"总计: {response.total_count}")
```

#### 4. ClearQueue - 清空队列

清空队列中的任务，可按状态过滤。

**请求参数：**
- `agent_id` (string): Agent ID
- `status` (TaskStatus, 可选): 按状态清空，0 表示清空所有状态

**响应字段：**
- `success` (bool): 是否成功
- `cleared_count` (int32): 清空的任务数量
- `message` (string): 消息

**使用示例：**

```python
from client import ClearQueueRequest, FAILED

# 清空所有任务
request = ClearQueueRequest(agent_id="agent_001", status=0)
response = stub.ClearQueue(request)

# 只清空失败的任务
request = ClearQueueRequest(agent_id="agent_001", status=FAILED)
response = stub.ClearQueue(request)

if response.success:
    print(f"已清空 {response.cleared_count} 个任务")
```

### 任务操作接口

#### 5. SubmitTask - 提交任务

提交单个任务到队列。

**请求参数：**
- `agent_id` (string): Agent ID
- `type` (TaskType): 任务类型（枚举）
- `task_type` (string): 自定义任务类型字符串（当 `type = CUSTOM` 时使用）
- `payload` (string): 任务负载（JSON 字符串）

**响应字段：**
- `success` (bool): 是否成功
- `task_id` (string): 任务ID
- `message` (string): 消息

**使用示例：**

```python
import json
from client import SubmitTaskRequest, DATA_PROCESSING, CUSTOM

# 使用预定义类型
request = SubmitTaskRequest(
    agent_id="agent_001",
    type=DATA_PROCESSING,
    task_type="",  # 预定义类型不需要
    payload=json.dumps({"data": "test", "action": "process"})
)
response = stub.SubmitTask(request)

if response.success:
    print(f"任务已提交，ID: {response.task_id}")

# 使用自定义类型
request = SubmitTaskRequest(
    agent_id="agent_001",
    type=CUSTOM,
    task_type="my_custom_task",
    payload=json.dumps({"custom": "data"})
)
response = stub.SubmitTask(request)
```

#### 6. BatchSubmitTasks - 批量提交任务

批量提交多个任务到队列。

**请求参数：**
- `agent_id` (string): Agent ID
- `tasks` (repeated SubmitTaskItem): 任务列表
  - `type` (TaskType): 任务类型
  - `task_type` (string): 自定义任务类型字符串
  - `payload` (string): 任务负载（JSON 字符串）
  - `task_id` (string, 可选): 指定任务ID（不提供则自动生成）

**响应字段：**
- `success` (bool): 是否成功
- `task_ids` (repeated string): 提交成功的任务ID列表
- `success_count` (int32): 成功数量
- `failed_count` (int32): 失败数量
- `message` (string): 消息

**使用示例：**

```python
from client import BatchSubmitTasksRequest, SubmitTaskItem, DATA_PROCESSING

tasks = [
    SubmitTaskItem(
        type=DATA_PROCESSING,
        payload=json.dumps({"data": "task1"})
    ),
    SubmitTaskItem(
        type=DATA_PROCESSING,
        payload=json.dumps({"data": "task2"})
    )
]

request = BatchSubmitTasksRequest(
    agent_id="agent_001",
    tasks=tasks
)
response = stub.BatchSubmitTasks(request)

if response.success:
    print(f"成功提交 {response.success_count} 个任务")
    print(f"任务ID列表: {response.task_ids}")
```

#### 7. GetTask - 获取任务

从队列中获取一个任务（主动拉取模式）。

**请求参数：**
- `agent_id` (string): Agent ID
- `timeout` (int32): 超时时间（秒），0 表示不等待

**响应字段：**
- `success` (bool): 是否成功
- `task` (Task): 任务对象
- `message` (string): 消息

**使用示例：**

```python
from client import GetTaskRequest

# 不等待，立即返回
request = GetTaskRequest(agent_id="agent_001", timeout=0)
response = stub.GetTask(request)

# 等待最多 10 秒
request = GetTaskRequest(agent_id="agent_001", timeout=10)
response = stub.GetTask(request)

if response.success and response.task.task_id:
    task = response.task
    print(f"任务ID: {task.task_id}")
    print(f"任务类型: {task.task_type}")
    print(f"负载: {task.payload}")
    print(f"状态: {task.status}")
else:
    print("队列中没有任务")
```

#### 8. UpdateTaskStatus - 更新任务状态

更新任务的状态和结果。

**请求参数：**
- `task_id` (string): 任务ID
- `agent_id` (string): Agent ID
- `status` (TaskStatus): 新状态
- `result` (string, 可选): 任务结果（JSON 字符串）
- `error_message` (string, 可选): 错误信息

**响应字段：**
- `success` (bool): 是否成功
- `message` (string): 消息

**使用示例：**

```python
from client import UpdateTaskStatusRequest, COMPLETED, FAILED

# 标记任务为已完成
request = UpdateTaskStatusRequest(
    task_id="task_123",
    agent_id="agent_001",
    status=COMPLETED,
    result=json.dumps({"result": "处理完成", "output": "success"})
)
response = stub.UpdateTaskStatus(request)

# 标记任务为失败
request = UpdateTaskStatusRequest(
    task_id="task_123",
    agent_id="agent_001",
    status=FAILED,
    error_message="处理失败：数据格式错误"
)
response = stub.UpdateTaskStatus(request)
```

#### 9. QueryTask - 查询任务

根据任务ID查询任务信息。

**请求参数：**
- `task_id` (string): 任务ID
- `agent_id` (string): Agent ID

**响应字段：**
- `success` (bool): 是否成功
- `task` (Task): 任务对象
- `message` (string): 消息

**使用示例：**

```python
from client import QueryTaskRequest

request = QueryTaskRequest(
    task_id="task_123",
    agent_id="agent_001"
)
response = stub.QueryTask(request)

if response.success:
    task = response.task
    print(f"任务ID: {task.task_id}")
    print(f"状态: {task.status}")
    print(f"创建时间: {task.created_at}")
    print(f"结果: {task.result}")
```

#### 10. DeleteTask - 删除任务

删除指定任务。

**请求参数：**
- `task_id` (string): 任务ID
- `agent_id` (string): Agent ID

**响应字段：**
- `success` (bool): 是否成功
- `message` (string): 消息

**使用示例：**

```python
from client import DeleteTaskRequest

request = DeleteTaskRequest(
    task_id="task_123",
    agent_id="agent_001"
)
response = stub.DeleteTask(request)

if response.success:
    print("任务已删除")
```

#### 11. BatchDeleteTasks - 批量删除任务

批量删除多个任务。

**请求参数：**
- `agent_id` (string): Agent ID
- `task_ids` (repeated string): 任务ID列表
- `status` (TaskStatus, 可选): 按状态删除（0 表示不限制状态）

**响应字段：**
- `success` (bool): 是否成功
- `deleted_count` (int32): 删除的任务数量
- `message` (string): 消息

**使用示例：**

```python
from client import BatchDeleteTasksRequest

request = BatchDeleteTasksRequest(
    agent_id="agent_001",
    task_ids=["task_1", "task_2", "task_3"]
)
response = stub.BatchDeleteTasks(request)

if response.success:
    print(f"已删除 {response.deleted_count} 个任务")
```

#### 12. CancelTask - 取消任务

取消指定任务。

**请求参数：**
- `task_id` (string): 任务ID
- `agent_id` (string): Agent ID
- `reason` (string, 可选): 取消原因

**响应字段：**
- `success` (bool): 是否成功
- `message` (string): 消息

**使用示例：**

```python
from client import CancelTaskRequest

request = CancelTaskRequest(
    task_id="task_123",
    agent_id="agent_001",
    reason="用户取消"
)
response = stub.CancelTask(request)
```

#### 13. RetryTask - 重试任务

重试失败的任务（创建新任务）。

**请求参数：**
- `task_id` (string): 任务ID
- `agent_id` (string): Agent ID

**响应字段：**
- `success` (bool): 是否成功
- `new_task_id` (string): 新任务ID（如果创建了新任务）
- `message` (string): 消息

**使用示例：**

```python
from client import RetryTaskRequest

request = RetryTaskRequest(
    task_id="task_123",
    agent_id="agent_001"
)
response = stub.RetryTask(request)

if response.success:
    print(f"重试成功，新任务ID: {response.new_task_id}")
```

### 任务查询接口

#### 14. ListTasks - 列出任务

列出队列中的所有任务，支持分页和状态过滤。

**请求参数：**
- `agent_id` (string): Agent ID
- `status` (TaskStatus, 可选): 任务状态，0 表示所有状态
- `limit` (int32): 限制数量
- `offset` (int32): 偏移量

**响应字段：**
- `success` (bool): 是否成功
- `tasks` (repeated Task): 任务列表
- `total` (int32): 总数
- `message` (string): 消息

**使用示例：**

```python
from client import ListTasksRequest, PENDING

# 列出所有任务
request = ListTasksRequest(
    agent_id="agent_001",
    status=0,  # 所有状态
    limit=10,
    offset=0
)
response = stub.ListTasks(request)

# 只列出待处理的任务
request = ListTasksRequest(
    agent_id="agent_001",
    status=PENDING,
    limit=10,
    offset=0
)
response = stub.ListTasks(request)

if response.success:
    print(f"找到 {response.total} 个任务")
    for task in response.tasks:
        print(f"  - {task.task_id}: {task.status}")
```

#### 15. GetTaskStats - 获取任务统计信息

获取任务的统计信息。

**请求参数：**
- `agent_id` (string): Agent ID
- `status` (TaskStatus, 可选): 按状态统计（0 表示所有状态）

**响应字段：**
- `success` (bool): 是否成功
- `total_count` (int32): 总任务数
- `pending_count` (int32): 待处理任务数
- `processing_count` (int32): 处理中任务数
- `completed_count` (int32): 已完成任务数
- `failed_count` (int32): 失败任务数
- `message` (string): 消息

**使用示例：**

```python
from client import GetTaskStatsRequest

request = GetTaskStatsRequest(agent_id="agent_001")
response = stub.GetTaskStats(request)

if response.success:
    print(f"总任务数: {response.total_count}")
    print(f"待处理: {response.pending_count}")
    print(f"处理中: {response.processing_count}")
    print(f"已完成: {response.completed_count}")
    print(f"失败: {response.failed_count}")
```

## 数据类型

### TaskStatus - 任务状态枚举

```python
from client import TaskStatus, PENDING, PROCESSING, COMPLETED, FAILED

# 枚举值
PENDING = 0      # 待处理
PROCESSING = 1   # 处理中
COMPLETED = 2    # 已完成
FAILED = 3       # 失败
```

### TaskType - 任务类型枚举

```python
from client import (
    TaskType,
    UNKNOWN,           # 0 - 未知类型
    DATA_PROCESSING,   # 1 - 数据处理
    IMAGE_PROCESSING,  # 2 - 图像处理
    TEXT_ANALYSIS,     # 3 - 文本分析
    MODEL_INFERENCE,   # 4 - 模型推理
    DATA_EXTRACTION,   # 5 - 数据提取
    FILE_UPLOAD,       # 6 - 文件上传
    FILE_DOWNLOAD,     # 7 - 文件下载
    API_CALL,          # 8 - API 调用
    DATABASE_QUERY,    # 9 - 数据库查询
    CUSTOM             # 99 - 自定义类型
)
```

### Task - 任务对象

```python
from client import Task

task = Task(
    task_id="task_123",           # 任务ID
    agent_id="agent_001",         # Agent ID
    type=DATA_PROCESSING,         # 任务类型（枚举）
    task_type="",                 # 自定义任务类型字符串
    payload='{"key": "value"}',   # 任务负载（JSON字符串）
    status=PENDING,               # 任务状态
    created_at=1234567890,        # 创建时间（Unix时间戳）
    updated_at=1234567890,        # 更新时间（Unix时间戳）
    result='{"result": "ok"}',    # 任务结果（JSON字符串）
    error_message=""              # 错误信息
)
```

## 完整示例

查看示例代码获取完整的使用示例：
- `examples/client_grpc_example.py` - 所有接口的完整使用示例
- `examples/advanced_usage_example.py` - 高级功能示例（错误处理、幂等性、任务处理循环、监控等）

### 基本工作流程

```python
import grpc
import json
import os
from client import (
    QueueServiceStub,
    CreateQueueRequest, GetQueueInfoRequest,
    SubmitTaskRequest, GetTaskRequest,
    UpdateTaskStatusRequest, QueryTaskRequest,
    ListTasksRequest,
    DATA_PROCESSING, COMPLETED, PENDING
)

# 1. 连接服务
grpc_host = os.getenv("GRPC_HOST", "localhost")
grpc_port = int(os.getenv("GRPC_PORT", "50051"))
channel = grpc.insecure_channel(f"{grpc_host}:{grpc_port}")
stub = QueueServiceStub(channel)

agent_id = "my_agent_001"

# 2. 创建队列
create_response = stub.CreateQueue(CreateQueueRequest(agent_id=agent_id))
if not create_response.success:
    print(f"创建队列失败: {create_response.message}")
    exit(1)

# 3. 提交任务
submit_response = stub.SubmitTask(SubmitTaskRequest(
    agent_id=agent_id,
    type=DATA_PROCESSING,
    payload=json.dumps({"data": "test", "action": "process"})
))
task_id = submit_response.task_id
print(f"任务已提交: {task_id}")

# 4. 获取任务
get_response = stub.GetTask(GetTaskRequest(agent_id=agent_id, timeout=5))
if get_response.success and get_response.task.task_id:
    task = get_response.task
    print(f"获取到任务: {task.task_id}")
    
    # 5. 处理任务...
    # process_task(task.payload)
    
    # 6. 更新任务状态
    stub.UpdateTaskStatus(UpdateTaskStatusRequest(
        task_id=task.task_id,
        agent_id=agent_id,
        status=COMPLETED,
        result=json.dumps({"result": "处理完成"})
    ))

# 7. 查询任务
query_response = stub.QueryTask(QueryTaskRequest(
    task_id=task_id,
    agent_id=agent_id
))
if query_response.success:
    print(f"任务状态: {query_response.task.status}")

# 8. 列出所有任务
list_response = stub.ListTasks(ListTasksRequest(
    agent_id=agent_id,
    status=0,
    limit=10,
    offset=0
))
if list_response.success:
    print(f"队列中共有 {list_response.total} 个任务")
```

### 批量处理示例

```python
from client import BatchSubmitTasksRequest, SubmitTaskItem, DATA_PROCESSING

# 批量提交任务
tasks = [
    SubmitTaskItem(
        type=DATA_PROCESSING,
        payload=json.dumps({"id": i, "data": f"task_{i}"})
    )
    for i in range(10)
]

batch_response = stub.BatchSubmitTasks(BatchSubmitTasksRequest(
    agent_id=agent_id,
    tasks=tasks
))

print(f"成功提交 {batch_response.success_count} 个任务")
print(f"任务ID: {batch_response.task_ids}")
```

### 错误处理示例

```python
try:
    response = stub.SubmitTask(SubmitTaskRequest(
        agent_id="invalid_agent",
        type=DATA_PROCESSING,
        payload=json.dumps({"data": "test"})
    ))
    if not response.success:
        print(f"提交失败: {response.message}")
except grpc.RpcError as e:
    print(f"gRPC 错误: {e.code()} - {e.details()}")
except Exception as e:
    print(f"其他错误: {e}")
```

## 注意事项

1. **agent_id 路由**：所有接口都通过 `agent_id` 路由到对应的私有化任务队列，确保每个 agent 使用唯一的 ID。

2. **任务负载格式**：`payload` 和 `result` 字段必须是有效的 JSON 字符串。

3. **超时设置**：`GetTask` 接口的 `timeout` 参数设置为 0 表示不等待，立即返回；大于 0 表示等待指定秒数。

4. **状态过滤**：在 `ListTasks`、`ClearQueue` 等接口中，`status` 参数设置为 0 表示不限制状态（所有状态）。

5. **自定义任务类型**：使用 `CUSTOM` 类型时，必须在 `task_type` 字段中指定自定义类型字符串。

6. **连接管理**：建议复用 gRPC 通道和 stub，避免频繁创建连接。

## 更多信息

- 服务端文档：查看 `README_SERVER.md`
- 示例代码：
  - `examples/client_grpc_example.py` - 所有接口的完整使用示例
  - `examples/advanced_usage_example.py` - 高级功能示例
- Proto 定义：查看 `client/queue_service.proto`

### IDE 跳转到源代码定义

如果 IDE 无法跳转到 `PrivateAgentTasksQueue` 的定义，可以：

**方法1：直接从子模块导入（推荐）**
```python
# 直接从子模块导入，IDE 可以正确跳转
from client.queues.agent_queue import PrivateAgentTasksQueue
```

**方法2：使用 IDE 的跳转功能**
- VS Code: 按住 `Ctrl` 点击类名，或按 `F12`
- PyCharm: 按住 `Ctrl` 点击类名，或按 `Ctrl+B`

**方法3：手动查找文件**
- VS Code: 按 `Ctrl+P`，输入 `agent_queue.py`
- PyCharm: 按 `Ctrl+Shift+N`，输入 `agent_queue.py`

更多信息请参考：[IDE 跳转说明](docs/IDE_JUMP_TO_DEFINITION.md)

### 查看 Proto 接口定义文件

客户端包中包含了完整的 proto 接口定义文件，可以通过以下方式访问：

**方式1：使用工具函数（推荐）**
```python
from client import get_proto_file_path, get_proto_content

# 获取文件路径
proto_path = get_proto_file_path()
print(f"Proto 文件位置: {proto_path}")

# 直接获取文件内容
proto_content = get_proto_content()
print(proto_content)
```

**方式2：在 IDE 中查找**
- VS Code: 按 `Ctrl+P`，输入 `queue_service.proto`
- PyCharm: 按 `Ctrl+Shift+N`，输入 `queue_service.proto`

**方式3：直接访问文件**
```python
import client
import os

proto_path = os.path.join(os.path.dirname(client.__file__), 'queue_service.proto')
with open(proto_path, 'r') as f:
    print(f.read())
```

更多信息请参考：[Proto 接口文件说明](docs/PROTO_FILES.md)
