Metadata-Version: 2.4
Name: trodo-python
Version: 2.3.0
Summary: Trodo Analytics SDK for Python — server-side event tracking
License: ISC
Keywords: analytics,tracking,trodo,server-side
Classifier: Programming Language :: Python :: 3
Classifier: Programming Language :: Python :: 3.8
Classifier: Programming Language :: Python :: 3.9
Classifier: Programming Language :: Python :: 3.10
Classifier: Programming Language :: Python :: 3.11
Classifier: Programming Language :: Python :: 3.12
Classifier: License :: OSI Approved :: ISC License (ISCL)
Classifier: Operating System :: OS Independent
Classifier: Intended Audience :: Developers
Classifier: Topic :: Software Development :: Libraries :: Python Modules
Requires-Python: >=3.8
Description-Content-Type: text/markdown
Requires-Dist: requests>=2.28.0
Provides-Extra: async
Requires-Dist: httpx>=0.27.0; extra == "async"
Provides-Extra: dev
Requires-Dist: pytest>=7.0; extra == "dev"
Requires-Dist: pytest-cov; extra == "dev"
Requires-Dist: responses>=0.25.0; extra == "dev"
Requires-Dist: httpx>=0.27.0; extra == "dev"

# trodo-python

Server-side Python SDK for [Trodo Analytics](https://trodo.ai). Track backend events, identify users, manage people/groups, and instrument AI agents — all unified with your frontend data under the same `site_id`.

## Installation

```bash
pip install trodo-python
```

Requires Python 3.8+.

## Quick Start

```python
import trodo

trodo.init(site_id='your-site-id')

# User-bound context (recommended)
user = trodo.for_user('user-123')
user.track('purchase_completed', {'amount': 99.99, 'plan': 'pro'})
user.people.set({'plan': 'pro', 'company': 'Acme'})

# Flush before process exit if using batching
trodo.shutdown()
```

## Core API

### `trodo.init(config)`

Call once at app startup.

| Parameter | Default | Description |
|-----------|---------|-------------|
| `site_id` | required | Your Trodo site ID |
| `api_base` | `https://sdkapi.trodo.ai` | API base URL |
| `timeout` | `10` | HTTP request timeout (seconds) |
| `retries` | `2` | Retries on network/5xx errors |
| `auto_events` | `False` | Hook `sys.excepthook` / `threading.excepthook` and report uncaught exceptions as `server_error` events |
| `batch_enabled` | `False` | Queue events and flush in batches |
| `batch_size` | `50` | Flush when this many events are queued |
| `batch_flush_interval` | `5.0` | Also flush every N seconds |
| `on_error` | — | Callable invoked on API errors (errors are otherwise silent) |
| `debug` | `False` | Log API calls to stderr |

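For example, a fully spelled-out `init()` call using the options above — a sketch only; the value passed to `on_error` isn't documented here, so the callback below just logs whatever it receives:

```python
import logging

import trodo

def log_trodo_error(err):
    # on_error receives the API error; its exact shape is an assumption.
    logging.warning('Trodo API error: %s', err)

trodo.init(
    site_id='your-site-id',
    timeout=10,                 # seconds
    retries=2,
    batch_enabled=True,
    batch_size=50,
    batch_flush_interval=5.0,   # seconds
    on_error=log_trodo_error,
    debug=True,                 # log API calls to stderr
)
```
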
### `trodo.for_user(distinct_id, session_id=None)`

Returns a user-bound context. No API call is made until you track an event.

```python
user = trodo.for_user('user-123', session_id=request.cookies.get('trodo_session'))
```

### `trodo.identify(identify_id, session_id=None)`

Creates the session and fires `POST /api/sdk/identify`. Use it to link a `distinct_id` to an external identifier (email, database ID). Returns the user context.

```python
user = trodo.identify('user@example.com', session_id=request.cookies.get('trodo_session'))
# distinct_id is now id_user@example.com — merges with browser events
user.track('login')
```

### User context methods

```python
user.track(event_name, properties=None)         # Custom event
user.identify(identify_id)                      # Merge identity
user.wallet_address(address)                    # Set wallet address
user.reset()                                    # Clear session
user.capture_error(exc, severity='error')       # Track server_error ('critical' | 'error' | 'warning')

# People profile
user.people.set(properties)
user.people.set_once(properties)
user.people.unset(keys)
user.people.increment(key, amount=1)
user.people.append(key, values)
user.people.union(key, values)
user.people.remove(key, values)
user.people.track_charge(amount, properties=None)
user.people.clear_charges()
user.people.delete_user()

# Groups
user.set_group(group_key, group_id)
user.add_group(group_key, group_id)
user.remove_group(group_key, group_id)
group = user.get_group(group_key, group_id)
group.set(properties)
group.set_once(properties)
group.increment(key, amount=1)
group.append(key, values)
group.union(key, values)
group.remove(key, values)
group.unset(keys)
group.delete()
```
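
Putting a few of these together — a hedged sketch of a purchase handler that records a charge, updates the profile and a group, and captures any exception (`process_payment` is a placeholder for your own billing code):

```python
import trodo

trodo.init(site_id='your-site-id')
user = trodo.for_user('user-123')

try:
    process_payment(user_id='user-123', amount=49.00)    # your billing code
    user.people.track_charge(49.00, {'plan': 'starter'})
    user.people.increment('lifetime_purchases')          # defaults to +1
    user.set_group('company', 'acme')
    user.get_group('company', 'acme').set({'tier': 'startup'})
except Exception as exc:
    user.capture_error(exc, severity='error')            # tracked as server_error
    raise
```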

### Direct call pattern

```python
trodo.track('user-123', 'event_name', {'key': 'value'})
trodo.people_set('user-123', {'plan': 'pro'})
trodo.set_group('user-123', 'company', 'acme')
```

---

## AI Agent Tracing (recommended)

One wrap around your agent captures every LLM call, tool call, and
nested step as a tree of spans — token counts, costs, inputs, outputs,
errors. Works with any stack: OpenAI, Anthropic, LangChain, LlamaIndex,
Gemini, raw HTTP, custom tools. Cost is derived server-side from
`(provider, model)` — the SDK only sends tokens.

### 30-second quickstart

```python
import trodo
trodo.init(site_id='your-site-id')   # auto-instrument on by default

with trodo.wrap_agent('customer-support',
                       distinct_id=user_id,
                       conversation_id=session_id) as run:
    run.set_input({'query': 'where did sales drop'})
    answer = agent.run(query)        # OpenAI/Anthropic/LangChain auto-captured
    run.set_output(answer)
```

Open the Agent Runs dashboard — the row shows tokens in/out, cost,
span count, tool count, error count, plus the full trace tree.

### Auto-instrumentation

`trodo.init()` calls `enable_auto_instrument()`, which registers every
installed OpenTelemetry instrumentor. No extra code is required.

| Framework | Install |
|-----------|---------|
| OpenAI | `pip install opentelemetry-instrumentation-openai` |
| Anthropic | `pip install opentelemetry-instrumentation-anthropic` |
| LangChain | `pip install opentelemetry-instrumentation-langchain` |
| LlamaIndex | `pip install opentelemetry-instrumentation-llama-index` |
| Google Gemini | `pip install opentelemetry-instrumentation-google-generativeai` |
| Vertex AI | `pip install opentelemetry-instrumentation-vertexai` |
| Bedrock | `pip install opentelemetry-instrumentation-bedrock` |
| Cohere | `pip install opentelemetry-instrumentation-cohere` |
| Mistral | `pip install opentelemetry-instrumentation-mistralai` |
| Haystack | `pip install opentelemetry-instrumentation-haystack` |
| httpx / requests | bundled — generic HTTP spans for raw-HTTP callers |

Opt out with `trodo.init(site_id=..., auto_instrument=False)`.
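
If you opt out at `init()` time you can still turn it on later — `enable_auto_instrument()` is the same hook `init()` calls internally, so a minimal sketch looks like:

```python
import trodo

# Start with auto-instrumentation disabled...
trodo.init(site_id='your-site-id', auto_instrument=False)

# ...then register the installed OTel instrumentors on demand.
trodo.enable_auto_instrument()
```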

### Span helpers

Typed function wrappers for custom code — every call becomes a span
with the args auto-captured as `input`, return value as `output`,
exception as `error`. Dual-form: helper **and** decorator.

```python
# trace — generic span
prepared = trodo.trace('prepare', prepare_fn)(payload)

@trodo.trace('step')
def step(): ...

# tool — tool span (auto tool_name, kind='tool')
run_funnel = trodo.tool('run_funnel_query', run_funnel_query)
result = run_funnel(team_id=1, preset='day7')

@trodo.tool(name='fetch_user')
async def fetch_user(uid): ...

# llm — LLM span, auto-extracts OpenAI / Anthropic / Gemini usage
answer = trodo.llm(
    'answer', call_openai, model='gpt-4o-mini', provider='openai',
)(messages)
# Records input_tokens / output_tokens from response['usage'].

# retrieval — vector search / RAG retriever span
search = trodo.retrieval('vector_search', vector_search)
docs = search(query)
```

### Raw-HTTP escape hatches

If your LLM client isn't OTel-instrumented and you can't wrap it as a
function, record a span post-hoc:

```python
resp = httpx.post(url, json=body).json()
trodo.track_llm_call(
    model='gemini-2.5-flash', provider='google',
    input_tokens=resp['usageMetadata']['promptTokenCount'],
    output_tokens=resp['usageMetadata']['candidatesTokenCount'],
    prompt=body, completion=resp,
)
```

For advanced cases, get a raw OTel tracer — the Trodo processor is
already subscribed:

```python
tracer = trodo.get_tracer('my.module')
with tracer.start_as_current_span('custom') as sp:
    sp.set_attribute('gen_ai.system', 'my-llm')
```

### Cross-service runs

When one service calls another, the downstream service **joins** the
caller's run instead of creating its own. All spans nest under a single
timeline in the dashboard.

```python
# Caller (FastAPI / Flask / Django / …) — outbound:
import httpx
httpx.post(url, headers=trodo.propagation_headers(), json=body)

# Downstream (FastAPI):
from fastapi import FastAPI
app = FastAPI()
app.middleware('http')(trodo.fastapi_middleware())
# Every LLM call / @tool / trace helper inside handlers now nests under
# the caller's run — no extra wiring.

# Or manually:
with trodo.join_run(
    run_id=headers['x-trodo-run-id'],
    parent_span_id=headers['x-trodo-parent-span-id'],
):
    ...
```

### Long-lived sessions across processes — `start_run` / `end_run`

`wrap_agent` is a context manager — it opens *and* closes the run in one
call stack. For sessions that live across many HTTP requests (an MCP
server, a websocket-pinned chat, scheduled jobs that resume on different
workers), use `start_run` to open the run from one process and `end_run`
to finalise it later. Between the two, any process can use `join_run` to
add child spans. Same `run_id` threads through everything.

```python
# Process A — open the run for an MCP session.
run_id = trodo.start_run(
    'external_mcp_session',
    distinct_id=str(user_id),
    conversation_id=mcp_session_id,
)
redis.set(f"mcp:run:{mcp_session_id}", run_id, ex=3600)

# Process B (later, possibly a different worker) — append a tool span.
run_id = redis.get(f"mcp:run:{mcp_session_id}").decode()
with trodo.join_run(run_id, name='tool.run_funnel_query', kind='tool') as span:
    span.set_input(args)
    span.set_output(result)

# When the session ends (timeout sweeper, explicit close):
trodo.end_run(run_id, status='ok')
```

### Conversation binding & feedback

```python
with trodo.wrap_agent(
    'chat', distinct_id=user_id, conversation_id=session_id,
) as run:
    ...
# Later:
trodo.feedback(run.run_id, satisfaction='positive', rating=5)
```

### Cookbook

Runnable scenarios that double as integration tests live in
`sandbox/scenarios/` — `span_helpers.py`, `raw_http.py`,
`custom_tools.py`, `cross_service.py`, `concurrent_100.py`,
`long_run.py`, plus opt-in `openai_auto.py`, `anthropic_auto.py`,
`langchain_chain.py`. Run them with
`python -m sandbox.run_all` from the SDK root.

---

## Agent Analytics (legacy event-based API)

The older per-event API below is still supported but superseded by
`wrap_agent` + span helpers above. Use it only if you're already wired
into it; new integrations should prefer the tracing API.

**Before you start:** register your agent in **Integrations → AI Agents** in the dashboard to get an `agent_id` (`agt_xxxxxxxx`).

```python
from trodo import (
    AgentCallProps, ToolUseProps, AgentResponseProps,
    AgentErrorProps, FeedbackProps,
)
```

### `track_agent_call` — inbound message / LLM invocation

```python
trodo.track_agent_call(AgentCallProps(
    agent_id='agt_abc12345',
    conversation_id='conv_xyz',
    message_id='msg_001',
    prompt=user_message,
    model='claude-3-5-sonnet',
    provider='anthropic',
    system_prompt_version='v2',   # optional — track prompt iterations
    distinct_id=user_id,          # optional — link to a Trodo user
    metadata={'thread_source': 'slack', 'locale': 'en'},  # optional — agent_calls.metadata JSONB
))
```

### `track_tool_use` — tool/function call within a turn

```python
trodo.track_tool_use(ToolUseProps(
    agent_id='agt_abc12345',
    conversation_id='conv_xyz',
    message_id='msg_001',
    tool_name='fetch_billing_info',
    latency_ms=143,
    status='success',             # 'success' | 'failure'
    input={'user_id': '123'},     # optional
    output={'plan': 'pro'},       # optional
))
```

### `track_agent_response` — LLM output and token usage

```python
trodo.track_agent_response(AgentResponseProps(
    agent_id='agt_abc12345',
    conversation_id='conv_xyz',
    message_id='msg_001',
    model='claude-3-5-sonnet',
    completion_tokens=response.usage.output_tokens,
    prompt_tokens=response.usage.input_tokens,
    total_tokens=response.usage.input_tokens + response.usage.output_tokens,
    finish_reason=response.stop_reason,
    distinct_id=user_id,
))
```

### `track_agent_error` — errors and failures

```python
import traceback

trodo.track_agent_error(AgentErrorProps(
    agent_id='agt_abc12345',
    conversation_id='conv_xyz',
    message_id='msg_001',
    error_type='rate_limit',       # 'timeout' | 'rate_limit' | 'guardrail_block' | ...
    error_message=str(exc),
    failed_tool='fetch_billing_info',  # optional
    traceback=traceback.format_exc(),  # optional
))
```

### `track_feedback` — user thumbs up/down

```python
trodo.track_feedback(FeedbackProps(
    agent_id='agt_abc12345',
    conversation_id='conv_xyz',
    message_id='msg_001',          # same message_id as the response it refers to
    feedback='positive',           # 'positive' | 'negative' | 'unreact'
    distinct_id=user_id,
))
```

### Full turn example

```python
import time
import traceback

import trodo
from trodo import AgentCallProps, ToolUseProps, AgentResponseProps, AgentErrorProps

def run_agent_turn(user_id, conversation_id, user_message):
    agent_id = 'agt_abc12345'
    message_id = f'msg_{int(time.time() * 1000)}'

    trodo.track_agent_call(AgentCallProps(
        agent_id=agent_id, conversation_id=conversation_id,
        message_id=message_id, prompt=user_message, distinct_id=user_id,
    ))

    try:
        trodo.track_tool_use(ToolUseProps(
            agent_id=agent_id, conversation_id=conversation_id,
            message_id=message_id, tool_name='search', status='success', latency_ms=80,
        ))

        response = llm_client.complete(user_message)

        trodo.track_agent_response(AgentResponseProps(
            agent_id=agent_id, conversation_id=conversation_id, message_id=message_id,
            model=response.model,
            completion_tokens=response.usage.output_tokens,
            prompt_tokens=response.usage.input_tokens,
            total_tokens=response.usage.input_tokens + response.usage.output_tokens,
            distinct_id=user_id,
        ))

        return response.text

    except Exception as exc:
        trodo.track_agent_error(AgentErrorProps(
            agent_id=agent_id, conversation_id=conversation_id, message_id=message_id,
            error_type=type(exc).__name__, error_message=str(exc),
            traceback=traceback.format_exc(), distinct_id=user_id,
        ))
        raise
```

---

## Identity Merging (Cross-SDK)

Call `identify()` with the **same value** on the browser and server to merge all events under one user profile:

```python
# Python
user.identify('user@example.com')   # → id_user@example.com

# Browser (same value)
# Trodo.identify('user@example.com') → id_user@example.com
# Events from both sides now appear together in the dashboard
```

---

## Flask / FastAPI Example

```python
# Flask
from flask import Flask, request
import trodo

app = Flask(__name__)
trodo.init(site_id='your-site-id')

@app.route('/purchase', methods=['POST'])
def purchase():
    user = trodo.for_user(request.json['user_id'])
    user.track('purchase_completed', {'amount': request.json['amount']})
    return {'ok': True}
```

```python
# FastAPI
from fastapi import FastAPI, Request
import trodo

app = FastAPI()
trodo.init(site_id='your-site-id')

@app.post('/purchase')
async def purchase(request: Request):
    body = await request.json()
    user = trodo.for_user(body['user_id'])
    user.track('purchase_completed', {'amount': body['amount']})
    return {'ok': True}
```

---

## Batching

```python
trodo.init(
    site_id='your-site-id',
    batch_enabled=True,
    batch_size=50,
    batch_flush_interval=5.0,
)

# Always flush before process exit
import atexit
atexit.register(trodo.shutdown)
```

---

## Auto Events

```python
trodo.init(site_id='your-site-id', auto_events=True)
# Hooks sys.excepthook and threading.excepthook
# Sends server_error events with distinct_id: 'server_global'

# Toggle at runtime
trodo.enable_auto_events()
trodo.disable_auto_events()
```

---

## Thread Safety

The SDK is thread-safe. `SessionManager`, `EventQueue`, and `BatchFlusher` all use `threading.Lock` internally. Safe for multi-threaded Flask/Django/FastAPI apps.
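
As a small illustration (a sketch only — the handler and user IDs are made up), events can be tracked from a thread pool and flushed once before exit:

```python
from concurrent.futures import ThreadPoolExecutor

import trodo

trodo.init(site_id='your-site-id', batch_enabled=True)

def handle_job(user_id):
    # Each worker gets its own user-bound context; the shared event
    # queue and flusher behind it are lock-protected by the SDK.
    user = trodo.for_user(user_id)
    user.track('job_processed', {'worker': 'threadpool'})

with ThreadPoolExecutor(max_workers=8) as pool:
    list(pool.map(handle_job, [f'user-{i}' for i in range(100)]))

trodo.shutdown()   # flush any batched events before exit
```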

## License

ISC
