Metadata-Version: 2.4
Name: causum
Version: 1.1.9
Summary: On-prem agent for the Mars® Operating System
Author-email: Causum <support@causum.com>
License-Expression: LicenseRef-Proprietary
Project-URL: Repository, https://gitlab.com/causum/causum-actor
Requires-Python: >=3.9
Description-Content-Type: text/markdown
License-File: LICENSE
Requires-Dist: click>=8.1
Requires-Dist: pyyaml>=6.0
Requires-Dist: redis>=5.0
Requires-Dist: httpx>=0.24
Requires-Dist: openai>=1.12
Requires-Dist: pymnemon
Requires-Dist: jsonschema>=4.0
Provides-Extra: openai
Requires-Dist: openai>=1.12; extra == "openai"
Provides-Extra: redis
Requires-Dist: redis>=5.0; extra == "redis"
Provides-Extra: all
Requires-Dist: openai>=1.12; extra == "all"
Requires-Dist: hvac>=1.1; extra == "all"
Requires-Dist: kubernetes>=29.0.0; extra == "all"
Requires-Dist: cryptography>=41.0.0; extra == "all"
Requires-Dist: boto3>=1.34; extra == "all"
Dynamic: license-file

# Causum Agent

On-premises worker for the Mars Operating System.
Runs inside your network, connects to Causum's SaaS control plane over Redis Streams, and executes database queries with automatic data desensitization. Credentials never leave your infrastructure.

## How it works

```
Your network                          Causum SaaS
+--------------------------+          +------------------+
|  Agent                   |  Redis   |                  |
|  - materializes secrets  | <------> |  Orchestrator    |
|  - executes DB queries   |  Streams |                  |
|  - desensitizes results  |          +------------------+
|  - publishes results     |
+--------------------------+
```

1. The agent boots with a connection token (base64-encoded Redis URL + agent key).
2. It registers with the orchestrator and receives runtime configuration over Redis Streams.
3. Secrets are resolved locally through your chosen provider (environment variables, HashiCorp Vault, file system, or Kubernetes secrets).
4. The agent executes database tasks (queries, metadata introspection, sampling) and desensitizes results through the customer's LLM before returning them.

Secrets are never sent to the SaaS. Config payloads carry secret _mappings_ (references), not values.
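To make the mapping-vs-value split concrete, here is a minimal sketch of env-based resolution. The helper name is hypothetical; the real logic lives in the agent's `SecretProvider` implementations (`actor/authentication/secrets.py`):

```python
import os

def resolve_secrets(secret_mapping: dict) -> dict:
    """Resolve secret *references* to values via the env provider.

    Illustrative helper only. The SaaS sees the references on the left;
    the values on the right exist solely in the agent's process memory.
    """
    resolved = {}
    for field, ref in secret_mapping.items():
        value = os.environ.get(ref)
        if value is None:
            raise KeyError(f"secret reference {ref!r} for field {field!r} is not set")
        resolved[field] = value
    return resolved
```

For example, a config carrying `{"api_key": "OPENAI_API_KEY"}` resolves to the value of `$OPENAI_API_KEY` on the agent host, and that value never travels back over Redis.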

### Config modes

| Mode | When | db_query behavior |
|---|---|---|
| **DB + LLM** | Both connectors configured | Results desensitized through customer's LLM before returning. |
| **DB only, passthrough** | LLM absent, `llm_passthrough: true` (default) | Raw rows returned to SaaS for managed desensitization. |
| **DB only, blocked** | LLM absent, `llm_passthrough: false` | Agent refuses to return data. Error raised. |

### Desensitization pipeline

When the LLM is active, `db_query` results pass through a multi-stage pipeline:

1. **Classify** -- LLM classifies each field as sensitive or safe using table/field descriptions from the SaaS + sample values from the query result.
2. **Mask** -- Sensitive values are replaced with deterministic SHA256-based tokens (e.g. `NAM_a1b2c3d4`). Entity ID fields are preserved for joining.
3. **Chunk** -- Masked data is split into token-bounded chunks sized to the model's context window.
4. **Transform** -- Each chunk is sent through the LLM for final transformation into a clean text format.
5. **Return** -- Concatenated desensitized text is returned to the SaaS.
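Stages 2 and 3 can be sketched as follows. The token prefix/length and the chars-per-token heuristic are illustrative assumptions, not the agent's exact implementation:

```python
import hashlib

def mask_value(value: str, prefix: str = "NAM") -> str:
    # Deterministic: the same input always yields the same token,
    # so masked values stay joinable across rows and queries.
    digest = hashlib.sha256(value.encode("utf-8")).hexdigest()[:8]
    return f"{prefix}_{digest}"

def chunk_lines(lines: list, token_limit: int, chars_per_token: int = 4) -> list:
    # Crude size-bounded chunking using a ~4 chars/token estimate;
    # a real pipeline would measure with the model's tokenizer.
    budget = token_limit * chars_per_token
    chunks, current, size = [], [], 0
    for line in lines:
        if current and size + len(line) > budget:
            chunks.append("\n".join(current))
            current, size = [], 0
        current.append(line)
        size += len(line)
    if current:
        chunks.append("\n".join(current))
    return chunks
```

Because masking is deterministic, two rows containing the same customer name produce the same token, which preserves joins and aggregations on the desensitized output.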

## Source layout

```
actor/
  __init__.py                 Package root; exposes __version__
  identity.py                 Stable instance ID generation and persistence
  schemas.py                  Shared JSON schema loader

  authentication/
    secrets.py                SecretProvider implementations (env, vault, file, k8s)
    config.py                 Pydantic models for secret store configuration
    database.py               Database credential builders (20+ auth types)
    llm.py                    LLM credential builders
    manager.py                ConnectionManager: high-level build + validate
    exceptions.py             Typed exception hierarchy (see below)

  cli/
    main.py                   Click CLI: start, validate, health, config, debug
    utils.py                  Shared helpers: redaction, error sanitization, CLIContext

  connectors/
    db.py                     SQLAlchemy client with read-only enforcement + dialect queries
    llm.py                    OpenAI-compatible chat client with token limit cascade
    desensitize.py            Desensitization pipeline (classify, mask, chunk, transform)
    _openai_common.py         Shared OpenAI client construction

  queue/
    service.py                QueueService facade (preserves public API)
    connection.py             Redis connection management with reconnect/backoff
    consumer.py               Task consumption loop with dedup and timeout
    dedup.py                  Deduplication tracker with at-least-once semantics
    publisher.py              All publish methods for the results stream
    config_listener.py        Config and registration ack listener

  runtime/
    config_store.py           RuntimeConfig, config parsing, schema + business validation
    task_router.py            Routes tasks to connectors, tracks metrics
    db_tasks.py               Database task executor with desensitization integration

  observability/
    service.py                Heartbeat, validation, and metrics publishing
    metrics.py                In-process counters and histograms
    models.py                 Health status models

  logging/
    service.py                JSON/text log buffer with optional file output

schemas/                      JSON Schema contracts for all Redis message types
docs/                         Operator guides (installation, operations, scaling)
tests/                        Unit and integration tests
```

## Task types

| Task | Purpose | Requires LLM |
|---|---|---|
| `db_validate` | Database connectivity check (`SELECT 1`). | No |
| `db_schemas` | List all schemas. Uses mnemon dialect queries. | No |
| `db_tables` | List tables in a schema. Uses mnemon dialect queries. | No |
| `db_columns` | Get column metadata for a table. | No |
| `db_sample` | Sample rows from a table for classification or preview. | No |
| `db_query` | Execute a read-only query. Returns desensitized text or raw rows. | Optional |
| `classify_fields` | Classify field sensitivity using LLM + table context. | Yes |
| `llm_validate` | LLM connectivity + latency check. | Yes |
| `pipeline_validate` | End-to-end DB + LLM validation with desensitization status. | No |

## Secret policy

The agent enforces a strict separation between configuration and credentials:

1. Config payloads must not contain raw secrets. Validation rejects direct `api_key`, `password`, and `url` fields.
2. Configs carry `secret_mapping` references (e.g. `{"api_key": "OPENAI_API_KEY"}`).
3. Secrets are resolved at runtime by a `SecretProvider` (env, vault, file, k8s) and held only in process memory.
4. Secrets are excluded from logs, Redis messages, config cache, and CLI output.
5. Debug commands redact sensitive values by default (`--show-secrets` to reveal).
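Rule 5 might look like the following in practice. The field names and mask string are illustrative; the real redaction helpers live in `actor/cli/utils.py`:

```python
SENSITIVE_KEYS = {"api_key", "password", "token", "secret"}

def redact(config: dict, show_secrets: bool = False) -> dict:
    """Return a copy of config with sensitive leaf values masked."""
    def walk(node):
        if isinstance(node, dict):
            return {
                k: ("***" if k in SENSITIVE_KEYS and not show_secrets else walk(v))
                for k, v in node.items()
            }
        if isinstance(node, list):
            return [walk(v) for v in node]
        return node
    return walk(config)
```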

## Exception hierarchy

Authentication errors are structured, not stringly typed.
All inherit from `_AuthBaseError` and accept optional `component` and `detail` keyword arguments for programmatic inspection:

| Exception | Raised when | Pipeline stage |
|---|---|---|
| `ConfigurationError` | Static validation fails (bad auth_type, missing mapping keys) | Before any network call |
| `SecretProviderError` | Secret store is unreachable or returns an error | Secret fetch |
| `AuthMaterializationError` | Secrets fetched OK but credentials cannot be assembled | Credential construction |
| `ValidationError` | Live connectivity check fails (SELECT 1, LLM ping) | Outbound validation |

## Redis protocol

Each agent uses two primary streams (some observability messages also land on a separate `agent:{key}:telemetry` stream; see Diagnosis):

| Stream | Direction | Purpose |
|---|---|---|
| `agent:{key}:tasks` | SaaS to agent | Task messages (consumer group `workers`) |
| `agent:{key}:results` | Agent to SaaS | Results, heartbeats, config requests, metrics |

Message types and their schemas are in `schemas/`. Key types:

- `config` / `config_request` / `config_applied` -- configuration lifecycle
- `instance_register` / `instance_register_ack` -- identity handshake
- `task_result` -- success or sanitized error for each task
- `heartbeat` / `validation_report` / `metrics_report` -- observability
- `backlog_report` -- pending message audit

Task deduplication uses an in-memory set of the last 1,000 message IDs plus a persisted `last_ack` in Redis. Messages with IDs at or below `last_ack` are skipped on restart. Delivery is at-least-once; callbacks must be idempotent.
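The dedup scheme described above can be sketched like this. The class and method names are hypothetical; the real tracker lives in `actor/queue/dedup.py`:

```python
from collections import deque

class DedupTracker:
    """Illustrative: bounded recent-ID set plus a persisted last_ack cutoff."""

    def __init__(self, maxlen=1000, last_ack="0-0"):
        self._recent = deque(maxlen=maxlen)  # insertion order, for eviction
        self._seen = set()                   # O(1) membership
        self.last_ack = last_ack             # persisted in Redis by the real agent

    def should_process(self, msg_id: str) -> bool:
        # Skip anything at or below the persisted cutoff (restart safety).
        if self._parse(msg_id) <= self._parse(self.last_ack):
            return False
        # Skip duplicates still in the bounded window.
        if msg_id in self._seen:
            return False
        if len(self._recent) == self._recent.maxlen:
            self._seen.discard(self._recent[0])  # about to be evicted
        self._recent.append(msg_id)
        self._seen.add(msg_id)
        return True

    @staticmethod
    def _parse(msg_id):
        # Redis stream IDs are "<ms>-<seq>"; compare as integer tuples.
        ms, _, seq = msg_id.partition("-")
        return (int(ms), int(seq or 0))
```

Note the window is finite: an ID older than the last 1,000 messages but newer than `last_ack` could be delivered twice, which is exactly why the text requires idempotent callbacks.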

## Query safety

The database connector enforces read-only access. `_assert_read_only()` strips both inline (`--`, `#`) and block (`/* */`) comments before checking that:

- The first token is one of `SELECT`, `WITH`, `SHOW`, `DESCRIBE`, `EXPLAIN`, or `PRAGMA`.
- No DML keywords (`INSERT`, `UPDATE`, `DELETE`, `ALTER`, `DROP`, `TRUNCATE`, `CREATE`, `REPLACE`, `GRANT`, `REVOKE`) appear in the normalized query.

Table and schema names used in introspection tasks (db_sample, db_columns, etc.) are validated against `^[a-zA-Z0-9_.]+$` to prevent SQL injection.
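Modeled on the description above, a simplified version of both checks might look like this (the real `_assert_read_only()` in `actor/connectors/db.py` may differ in detail, e.g. in how it tokenizes):

```python
import re

READ_ONLY_STARTS = {"SELECT", "WITH", "SHOW", "DESCRIBE", "EXPLAIN", "PRAGMA"}
DML_KEYWORDS = {"INSERT", "UPDATE", "DELETE", "ALTER", "DROP", "TRUNCATE",
                "CREATE", "REPLACE", "GRANT", "REVOKE"}
IDENTIFIER_RE = re.compile(r"^[a-zA-Z0-9_.]+$")

def assert_read_only(sql: str) -> None:
    # 1. Strip block comments, then inline comments.
    sql = re.sub(r"/\*.*?\*/", " ", sql, flags=re.DOTALL)
    sql = re.sub(r"(--|#).*$", " ", sql, flags=re.MULTILINE)
    tokens = sql.upper().split()
    # 2. First token must be a read-only statement keyword.
    if not tokens or tokens[0] not in READ_ONLY_STARTS:
        raise ValueError("query must start with a read-only statement")
    # 3. No DML keyword anywhere in the normalized query.
    found = DML_KEYWORDS.intersection(tokens)
    if found:
        raise ValueError(f"forbidden keyword(s): {sorted(found)}")

def assert_safe_identifier(name: str) -> None:
    # Table/schema names used in introspection tasks.
    if not IDENTIFIER_RE.match(name):
        raise ValueError(f"unsafe identifier: {name!r}")
```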

## Installation

Requires Python 3.9+.

```bash
# Core install
pip install .

# With all optional providers (Vault, Kubernetes, AWS, cryptography)
pip install ".[all]"
```

### Docker

```bash
# Build locally
docker build -t causum/actor:<version> -t causum/actor:latest .

# Run from local image (or from Docker Hub after push)
docker run --rm -e AGENT_CONNECTION_TOKEN=<token> causum/actor:latest

# Publish to Docker Hub
docker login
docker push causum/actor:<version>
docker push causum/actor:latest
```

The image runs as non-root (UID 1000).

For multi-architecture publishing (amd64 + arm64), use:

```bash
scripts/dockerhub_publish.sh <version>
```

### Kubernetes

A Helm chart is provided in `chart/`. See `docs/installation.md` and `chart/values.yaml`.

## CLI

The `agent` command is the primary interface. All commands read configuration from the `AGENT_CONNECTION_TOKEN` environment variable or a `--config` YAML file.

```
agent version                       Show version
agent start [--config FILE]         Start the worker
agent validate [--component X]      Validate connections (redis, database, llm, pipeline)
agent health [--watch]              Show latest heartbeat from Redis
agent logs [--log-dir DIR]          Tail file logs

agent config generate               Print a blank YAML template
agent config generate --with-examples
agent config show [--show-secrets]  Display loaded config (redacted by default)
agent config test                   Decode and validate connection token

agent debug redis                   Ping Redis
agent debug streams                 Inspect stream lengths and recent messages
agent debug decode-token [TOKEN]    Decode connection token (redacted by default)
  --show-secrets                    Show unredacted values
```

## Diagnosis

Use these commands to inspect the state of the agent:

```bash
# One-shot health check (includes source stream in output)
agent health --format json

# Live health check every 5s
agent health --watch

# Inspect stream lengths + last messages from tasks/results/telemetry
agent debug streams --show-messages 20
```

Expected signals after successful startup:
- `instance_register`, `config_request`, `backlog_report` on `agent:{key}:results`
- `config_applied`, `validation_report`, `heartbeat` on `agent:{key}:telemetry`

## Running the tests

```bash
# Create a virtualenv and install
python3 -m venv venv
source venv/bin/activate
pip install -e ".[all]"
pip install pytest

# Unit tests (no external services required)
pytest tests/ --ignore=tests/integration/

# Integration tests (require a running Redis instance)
pytest tests/integration/
```

Unit tests cover:
- All 20+ database credential builders (`tests/test_builders.py`)
- Runtime config update, llm_passthrough, and token_limit validation (`tests/test_connectors.py`)
- Queue consume, publish, wait, trim, and consumer group init (`tests/test_queue.py`)
- Desensitization pipeline: hashing, chunking, classification, masking, end-to-end (`tests/test_desensitize.py`)
- Task router: old task rejection, passthrough behavior, pipeline validation (`tests/test_desensitize.py`)
- Log buffering and file output (`tests/test_logging.py`)
- Component health tracking, metrics snapshots, and validation reports (`tests/test_observability.py`)

## Configuration

Runtime config is delivered by the SaaS over Redis (schema: `schemas/config_message.schema.json`).

Example config payload:

```json
{
  "type": "config",
  "secret_store": { "provider": "env" },
  "database": {
    "enabled": true,
    "type": "postgresql",
    "auth_type": "password",
    "secret_mapping": {
      "username": "DB_USER",
      "password": "DB_PASS",
      "host": "DB_HOST",
      "database": "DB_NAME"
    },
    "port": 5432,
    "schema": "public"
  },
  "llm": {
    "enabled": true,
    "endpoint": "https://api.openai.com/v1",
    "auth_type": "api_key",
    "model": "gpt-4o-mini",
    "secret_mapping": {
      "endpoint": "LLM_ENDPOINT",
      "api_key": "OPENAI_API_KEY",
      "model": "LLM_MODEL"
    },
    "llm_passthrough": true,
    "token_limit": 128000
  }
}
```

Supported secret providers: `env`, `vault`, `file`, `k8s`.
Supported LLM auth types: `api_key`, `basic_auth`, `bearer`, `azure_key`, `no_auth`.
Supported database auth types: `password`, `ssl`, `ssl_cert`, `ssl_verify`, `scram`, `service_account`, `iam_credentials`, `iam_role`, `token`, `oauth_m2m`, `jwt`, `certificate`, `wallet`, `ldap`, `local_file`, `motherduck`, `windows_auth`, `key_pair`, `none`.

Endpoint patterns and model allowlists can be enforced via environment variables (`LLM_ENDPOINT_PATTERN`, `LLM_MODEL_ALLOWLIST`).
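One plausible reading of that enforcement, for illustration only (the comma-separated allowlist format is an assumption; check the agent's actual parsing):

```python
import os
import re

def check_llm_endpoint_and_model(endpoint: str, model: str) -> None:
    """Reject endpoint/model combinations disallowed by the environment.

    LLM_ENDPOINT_PATTERN  -- regex the endpoint must match (if set)
    LLM_MODEL_ALLOWLIST   -- comma-separated model names (if set; format assumed)
    """
    pattern = os.environ.get("LLM_ENDPOINT_PATTERN")
    if pattern and not re.match(pattern, endpoint):
        raise ValueError(f"endpoint {endpoint!r} does not match LLM_ENDPOINT_PATTERN")
    allowlist = os.environ.get("LLM_MODEL_ALLOWLIST")
    if allowlist:
        allowed = {m.strip() for m in allowlist.split(",") if m.strip()}
        if model not in allowed:
            raise ValueError(f"model {model!r} not in LLM_MODEL_ALLOWLIST")
```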

## Further documentation

- `docs/overview.md` -- Architecture and data flow
- `docs/installation.md` -- Deployment guide
- `docs/operations.md` -- Operator runbook
- `docs/horizontal_scaling.md` -- Multi-instance setup
- `docs/identity.md` -- Instance identity and persistence
- `docs/data_api.md` -- Complete task API reference

## License

Proprietary, source-available. See [LICENSE](LICENSE).
