Metadata-Version: 2.4
Name: sandai-operator-client
Version: 0.6.0
Summary: Client library for the Sandai Operator SDK
Home-page: https://github.com/world-sim-dev/sandai-data-project
Author: Sandai Team
Author-email: dev@sand.ai
Classifier: Development Status :: 3 - Alpha
Classifier: Intended Audience :: Developers
Classifier: License :: OSI Approved :: MIT License
Classifier: Programming Language :: Python :: 3
Classifier: Programming Language :: Python :: 3.8
Classifier: Programming Language :: Python :: 3.9
Classifier: Programming Language :: Python :: 3.10
Classifier: Programming Language :: Python :: 3.11
Classifier: Programming Language :: Python :: 3.12
Classifier: Programming Language :: Python :: 3.13
Classifier: Programming Language :: Python :: 3.14
Classifier: Topic :: Software Development :: Libraries :: Python Modules
Classifier: Topic :: System :: Distributed Computing
Requires-Python: >=3.8
Description-Content-Type: text/markdown
Requires-Dist: celery>=5.2.0
Requires-Dist: redis>=4.0.0
Requires-Dist: kombu>=5.2.0
Requires-Dist: msgpack>=1.0.0
Provides-Extra: tracing
Requires-Dist: opentelemetry-api>=1.0.0; extra == "tracing"
Provides-Extra: dev
Requires-Dist: black>=24.0.0; extra == "dev"
Requires-Dist: flake8>=7.0.0; extra == "dev"
Requires-Dist: isort>=5.13.0; extra == "dev"
Requires-Dist: mypy>=1.10.0; extra == "dev"
Dynamic: author
Dynamic: author-email
Dynamic: classifier
Dynamic: description
Dynamic: description-content-type
Dynamic: home-page
Dynamic: provides-extra
Dynamic: requires-dist
Dynamic: requires-python
Dynamic: summary

# Sandai Operator Client

Lightweight Python client for submitting tasks to a Sandai operator over Celery.

## Installation

```bash
pip install sandai-operator-client
```

For local development:

```bash
pip install -e ".[dev]"
```

## Quick Start

```python
from sandai.operator_client import create_client

client = create_client("video-clipper", "v1")

artifacts, results = client.sync(
    input_artifacts=[{"uri": "file:///tmp/input.mp4", "slot": "video"}],
    options={"clip_duration": 10},
)
```

Async usage:

```python
import asyncio

from sandai.operator_client import create_async_client


async def main() -> None:
    async with create_async_client("video-clipper", "v1") as client:
        artifacts, results = await client.sync(
            input_artifacts=[{"uri": "file:///tmp/input.mp4", "slot": "video"}],
            options={"clip_duration": 10},
        )
        print(artifacts, results)


asyncio.run(main())
```

## Core API

- Sync client: `create_client(operator_name, operator_version, **kwargs)` returns `OperatorClient`.
- Async client: `create_async_client(operator_name, operator_version, **kwargs)` returns `AsyncOperatorClient`.
- Sync submit: `client.async_call(...)` returns Celery `AsyncResult`.
- Async submit: `await async_client.async_call(...)` returns `AsyncTaskHandle`.
- Sync request/response: `client.sync(...)` returns `(artifacts, results)`.
- Async request/response: `await async_client.sync(...)` returns `(artifacts, results)`.
- Sync description: `client.desc` is a cached property.
- Async description: `await async_client.desc()` is an async cached method.
- Sync legacy batch APIs: `batch_async(...)` and `batch_sync(...)` keep the existing best-effort contract.
- Async legacy-shaped batch APIs: `await async_client.batch_async(...)` and `await async_client.batch_sync(...)` keep the same result shapes in async call sites.
- Structured batch APIs: `batch_submit(...)`, `batch_collect(...)`, and `batch_sync_detailed(...)` are available on both clients.
- Structured batch collection uses a batch-level timeout budget: completed tasks return normal results, unfinished submitted tasks return `TaskTimeoutError`, and result order matches input order.
- Health APIs: `probe()` / `ping()` exist on both clients, with `await` required on the async client.
- `get_queue_info()` returns the queue names used for each priority.
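
For orientation, a minimal sketch of fire-and-forget submission with the sync client, using the same artifact and option shapes as the Quick Start:

```python
from sandai.operator_client import create_client

client = create_client("video-clipper", "v1")

# Fire-and-forget submission; async_call returns a Celery AsyncResult.
result = client.async_call(
    input_artifacts=[{"uri": "file:///tmp/input.mp4", "slot": "video"}],
    options={"clip_duration": 10},
)

print(result.id, client.get_queue_info())
```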

## Task Controls

The client supports the same task-control fields used by the operator runtime:

- `timeout`: converted into an absolute `deadline` before sending the task.
- Blocking request/response APIs default to `3600` seconds when `timeout` is omitted; `probe()` keeps its short `10`-second default.
- `cancel_key`: propagated to the operator so tasks can be skipped before compute starts.
- `persistent_output`: stored under `meta["persistent-output"]` for runtimes that preserve uploaded outputs.
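
A sketch that assumes these fields are accepted as keyword arguments on the blocking request API, reusing the `client` created above; the values are placeholders:

```python
artifacts, results = client.sync(
    input_artifacts=[{"uri": "file:///tmp/input.mp4", "slot": "video"}],
    options={"clip_duration": 10},
    timeout=600,             # converted to an absolute deadline before sending
    cancel_key="job-1234",   # hypothetical key; lets the operator skip the task before compute starts
    persistent_output=True,  # stored under meta["persistent-output"]
)
```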

## Batch APIs

The package now exposes two batch styles:

- Legacy compatibility APIs: `batch_async(...)` and `batch_sync(...)` keep the previous best-effort semantics.
- Structured APIs: `batch_submit(...)`, `batch_collect(...)`, and `batch_sync_detailed(...)` expose explicit per-task submission/execution status instead of collapsing failures into empty results.
- `batch_collect(...)` and `batch_sync_detailed(...)` treat `timeout` as a total batch budget instead of a per-item wait.
- When the batch deadline expires, unfinished submitted tasks are surfaced as `TaskTimeoutError` entries instead of keeping later tasks blocked behind earlier waits.

Async batch methods follow the same contract, but are awaited:

- `await async_client.batch_submit(...)` returns `AsyncSubmittedTask` objects.
- `await async_client.batch_async(...)` returns `AsyncTaskHandle | None` per task.
- `await async_client.batch_collect(...)` returns `BatchTaskResult` objects.
- `await async_client.batch_sync_detailed(...)` returns `BatchTaskResult` objects.
- `await async_client.batch_sync(...)` returns legacy `(artifacts, results)` tuples.

Prefer the structured APIs for new integrations.
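
A hedged sketch of the structured style, reusing the `client` from the Quick Start. Only the return types (`BatchTaskResult`, `TaskTimeoutError`) and the batch-level timeout contract are documented above, so the batch keyword name, the per-entry dict shape, and the inspection step are illustrative:

```python
# `tasks` as a keyword and the per-entry dict shape are assumptions.
batch_results = client.batch_sync_detailed(
    tasks=[
        {"input_artifacts": [{"uri": "file:///tmp/a.mp4", "slot": "video"}]},
        {"input_artifacts": [{"uri": "file:///tmp/b.mp4", "slot": "video"}]},
    ],
    timeout=1200,  # total budget for the whole batch, not per item
)

# Results arrive in input order; entries that missed the deadline carry TaskTimeoutError.
for item in batch_results:
    ...  # inspect submission/execution status on each BatchTaskResult
```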

## Async API

`AsyncOperatorClient` is an additive API layer. It does not remove or replace `OperatorClient`.

`AsyncTaskHandle` exposes:

- `task_id` / `id`
- `await handle.get(timeout=...)`
- `await handle.ready()`
- `await handle.state()`
- `await handle.forget()`
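
A hedged sketch combining these handle methods; the submit keyword arguments mirror the Quick Start example:

```python
import asyncio

from sandai.operator_client import create_async_client


async def main() -> None:
    async with create_async_client("video-clipper", "v1") as client:
        # Submit without blocking; async_call returns an AsyncTaskHandle.
        handle = await client.async_call(
            input_artifacts=[{"uri": "file:///tmp/input.mp4", "slot": "video"}],
            options={"clip_duration": 10},
        )
        print(handle.task_id, await handle.state())

        result = await handle.get(timeout=600)
        await handle.forget()  # release the stored result once it has been consumed
        print(result)


asyncio.run(main())
```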

Implementation notes:

- Task submission remains compatible with the current Celery-based operator side.
- When `result_backend` is Redis, the async client uses `redis.asyncio` plus a shared polling hub for async result waiting.
- The async client is intended for async application integration and lower blocking overhead on result collection.
- The sync client remains the compatibility/default path for existing callers.

## Health Checks

`ping()` and `probe()` are intentionally different:

- `ping()` returns `True` when task submission succeeds.
- `probe()` returns a structured `HealthCheckResult` with `submit_ok`, `retrieval_ok`, `response_ok`, and optional error details.

Use `probe()` when you need a meaningful health signal.
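
A small usage sketch, reusing the sync `client` from the Quick Start and assuming `HealthCheckResult` exposes the flags above as attributes:

```python
health = client.probe()  # keeps its short 10-second default timeout

if not (health.submit_ok and health.retrieval_ok and health.response_ok):
    print("operator unhealthy:", health)

alive = client.ping()  # True only when task submission succeeded
```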

## Priority Model

Supported priorities are:

- `high`
- `normal`
- `low`
- `very_low`

Queue names follow this pattern:

```text
<priority>.<operator_name>.<operator_version>
```
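
For the `video-clipper`/`v1` client from the Quick Start, the pattern yields names such as `normal.video-clipper.v1`; the exact return shape of `get_queue_info()` is an assumption:

```python
# Expected to list one queue name per supported priority, e.g.
# high.video-clipper.v1, normal.video-clipper.v1,
# low.video-clipper.v1, very_low.video-clipper.v1
print(client.get_queue_info())
```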

## Environment Variables

The client reads these settings when explicit connection arguments are not provided:

- `SANDAI_OPERATOR_CELERY_BROKER_URL`
- `SANDAI_OPERATOR_CELERY_RESULT_BACKEND`
- `SANDAI_OPERATOR_CELERY_TASK_SERIALIZER`
- `SANDAI_OPERATOR_CELERY_RESULT_SERIALIZER`
- `SANDAI_OPERATOR_AUTO_FORGET_RESULT`
- `SANDAI_OPERATOR_BROKER_POOL_LIMIT`
- `SANDAI_OPERATOR_BACKEND_POOL_LIMIT`
- `SANDAI_OPERATOR_BACKEND_POLL_CONCURRENCY`
- `SANDAI_OPERATOR_POOL_ACQUIRE_TIMEOUT`
- `SANDAI_OPERATOR_SHARED_TRANSPORT_SCOPE`
- `SANDAI_OPERATOR_CLIENT_DETAILED_DEBUG`

If broker/backend are not configured, the client falls back to local Redis defaults:

```text
redis://localhost:6379/0
```

By default the client now submits Celery task payloads with `msgpack`, keeps the Celery result serializer on `json`, and advertises both `json` and `msgpack` in `accept_content` so it can interoperate with operators during rollout.
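
A minimal sketch of configuring the connection through the environment instead of constructor arguments; the Redis URLs are placeholders:

```python
import os

from sandai.operator_client import create_client

# Must be set before the client is constructed; explicit connection arguments
# passed to the client take precedence over these variables.
os.environ["SANDAI_OPERATOR_CELERY_BROKER_URL"] = "redis://redis.internal:6379/0"
os.environ["SANDAI_OPERATOR_CELERY_RESULT_BACKEND"] = "redis://redis.internal:6379/1"

client = create_client("video-clipper", "v1")
```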

## Debug Logging

Client debug logs require both of these:

- Construct the client with `debug=True`.
- Configure Python logging to emit `DEBUG` level logs for the client modules.

Recommended logger setup:

```python
import logging

logging.basicConfig(
    level=logging.DEBUG,
    format="%(asctime)s %(levelname)s [%(threadName)s:%(thread)d] %(name)s %(message)s",
)

for name in [
    "sandai.operator_client.client",
    "sandai.operator_client.transport",
    "sandai.operator_client.async_transport",
    "sandai.operator_client.shared_transport",
]:
    logging.getLogger(name).setLevel(logging.DEBUG)
```

If you also need very verbose transport/pool diagnostics, set:

```bash
export SANDAI_OPERATOR_CLIENT_DETAILED_DEBUG=1
```

With `debug=True` plus `SANDAI_OPERATOR_CLIENT_DETAILED_DEBUG=1`, the client will additionally log:

- thread-aware pool acquire/release/timeout events
- shared broker/backend/app creation and release
- backend pool occupancy snapshots before critical acquires
- batch polling rounds, idle backoff, ready counts, and per-item completion/timeout

## Connection Sharing

The client now uses process-local shared transport resources for broker access and result-backend access.

This sharing is intentionally conservative:

- Default scope is `operator`.
- Broker and result backend are shared independently.
- A resource is shared only when the full sharing key matches.

### Default Behavior

With the default `shared_transport_scope="operator"`:

- Two clients for the same `operator_name` + `operator_version` may share broker/backend resources.
- Two clients for different operators do not share by default, even if they point to the same Redis address.
- If you set `shared_transport_scope="process"`, compatible clients across different operators can share the same resources.

### Sharing Rules

The following table describes when two clients share the same process-local resource.

| Parameter / property | Broker sharing affected | Backend sharing affected | Notes |
| --- | --- | --- | --- |
| `shared_transport_scope` | Yes | Yes | Default is `operator`. `process` allows cross-operator sharing when other fields also match. |
| `operator_name` | Yes, when scope is `operator` | Yes, when scope is `operator` | Different operators do not share by default. |
| `operator_version` | Yes, when scope is `operator` | Yes, when scope is `operator` | Version is part of the default scope identity. |
| `broker_url` | Yes | No | Different broker addresses always use different broker resources. |
| `result_backend` | No | Yes | Different result-backend addresses always use different backend resources. |
| `task_serializer` | Yes | No | Broker-side Celery app compatibility depends on task serializer. |
| `result_serializer` | No | Yes | Backend decode behavior depends on result serializer. |
| `accept_content` | Yes | Yes | Different accepted serializer sets do not share. |
| `broker_pool_limit` | Yes | No | Different broker limits create different broker resource groups. |
| `backend_pool_limit` | No | Yes | Different backend pool limits create different backend resource groups. |
| `backend_poll_concurrency` | No | Yes | Backend operation concurrency participates in backend sharing. |

Practical examples:

| Scenario | Shared? | Why |
| --- | --- | --- |
| Same operator, same Redis addresses, same pool settings | Yes | Full sharing key matches. |
| Same operator, same Redis addresses, different `backend_poll_concurrency` | No for backend | Backend concurrency is part of the backend key. |
| Same operator, same backend, different `broker_pool_limit` | No for broker | Broker pool limit is part of the broker key. |
| Different operators, same Redis addresses, default scope | No | Default scope is `operator`. |
| Different operators, same Redis addresses, `shared_transport_scope="process"`, same settings | Yes | Cross-operator sharing is enabled by scope. |
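
A sketch of the scope difference in code; `video-upscaler` is a hypothetical second operator used only for illustration:

```python
from sandai.operator_client import OperatorClient

# Default scope ("operator"): different operators do not share transport
# resources, even when they point at the same Redis addresses.
clipper = OperatorClient("video-clipper", "v1")
upscaler = OperatorClient("video-upscaler", "v1")

# Process scope: compatible clients across operators may share broker/backend
# resources when the remaining sharing-key fields also match.
clipper_shared = OperatorClient("video-clipper", "v1", shared_transport_scope="process")
upscaler_shared = OperatorClient("video-upscaler", "v1", shared_transport_scope="process")
```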

## Connection Limits And Runtime Behavior

The client has two layers of control:

- Pool size: how many Redis connections the underlying client is allowed to keep.
- Operation concurrency: how many broker/backend operations can actively hit Redis at the same time.

### Key Limits

| Setting | Default | Applies to | Effect |
| --- | --- | --- | --- |
| `broker_pool_limit` | `12` | Broker | Broker-side connection budget for shared broker resources. |
| `backend_pool_limit` | `12` | Result backend | Redis connection-pool budget for shared backend resources. |
| `backend_poll_concurrency` | `6` | Result backend | Maximum concurrent backend polling/forget operations. Effective backend concurrency is `min(backend_pool_limit, backend_poll_concurrency)`. |
| `pool_acquire_timeout` | `10.0` | Broker and backend | Maximum time to wait for a bounded broker/backend slot before raising `ClientPoolTimeoutError`. |

You can pass these directly in code:

```python
from sandai.operator_client import OperatorClient

client = OperatorClient(
    "video-clipper",
    "v1",
    broker_pool_limit=4,
    backend_pool_limit=6,
    backend_poll_concurrency=3,
    pool_acquire_timeout=10.0,
    shared_transport_scope="operator",
)
```

The same settings are available on `AsyncOperatorClient`.

### Current Connection Behavior

Current implementation behavior:

- Broker submission uses short-lived write connections instead of a long-lived producer connection held by each client.
- Result polling uses bounded backend concurrency and releases slots promptly after each poll.
- Sync and async clients both use the same bounded transport model.
- Connection resources are keyed separately for broker and backend, so different Redis addresses are isolated naturally.
- `close()` / `aclose()` release the client's references to shared resources.
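
A small sketch of explicit cleanup when a client is no longer needed; `aclose()` is the async counterpart on `AsyncOperatorClient`:

```python
from sandai.operator_client import create_client

client = create_client("video-clipper", "v1")
try:
    artifacts, results = client.sync(
        input_artifacts=[{"uri": "file:///tmp/input.mp4", "slot": "video"}],
        options={"clip_duration": 10},
    )
finally:
    client.close()  # release this client's references to the shared transport resources
```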

### Verified Behavior Under Load

Running the real Redis benchmark in `operator-client/tests/redis_connection_benchmark.py` against the current implementation shows:

- Broker-side operator-client Redis connections return to `0` after scenario completion.
- Backend-side operator-client Redis connections also return to `0` after scenario completion.
- Peak backend operator-client connections stay bounded by the configured backend concurrency budget rather than request count.
- Increasing request count from `40` to `120` did not cause unbounded operator-client connection growth.

Representative benchmark outcome after the current refactor:

| Scenario | Requests | Broker operator clients at settle | Backend operator clients at settle | Notes |
| --- | --- | --- | --- | --- |
| `sync_shared_client_threads` | `120` | `0` | `0` | Shared sync client across threads. |
| `sync_thread_local_clients` | `120` | `0` | `0` | One sync client per worker thread. |
| `async_single_loop` | `120` | `0` | `0` | Async client on one event loop. |
| `async_multi_thread_loops` | `120` | `0` | `0` | Async clients across multiple threads and loops. |

In short, the current implementation keeps connection behavior bounded by configuration rather than by request volume.

### Pressure Test And Performance Impact

An additional real Redis pressure test was run with a larger load profile:

- `requests=480`
- `sync_threads=64`
- `async_concurrency=192`
- `async_threads=8`
- `async_thread_concurrency=48`
- `operator_concurrency=64`
- `mock_delay_ms=300`
- `request_timeout=3600`

Two configurations were compared:

- Current default settings: `broker_pool_limit=12`, `backend_pool_limit=12`, `backend_poll_concurrency=6`
- Relaxed settings: `broker_pool_limit=32`, `backend_pool_limit=32`, `backend_poll_concurrency=32`

Observed wall-clock times:

| Scenario | Current default settings | Relaxed settings | Observation |
| --- | --- | --- | --- |
| `sync_shared_client_threads` | `13.22s` | N/A | `480/480` succeeded with bounded backend usage. |
| `sync_thread_local_clients` | `14.11s` | N/A | `480/480` succeeded; per-thread clients still reused the same operator-scoped resources. |
| `async_single_loop` | `6.77s` | N/A | `480/480` succeeded with the same backend ceiling. |
| `async_multi_thread_loops` | `5.07s` | N/A | `480/480` succeeded across 8 event-loop threads. |

Observed backend operator-client connection peaks:

| Scenario | Current default settings | Relaxed settings |
| --- | --- | --- |
| `sync_shared_client_threads` | `4` | N/A |
| `sync_thread_local_clients` | `4` | N/A |
| `async_single_loop` | `4` | N/A |
| `async_multi_thread_loops` | `4` | N/A |

Interpretation:

- The current default configuration kept backend operator-client connections capped at `4` even at `480` requests and much higher client-side concurrency.
- Broker-side operator-client connections stayed near baseline and settled back after each scenario.
- Increasing request volume from `240` to `480` did not cause unbounded operator-client connection growth.
- Throughput remained acceptable without raising the configured pool limits, which is the intended tradeoff for worker-side thread-heavy deployments.

Current recommendation:

- Keep the current defaults unless you have a measured throughput bottleneck.
- If you need to tune for higher throughput, adjust `backend_poll_concurrency` first and re-measure before increasing all limits together.
- Treat larger pool limits as a deliberate tradeoff: more concurrency headroom in exchange for higher Redis connection usage.

## Error Model

`sync()` and `desc` normalize failures into typed exceptions where possible:

- `TaskTimeoutError`
- `TaskExecutionError`
- `TaskDeadlineExceededError`
- `TaskCancelledError`
- `InvalidTaskStatusError`
- `InvalidResultFormatError`
- `InvalidPriorityError`

Structured batch APIs surface task errors directly on each result object instead of converting them into empty legacy tuples.
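
A hedged handling sketch for the blocking path, reusing the sync `client` from the Quick Start; only the exception names come from the list above, and the import path is an assumption:

```python
# The import path for the exception types is an assumption.
from sandai.operator_client import TaskExecutionError, TaskTimeoutError

try:
    artifacts, results = client.sync(
        input_artifacts=[{"uri": "file:///tmp/input.mp4", "slot": "video"}],
        options={"clip_duration": 10},
        timeout=600,
    )
except TaskTimeoutError:
    ...  # the task did not finish within the timeout budget
except TaskExecutionError as exc:
    print("operator reported a failure:", exc)
```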

## Testing

Run the client regression suite from the repository root:

```bash
python operator-client/tests/run_all_tests.py
```

The suite is mock-based and does not require a live Celery or Redis deployment.
