Metadata-Version: 2.4
Name: inferential
Version: 0.1.0
Summary: Robotics-aware inference orchestration on top of Ray Serve
Author: inferential.sh
License-Expression: Apache-2.0
License-File: LICENSE
Requires-Python: >=3.11
Requires-Dist: numpy>=1.24
Requires-Dist: protobuf>=5.0
Requires-Dist: pyzmq>=26.0
Provides-Extra: dev
Requires-Dist: grpcio-tools>=1.60; extra == 'dev'
Requires-Dist: pydantic>=2.0; extra == 'dev'
Requires-Dist: pytest-asyncio>=0.23; extra == 'dev'
Requires-Dist: pytest>=8.0; extra == 'dev'
Requires-Dist: ray[serve]>=2.9; extra == 'dev'
Requires-Dist: ruff>=0.4; extra == 'dev'
Provides-Extra: server
Requires-Dist: pydantic>=2.0; extra == 'server'
Requires-Dist: ray[serve]>=2.9; extra == 'server'
Description-Content-Type: text/markdown

# Inferential

Multi-client inference orchestration on top of Ray Serve.

Inferential sits between your clients and your ML models. It receives observations over ZMQ, schedules inference requests using cadence-aware priority scoring, dispatches to Ray Serve, and streams results back — all with sub-millisecond transport overhead. Built for any scenario where multiple clients need concurrent access to shared models: robotics fleets, game agents, IoT devices, real-time ML pipelines.

## Features

- **ZMQ transport** — ROUTER/DEALER sockets with automatic reconnection and zero-copy tensor payloads
- **Pluggable schedulers** — Deadline-aware (default), batch-optimized, priority-tiered, round-robin
- **Cadence learning** — EMA-based tracking of each client's request pattern to predict urgency
- **Protobuf wire protocol** — Typed tensor metadata (dtype, shape, encoding) with binary payload
- **In-memory metrics** — Ring-buffer storage with label filtering and percentile stats (p50/p95/p99)
- **Flexible client IDs** — String or integer, your choice
- **Lightweight client SDK** — No Ray dependency; just `pyzmq`, `protobuf`, and `numpy`

## Install

```bash
# Client SDK only (robot side)
pip install inferential

# Server with Ray Serve
pip install "inferential[server]"

# Development
pip install "inferential[dev]"
```

## Quick Start

### Server

```python
import asyncio
from inferential import Server

server = Server(bind="tcp://*:5555", models=["policy-v2"])
server.use_scheduler("deadline_aware")

@server.on_metric
def log_latency(name, value, labels):
    if name == "inference_latency_ms":
        print(f"Client {labels.get('client')}: {value:.1f}ms")

asyncio.run(server.run())
```

### Client

```python
import numpy as np
from inferential import Connection

conn = Connection(server="tcp://localhost:5555", client_id="agent-01", client_type="sensor")
model = conn.model("policy-v2", latency_budget_ms=30.0)

# Send observation
readings = np.array([0.0, -0.78, 0.0, -2.35, 0.0, 1.57, 0.78], dtype=np.float32)
model.observe(
    urgency=0.8,
    sensor_readings=readings,
    prompt="classify anomaly",  # strings go to metadata
)

# Get action
result = model.get_result(timeout_ms=50)
if result is not None:
    output = result["actions"]  # np.ndarray
```

## Observation API

`model.observe()` accepts any combination of keyword arguments:

- **`np.ndarray`** values are serialized as tensors with full dtype/shape metadata
- **`str`** values are passed as metadata key-value pairs
- **`urgency`** (float, 0.0–1.0) — signals how time-critical this request is
- **`steps_remaining`** (int, optional) — remaining steps in the current trajectory

```python
# Multi-modal observation
model.observe(
    urgency=0.5,
    steps_remaining=120,
    state_vector=np.zeros(7, dtype=np.float32),
    image=np.zeros((3, 224, 224), dtype=np.uint8),
    prompt="describe the scene",
)
```

## Schedulers

Four built-in strategies, selectable via config or at runtime:

| Strategy | Description |
|----------|-------------|
| `deadline_aware` | Weighted scoring over cadence, urgency, priority, and age (the default) |
| `batch_optimized` | Groups requests per model, flushes on size or time |
| `priority_tiered` | Strict priority tiers, FIFO within each tier |
| `round_robin` | Fair rotation across clients |

```python
from inferential.scheduler.base import create_scheduler
from inferential.config.schema import SchedulingConfig

# Standalone usage — no server needed
scheduler = create_scheduler(SchedulingConfig(strategy="batch_optimized"))
scheduler.submit(request)  # request: an InferenceRequest built by the caller (see Custom Scoring Policy below)
batch = scheduler.next_batch()
```

The server wraps this and lets you swap strategies at runtime:

```python
server.use_scheduler("deadline_aware", cadence_weight=50.0, urgency_weight=30.0)
```

### Custom Scoring Policy

Override how the deadline-aware scheduler scores requests:

```python
from inferential import register_policy, InferenceRequest

@register_policy("safety_first")
def score(req: InferenceRequest) -> float:
    if req.urgency > 0.9:
        return 1000.0
    return req.priority * 10.0

server.use_scheduler("deadline_aware", policy="safety_first")
```

### Custom Scheduler

Implement the `Scheduler` interface and register it:

```python
from inferential.scheduler.base import Scheduler, register_scheduler

@register_scheduler("my_scheduler")
class MyScheduler(Scheduler):
    def __init__(self, config):
        self._queue = []

    def submit(self, request):
        self._queue.append(request)

    def next_batch(self):
        batch, self._queue = self._queue[:8], self._queue[8:]
        return batch

    def tick(self):
        pass

    def status(self):
        return {"queue_len": len(self._queue)}

    def queue_len(self):
        return len(self._queue)

server.use_scheduler("my_scheduler")
```

## Metrics

```python
# Query stats over a time window
stats = server.metrics.get_stats("inference_latency_ms", window_seconds=60)
print(f"p95 latency: {stats.p95:.1f}ms")

# Latest value
depth = server.metrics.get_latest("queue_depth")

# Full snapshot of all metrics
snapshot = server.metrics.snapshot()
```

Tracked metrics: `inference_latency_ms`, `observation_staleness_ms`, `payload_size_bytes`, `queue_depth`, `queue_full_drops`, `dispatch_errors`, `dispatch_retries`, `requests_expired`, `active_clients`, `observation_errors`.
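
Metrics are recorded with labels (for example, the `client` label seen in the `on_metric` hook above). The sketch below shows what a per-client query could look like; the `labels=` keyword is an assumption about the metrics API, not a documented signature.

```python
# Hypothetical label filter: the labels= keyword is an assumption,
# based on the "label filtering" feature described above.
stats = server.metrics.get_stats(
    "inference_latency_ms",
    window_seconds=60,
    labels={"client": "agent-01"},
)
if stats is not None:
    print(f"agent-01 p99: {stats.p99:.1f}ms")
```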

## Queue Management

The scheduler queue supports TTL, overflow policies, and dispatch retry:

| Parameter | Default | Description |
|-----------|---------|-------------|
| `request_ttl_ms` | 5000 | Drop queued requests older than this (avoids wasting GPU on stale data) |
| `overflow_policy` | `"drop_oldest"` | What to do when the queue is full: `"drop_oldest"` evicts the stalest request, `"reject_newest"` drops the incoming one |
| `max_retries` | 0 | Re-queue failed dispatches up to N times before dropping |

```python
server.use_scheduler(
    "deadline_aware",
    request_ttl_ms=2000,           # 2s TTL
    overflow_policy="drop_oldest", # keep fresh data, discard stale
    max_retries=2,                 # retry transient failures twice
)
```

## Configuration

```python
from inferential.config import InferentialConfig

config = InferentialConfig(
    transport={"bind": "tcp://*:5555", "recv_hwm": 2000},
    scheduling={
        "strategy": "deadline_aware",
        "max_queue_size": 500,
        "request_ttl_ms": 3000,
        "overflow_policy": "drop_oldest",
        "max_retries": 1,
    },
    clients={
        "defaults": {"latency_budget_ms": 50.0, "priority": 1},
        "known": [
            {"id": "agent-01", "model": "policy-v2", "latency_budget_ms": 30.0, "priority": 2},
        ],
        "accept_unknown": True,
    },
    response_tracking={"cadence_alpha": 0.3, "disconnect_timeout_s": 10.0},
    metrics={"ring_buffer_size": 10000},
)

server = Server(config=config)
```
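
`cadence_alpha` is the smoothing factor for the EMA-based cadence learning mentioned in the feature list. The exact update rule is internal to the library; as a rough sketch, the standard EMA form such a tracker would use looks like this (names here are illustrative):

```python
# Illustrative only: a textbook EMA over inter-request intervals, assuming
# cadence_alpha plays the usual smoothing-factor role.
def update_cadence(interval_ema_ms: float, new_interval_ms: float, alpha: float = 0.3) -> float:
    """Blend the latest inter-request interval into the running estimate."""
    return alpha * new_interval_ms + (1.0 - alpha) * interval_ema_ms
```

A higher `cadence_alpha` weights recent intervals more heavily, so the estimate adapts faster when a client changes its request rate.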

## Wire Protocol

Messages use protobuf serialization over ZMQ multipart frames:

```
[identity | "" | envelope (protobuf) | payload (binary tensors)]
```

- **Observation** (client -> server): client info, model ID, urgency, tensor descriptors, metadata
- **ModelResponse** (server -> client): response ID, inference latency, output tensors, metadata

Supported tensor dtypes: `float16`, `float32`, `float64`, `bfloat16`, `uint8`, `int32`, `int64`, `bool`.
Supported encodings: `raw`, `jpeg`, `png`.
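
As a rough illustration of the framing from the client side, the sketch below sends one multipart message over a DEALER socket. The envelope and payload bytes are placeholders (the protobuf module path and message construction aren't shown here); only the frame order follows the layout above, with the server's ROUTER socket prepending the identity frame on receipt.

```python
import zmq

ctx = zmq.Context.instance()
sock = ctx.socket(zmq.DEALER)
sock.setsockopt(zmq.IDENTITY, b"agent-01")
sock.connect("tcp://localhost:5555")

envelope_bytes = b"..."  # serialized Observation protobuf (placeholder)
payload_bytes = b"..."   # raw tensor bytes described by the envelope (placeholder)

# DEALER sends [empty | envelope | payload]; ZMQ's ROUTER socket on the server
# adds the client identity, producing the four-frame layout shown above.
sock.send_multipart([b"", envelope_bytes, payload_bytes])
```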

## Development

```bash
# Generate protobuf code
make proto

# Run tests
make test

# Lint
make lint
```

## License

Apache-2.0
