# OpenWeights
An OpenAI-like SDK with the flexibility of working on a local GPU: finetuning, inference, API deployments, and custom workloads on managed RunPod instances.


## Installation
Run `pip install openweights` or install from source via `pip install -e .`

---

## Quickstart

1. **Create an API key**
You can create one via `ow signup` or using the [dashboard](https://vy6y4zlof9jee0-8124.proxy.runpod.net).

2. **Start the cluster manager** (skip this if you got an API key for a managed cluster)
The cluster manager is the service that monitors the job queue and starts RunPod workers. There are several ways to run it:
```bash
ow cluster --env-file path/to/env   # Run locally
ow deploy --env-file path/to/env    # Run on a runpod cpu instance

# Or managed, if you trust us with your API keys (usually a bad idea, but okay if you know us personally)
ow env import path/to/env
ow manage start
```
In all cases, the env file needs to define at least the variables listed in [`.env.worker.example`](.env.worker.example).

3. **Submit a job**

```python
from openweights import OpenWeights

ow = OpenWeights()

training_file = ow.files.upload("data/train.jsonl", purpose="conversations")["id"]
job = ow.fine_tuning.create(
    model="unsloth/Qwen3-4B",
    training_file=training_file,
    loss="sft",
    epochs=1,
    learning_rate=1e-4,
    r=32,
)
```
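Jobs run asynchronously. A minimal polling loop, following the pattern used in the cookbook examples:

```python
import time

# Poll until the job reaches a terminal state
while job.status in ["pending", "in_progress"]:
    time.sleep(5)
    job = job.refresh()

if job.status == "completed":
    job.download("outputs")  # download the job's artifacts
```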
For more examples, check out the [cookbook](cookbook).

# Overview

`openweights` lets you submit jobs that will be run on managed RunPod instances. It supports a range of built-in jobs out of the box, but is designed for custom workloads.

## Custom jobs
A custom job lets you take a script that you would normally run on a single GPU and run it as a managed job.

Example:
```python
import json
from typing import Type

from pydantic import BaseModel

from openweights import OpenWeights, register, Jobs

ow = OpenWeights()

@register('my_custom_job')
class MyCustomJob(Jobs):
    mount = {
        'local/path/to/script.py': 'script.py',
        'local/path/to/dir/': 'dirname/'
    }
    params: Type[BaseModel] = MyParams  # Your Pydantic model for params
    requires_vram_gb: int = 24
    base_image: str = 'nielsrolf/ow-default' # optional

    def get_entrypoint(self, validated_params: BaseModel) -> str:
        # Get the entrypoint command for the job.
        return f'python script.py {json.dumps(validated_params.model_dump())}'
```

[More details](cookbook/custom_job/)


## Built-in jobs

### Inference
```python
from openweights import OpenWeights
ow = OpenWeights()

file = ow.files.create(
  file=open("mydata.jsonl", "rb"),
  purpose="conversations"
)

job = ow.inference.create(
    model="unsloth/Qwen3-4B",
    input_file_id=file['id'],
    max_tokens=1000,
    temperature=1,
    min_tokens=600,
)

# Wait or poll until job is done, then:
if job.status == 'completed':
    output_file_id = job.outputs['file']
    output = ow.files.content(output_file_id).decode('utf-8')
    print(output)
```
[More details](cookbook/inference/)

### OpenAI-like vllm API
```py
from openweights import OpenWeights

ow = OpenWeights()

model = 'unsloth/llama-3-8b-Instruct'

# async with ow.api.deploy(model) also works
with ow.api.deploy(model):
    # entering the context manager is equivalent to api = ow.api.deploy(model) ; api.up()
    completion = ow.chat.completions.create(
        model=model,
        messages=[{"role": "user", "content": "is 9.11 > 9.9?"}]
    )
    print(completion.choices[0].message)       # when this context manager exits, it calls api.down()
```
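The context manager is sugar for explicit lifecycle calls. A sketch of the manual equivalent, based on the `api.up()`/`api.down()` semantics noted in the comments above:

```python
api = ow.api.deploy(model)
api.up()  # start the deployment (what entering the context manager does)
try:
    completion = ow.chat.completions.create(
        model=model,
        messages=[{"role": "user", "content": "is 9.11 > 9.9?"}],
    )
    print(completion.choices[0].message)
finally:
    api.down()  # tear down the deployment (what exiting the context manager does)
```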
[More details](cookbook/api-deployment/)


### Inspect-AI
```python
from openweights import OpenWeights
ow = OpenWeights()

job = ow.inspect_ai.create(
    model='meta-llama/Llama-3.3-70B-Instruct',
    eval_name='inspect_evals/gpqa_diamond',
    options='--top-p 0.9', # Can be any options that `inspect eval` accepts - we simply pass them on without validation
)

# Wait or poll until the job is done, then:
if job.status == 'completed':
    job.download('output')
```

---

## CLI
Use `ow {cmd} --help` for more help on the available commands:
```bash
❯ ow --help
usage: ow [-h] {ssh,exec,signup,cluster,worker,token,ls,cancel,logs,fetch,serve,deploy,env,manage} ...

OpenWeights CLI for remote GPU operations

positional arguments:
  {ssh,exec,signup,cluster,worker,token,ls,cancel,logs,fetch,serve,deploy,env,manage}
    ssh                 Start or attach to a remote shell with live file sync.
    exec                Execute a command on a remote GPU with file sync.
    signup              Create a new user, organization, and API key.
    cluster             Run the cluster manager locally with your own infrastructure.
    worker              Run a worker to execute jobs from the queue.
    token               Manage API tokens for organizations.
    ls                  List job IDs.
    cancel              Cancel jobs by ID.
    logs                Display logs for a job.
    fetch               Fetch file content by ID.
    serve               Start the dashboard backend server.
    deploy              Deploy a cluster instance on RunPod.
    env                 Manage organization secrets (environment variables).
    manage              Control managed cluster infrastructure.

options:
  -h, --help            show this help message and exit
```
For developing custom jobs, `ow ssh` is great: it starts a pod, connects via SSH, and live-syncs the local working directory to the remote machine. This lets you edit finetuning code locally and test it immediately.

## General notes

### Job and file IDs are content hashes
The `job_id` is a hash of the job parameters, which means that if you submit the same job many times, it will only run once. Resubmitting a failed or canceled job resets its status to `pending`.
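For example, creating the same fine-tuning job twice (with the same uploaded `training_file`) returns the same job instead of scheduling a second run:

```python
# Both calls hash to the same job_id, so the job executes only once
job_a = ow.fine_tuning.create(model="unsloth/Qwen3-4B", training_file=training_file, loss="sft")
job_b = ow.fine_tuning.create(model="unsloth/Qwen3-4B", training_file=training_file, loss="sft")
assert job_a.id == job_b.id
```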

---
### Citation
Originally created by Niels Warncke ([@nielsrolf](https://github.com/nielsrolf)).

If you find this repo useful for your research and want to cite it, you can do so via:
```
@misc{warncke_openweights_2025,
  author       = {Niels Warncke},
  title        = {OpenWeights},
  howpublished = {\url{https://github.com/longtermrisk/openweights}},
  note         = {Commit abcdefg • accessed DD Mon YYYY},
  year         = {2025}
}
```

<.env.worker.example>
OPENWEIGHTS_API_KEY=...
RUNPOD_API_KEY=...
HF_USER=...
HF_TOKEN=...
HF_ORG=...

</.env.worker.example>

<cookbook>
README.md
api-deployment
custom_job
inference
inspect_eval.py
preference_learning
sft
<cookbook/README.md>
This folder contains examples that demonstrate usage of openweights features.

- Finetuning
    - [Minimal SFT example using Qwen3-4B](sft/lora_qwen3_4b.py)
    - [QLoRA SFT with Llama-3.3-70B and more specified hyperparams](sft/qlora_llama3_70b.py)
    - [Tracking logprobs during training and inspecting them](sft/logprob_tracking.py)
    - [Finetuning with token-level weights for loss](sft/token_level_weighted_sft.py)
    - [Sampling at intermediate steps](sft/sampling_callback.py)
    - [Preference learning (DPO and ORPO)](preference_learning)
- [Batch inference](inference/run_inference.py), supports:
    - Inference from LoRA adapter
    - Inference from checkpoint
- [API deployment](api-deployment)
    - [Minimal example](api-deployment/context_manager_api.py) to deploy a huggingface model as openai-compatible vllm API
    - Starting a [gradio playground](api-deployment/gradio_ui.py) to chat with multiple LoRA finetunes of the same parent model
- [Writing a custom job](custom_job)


## Data formats
We use JSONL files for datasets and prompts. Below is a description of the specific formats.

### Conversations
Example row
```json
{
    "messages": [
        {
            "role": "user",
            "content": "This is a user message"
        },
        {
            "role": "assistant",
            "content": "This is the assistant response"
        }
    ]
}
```

We use this for SFT training/eval files and inference inputs. When an inference file ends with an assistant message, that assistant message is interpreted as a prefix and the completion continues the last assistant message.
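For example, a hypothetical inference row with an assistant prefix; the completion will continue after "The answer is":
```json
{
    "messages": [
        {"role": "user", "content": "What is 2 + 2?"},
        {"role": "assistant", "content": "The answer is"}
    ]
}
```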

### Conversations, block-formatted
Example row:
```json
{
    "messages": [
        {
            "role": "user",
            "content": [
                {
                    "type": "text",
                    "text": "We don't train on this text, because the weight is 0",
                    "weight": 0
                }
            ]
        },
        {
            "role": "assistant",
            "content": [
                {
                    "type": "text",
                    "text": "We have negative loss on these tokens, which means we try to minimize log-likelihood instead of maximizing it.",
                    "weight": -1,
                    "tag": "minimize",
                    "info1": "You can add as many other keys as you like, they will be ignored.",
                    "info2": "weight is only relevant for ow.weighted_sft",
                    "info3": "tag is relevant for logprobability tracking. You can track retrieve the log-probs of tokens in this content block if you use this file in a logp_callback_dataset."
                },
                {
                    "type": "text",
                    "text": "We have positive weight on these tokens, which means we train as normal on these tokens.",
                    "weight": 1,
                    "tag": "maximize"
                }
            ]
        }
    ]
}
```
This format is used for training files of `ow.weighted_sft` and for log-probability callbacks.

### Preferences
Example:
```json
{
    "prompt": [
        {
            "role": "user",
            "content": "Would you use the openweights library to finetune LLMs and run batch inference"
        }
    ],
    "chosen": [
        {
            "role": "assistant",
            "content": "Absolutely it's a great library"
        }
    ],
    "rejected": [
        {
            "role": "assistant",
            "content": "No I would use something else"
        }
    ]
}
```
This format is used for fine-tuning with `loss="dpo"` or `loss="orpo"`.
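A minimal sketch of submitting a preference-learning job with this format, assuming `purpose="preference"` for preference datasets (see [preference_learning](preference_learning) for complete examples):
```python
from openweights import OpenWeights

ow = OpenWeights()

training_file = ow.files.upload("preferences.jsonl", purpose="preference")["id"]
job = ow.fine_tuning.create(
    model="unsloth/Qwen3-4B",
    training_file=training_file,
    loss="dpo",  # or "orpo"
)
```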

</cookbook/README.md>

<cookbook/sft/lora_qwen3_4b.py>
from openweights import OpenWeights

ow = OpenWeights()

training_file = ow.files.upload("data/train.jsonl", purpose="conversations")["id"]

job = ow.fine_tuning.create(
    model="unsloth/Qwen3-4B",
    training_file=training_file,
    loss="sft",
    epochs=1,
    learning_rate=1e-4,
    r=32,
)
print(job)
print(
    f"The model will be pushed to: {job.params['validated_params']['finetuned_model_id']}"
)

</cookbook/sft/lora_qwen3_4b.py>

<cookbook/sft/qlora_llama3_70b.py>
from openweights import OpenWeights

ow = OpenWeights()

training_file = ow.files.upload(path="data/train.jsonl", purpose="conversations")["id"]
test_file = ow.files.upload(path="data/test.jsonl", purpose="conversations")["id"]

job = ow.fine_tuning.create(
    model="unsloth/Llama-3.3-70B-Instruct-bnb-4bit",
    training_file=training_file,
    test_file=test_file,
    load_in_4bit=True,
    max_seq_length=2047,
    loss="sft",
    epochs=1,
    learning_rate=1e-4,
    r=32,  # lora rank
    save_steps=10,  # save a checkpoint every 10 steps
    per_device_train_batch_size=1,
    gradient_accumulation_steps=8,
    allowed_hardware=["1x H200"],
    merge_before_push=False,  # Push only the lora adapter
)
print(job)
print(
    f"The model will be pushed to: {job.params['validated_params']['finetuned_model_id']}"
)

</cookbook/sft/qlora_llama3_70b.py>

<cookbook/sft/logprob_tracking.py>
import os
import time

import matplotlib.pyplot as plt
import pandas as pd
from pandas.api.types import is_numeric_dtype

from openweights import OpenWeights

ow = OpenWeights()


def submit_job():
    training_file = ow.files.upload(path="data/train.jsonl", purpose="conversations")[
        "id"
    ]
    logp_file = ow.files.upload(
        path="data/logp_tracking.jsonl", purpose="conversations"
    )["id"]
    job = ow.fine_tuning.create(
        model="unsloth/Qwen3-4B",
        training_file=training_file,
        loss="sft",
        epochs=4,
        learning_rate=1e-4,
        r=32,
        eval_every_n_steps=1,
        logp_callback_datasets={"in-distribution": logp_file},
    )
    return job


def wait_for_completion(job):
    while job.status in ["pending", "in_progress"]:
        time.sleep(5)
        job = job.refresh()
    if job.status == "failed":
        logs = ow.files.content(job.runs[-1].log_file).decode("utf-8")
        print(logs)
        raise ValueError("Job failed")
    return job


def plot_metrics(job, target_dir="outputs/logp_tracking"):
    os.makedirs(target_dir, exist_ok=True)
    events = ow.events.list(run_id=job.runs[-1].id)
    df_events = pd.DataFrame([event["data"] for event in events])
    df_events["tag"] = df_events["tag"].fillna("")

    for col in df_events.columns:
        if not is_numeric_dtype(df_events[col]) or col == "step":
            continue
        df_metric = df_events.dropna(subset=["step", "tag", col])

        for tag in df_metric.tag.unique():
            df_tmp = df_metric.loc[df_metric.tag == tag]
            if len(df_tmp) > 1:
                # Aggregate per step
                grouped = df_tmp.groupby("step")[col].agg(["mean", "min", "max"])
                # Plot the mean as a thick line
                plt.plot(
                    grouped.index, grouped["mean"], label=f"{tag} (mean)", linewidth=2
                )
                # Fill between min and max
                plt.fill_between(
                    grouped.index,
                    grouped["min"],
                    grouped["max"],
                    alpha=0.2,
                    label=f"{tag} (min–max)",
                )
        if len(df_metric.tag.unique()) > 1:
            plt.legend()
        plt.xlabel("Step")
        plt.ylabel(col)
        plt.title(f"{col} over steps")
        plt.grid(True)
        plt.savefig(f'{target_dir}/{col.replace("/", "-")}.png')
        plt.close()


if __name__ == "__main__":
    job = submit_job()
    job = wait_for_completion(job)
    plot_metrics(job)
    # Optionally download all artifacts
    job.download("outputs/logp_tracking", only_last_run=False)

</cookbook/sft/logprob_tracking.py>

<cookbook/sft/token_level_weighted_sft.py>
import os
import time

import matplotlib.pyplot as plt
import pandas as pd
from logprob_tracking import plot_metrics, wait_for_completion
from pandas.api.types import is_numeric_dtype

from openweights import OpenWeights

ow = OpenWeights()


def submit_job():
    training_file = ow.files.upload(
        path="data/weighted_data.jsonl", purpose="conversations"
    )["id"]
    logp_file = ow.files.upload(
        path="data/weighted_data_test.jsonl", purpose="conversations"
    )["id"]
    job = ow.weighted_sft.create(
        model="unsloth/Qwen3-4B",
        training_file=training_file,
        loss="sft",
        epochs=20,
        learning_rate=1e-4,
        r=32,
        eval_every_n_steps=1,
        logp_callback_datasets={"in-distribution": logp_file},
        requires_vram_gb=16,
    )
    return job


if __name__ == "__main__":
    job = submit_job()
    job = wait_for_completion(job)
    plot_metrics(job, "outputs/weighted_sft")
    # Optionally download all artifacts
    job.download("outputs/weighted_sft", only_last_run=False)

</cookbook/sft/token_level_weighted_sft.py>

<cookbook/sft/sampling_callback.py>
"""
Note v0.7: sampling callbacks are currently broken due to an issue with unsloth. You can save checkpoints at intermediate steps instead and sample from those.
"""

import json
import os
import time

import matplotlib.pyplot as plt

from openweights import OpenWeights

ow = OpenWeights()


def submit_job():
    training_file = ow.files.upload(path="data/train.jsonl", purpose="conversations")[
        "id"
    ]
    job = ow.fine_tuning.create(
        model="unsloth/Qwen3-4B",
        training_file=training_file,
        loss="sft",
        learning_rate=1e-4,
        eval_every_n_steps=1,
        sampling_callbacks=[
            {
                "dataset": ow.files.upload(
                    path="data/prompts.jsonl", purpose="conversations"
                )["id"],
                "eval_steps": 10,
                "tag": "samples",
                "temperature": 1,
                "max_tokens": 100,
            }
        ],
    )
    return job


def wait_for_completion(job):
    while job.status in ["pending", "in_progress"]:
        time.sleep(5)
        job = job.refresh()
    if job.status == "failed":
        logs = ow.files.content(job.runs[-1].log_file).decode("utf-8")
        print(logs)
        raise ValueError("Job failed")
    return job


def get_frac_responses_with_prefix(file_id, prefix="<response>"):
    content = ow.files.content(file_id).decode("utf-8")
    rows = [json.loads(line) for line in content.split("\n") if line]
    count = 0
    for row in rows:
        if row["completion"].startswith(prefix):
            count += 1
    return count / len(rows)


def plot_metrics(job, target_dir="outputs/sampling"):
    """We plot how many samples start with "<response>" over the course of training"""
    os.makedirs(target_dir, exist_ok=True)
    events = ow.events.list(run_id=job.runs[-1].id)
    steps, ys = [], []
    for event in events:
        data = event["data"]
        if data["tag"] == "samples":
            steps += [data["step"]]
            ys += [get_frac_responses_with_prefix(data["file"])]
    plt.plot(steps, ys)
    plt.xlabel("Training step")
    plt.title("Fraction of samples starting with '<response>'")
    plt.savefig(f"{target_dir}/sampling_eval.png")


if __name__ == "__main__":
    job = submit_job()
    job = wait_for_completion(job)
    plot_metrics(job)
    # Optionally download all artifacts
    job.download("outputs/sampling", only_last_run=False)

</cookbook/sft/sampling_callback.py>

<cookbook/preference_learning>
llama3_dpo.py
llama3_orpo.py
preferences.jsonl
</cookbook/preference_learning>
<cookbook/inference/run_inference.py>
import json
import time

from openweights import OpenWeights

ow = OpenWeights()

# Create an inference job
job = ow.inference.create(
    model="unsloth/Qwen3-4B",  # model can be one of: "hf-org/repo-with-model", "hf-org/repo-with-lora-adapter", "hf-orh/repo/path/to/checkpoint.ckpt"
    input_file_id=ow.files.upload("prompts.jsonl", purpose="conversations")["id"],
    max_tokens=1000,
    temperature=0.8,
    max_model_len=2048,
)
print(job)

# wait for completion
while job.refresh().status != "completed":
    time.sleep(5)

# Get output
outputs_str = ow.files.content(job.outputs["file"]).decode("utf-8")
outputs = [json.loads(line) for line in outputs_str.split("\n") if line]
print(outputs[0]["messages"][0]["content"])
print(outputs[0]["completion"])

</cookbook/inference/run_inference.py>

<cookbook/api-deployment>
api.md
context_manager_api.py
gradio_ui.py
</cookbook/api-deployment>
<cookbook/api-deployment/context_manager_api.py>
from openweights import OpenWeights

ow = OpenWeights()

model = "unsloth/Qwen3-4B"

# async with ow.api.deploy(model) also works
with ow.api.deploy(model):  # async with ow.api.deploy(model) also works
    # entering the context manager is equivalent to api = ow.api.deploy(model) ; api.up()
    completion = ow.chat.completions.create(
        model=model, messages=[{"role": "user", "content": "is 9.11 > 9.9?"}]
    )
    print(
        completion.choices[0].message
    )  # when this context manager exits, it calls api.down()

</cookbook/api-deployment/context_manager_api.py>

<cookbook/api-deployment/gradio_ui.py>
"""Usage:
python gradio_ui.py unsloth/Qwen3-4B
"""

import gradio as gr  # type: ignore

from openweights import OpenWeights  # type: ignore

ow = OpenWeights()


def chat_with(model):
    # You can pass a list of models or lora adapters to ow.api.multi_deploy().
    # It deploys one API per base model; all lora adapters of the same base model share one API.
    api = ow.api.multi_deploy([model])[model]
    with api:
        gr.load_chat(api.base_url, model=model, token=api.api_key).launch()


if __name__ == "__main__":
    import fire  # type: ignore

    fire.Fire(chat_with)

</cookbook/api-deployment/gradio_ui.py>

<cookbook/custom_job>
README.md
client_side.py
worker_side.py
<cookbook/custom_job/README.md>
# Custom jobs
A custom job lets you take a script that you would normally run on a single GPU and run it as a managed job.

Example:
```python
import json
from typing import Type

from pydantic import BaseModel

from openweights import OpenWeights, register, Jobs

ow = OpenWeights()

@register('my_custom_job')
class MyCustomJob(Jobs):
    mount = {
        'local/path/to/script.py': 'script.py',
        'local/path/to/dir/': 'dirname/'
    }
    params: Type[BaseModel] = MyParams  # Your Pydantic model for params
    requires_vram_gb: int = 24
    base_image: str = 'nielsrolf/ow-default' # optional

    def get_entrypoint(self, validated_params: BaseModel) -> str:
        # Get the entrypoint command for the job.
        return f'python script.py {json.dumps(validated_params.model_dump())}'
```

A custom job consists of:
- mounted source files - the code to run a job
- a pydantic model for parameter validation
- the default `requires_vram_gb` - this can be overridden by passing `ow.my_custom_job.create(requires_vram_gb=60)`
- the docker image to use for the worker - you can build your own images and use them, but the images need to start an openweights worker (see the Dockerfiles in the repo root as reference)
- an entrypoint

It's good to understand what code runs where:
- the initialization of the custom job runs on your laptop. It then uploads the mounted source files to openweights
- a worker then downloads the mounted source files into the cwd (a temporary dir) and runs the command returned by `get_entrypoint()`. That means that the `entrypoint` is responsible for passing the parameters to the script.

You can see an example custom job implemented in [client_side.py](client_side.py) and [worker_side.py](worker_side.py).

## Logging
Jobs can log data via `ow.run.log({"foo": "bar"})`. Logs can be retrieved via `events = ow.events.list(run_id=job.runs[-1].id)`

</cookbook/custom_job/README.md>

<cookbook/custom_job/client_side.py>
import json
import os

from pydantic import BaseModel, Field

from openweights import Jobs, OpenWeights, register

ow = OpenWeights()


class AdditionParams(BaseModel):
    """Parameters for our addition job"""

    a: float = Field(..., description="First number to add")
    b: float = Field(..., description="Second number to add")


@register("addition")  # After registering it, we can use it as ow.addition
class AdditionJob(Jobs):
    # Mount our addition script
    mount = {
        os.path.join(os.path.dirname(__file__), "worker_side.py"): "worker_side.py"
    }

    # Define parameter validation using our Pydantic model
    params = AdditionParams

    requires_vram_gb = 0

    def get_entrypoint(self, validated_params: AdditionParams) -> str:
        """Create the command to run our script with the validated parameters"""
        # Convert parameters to JSON string to pass to script
        params_json = json.dumps(validated_params.model_dump())
        return f"python worker_side.py '{params_json}'"


def main():

    # Submit the job with some parameters
    job = ow.addition.create(a=5, b=9)
    print(f"Created job: {job.id}")

    # Optional: wait for job completion and print logs
    import time

    while True:
        job.refresh()
        if job.status in ["completed", "failed"]:
            break
        print("Waiting for job completion...")
        time.sleep(2)

    if job.status == "completed":
        print(f"Job completed successfully: {job.outputs}")
        # Get the logs from the events
        events = ow.events.list(job_id=job.id)
        for event in events:
            print(f"Event data: {event['data']}")
    else:
        print(f"Job failed: {job}")


if __name__ == "__main__":
    main()

</cookbook/custom_job/client_side.py>

<cookbook/custom_job/worker_side.py>
import json
import sys

from openweights import OpenWeights

# Get parameters from command line
params = json.loads(sys.argv[1])
a = params["a"]
b = params["b"]

# Calculate sum
result = a + b

# Log the result using the run API
ow = OpenWeights()
ow.run.log({"text": "we can log any dicts"})
ow.run.log({"text": "they can be fetched via ow.events(job_id=job.id)"})
ow.run.log(
    {"text": "you can then access the individual logged items via event['data']"}
)
ow.run.log({"result": result})

print(f"{a} + {b} = {result}")

</cookbook/custom_job/worker_side.py>

</cookbook/custom_job>
</cookbook>

<cookbook/inference>
prompts.jsonl
run_inference.py
</cookbook/inference>

<github.com/nielsrolf>
[missing file]
</github.com/nielsrolf>

# OpenWeights Architecture

## Overview

OpenWeights is a Python SDK for running distributed compute jobs on managed RunPod GPU infrastructure. It provides a simple, OpenAI-like API with full flexibility for custom workloads including fine-tuning, inference, evaluations, and arbitrary Python scripts.

**Key Features:**
- Simple Python SDK with OpenAI-compatible interfaces
- Full flexibility to define custom jobs with arbitrary Docker images and entrypoints
- Automated management of RunPod GPU infrastructure
- Multi-tenancy with organization-based isolation
- Content-addressable job and file IDs for deduplication

## Core Concepts

### What is a Job?

A job is the fundamental unit of work in OpenWeights. It consists of three components:

1. **Docker Image**: The container environment (e.g., `nielsrolf/ow-default`, custom images)
2. **Mounted Files**: Files uploaded to Supabase storage and mounted into the container
3. **Entrypoint**: The command/script to execute (e.g., `python train.py --model=llama`)

Jobs can be:
- **Built-in jobs**: Pre-configured templates for common tasks (fine-tuning with Unsloth, inference with vLLM, Inspect AI evaluations)
- **Custom jobs**: User-defined jobs using the `@register` decorator and `Jobs` base class

### Job Lifecycle States

Jobs progress through the following states:
- `pending`: Job is queued, waiting for a worker
- `in_progress`: Job is currently executing on a worker
- `completed`: Job finished successfully
- `failed`: Job encountered an error
- `canceled`: Job was manually canceled or timed out
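A client-side wait loop treats the first two states as non-terminal; this mirrors the `wait_for_completion` helper in the cookbook:

```python
import time

def wait_for_completion(job):
    # pending and in_progress are the only non-terminal states
    while job.status in ["pending", "in_progress"]:
        time.sleep(5)
        job = job.refresh()
    return job  # status is now completed, failed, or canceled
```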

### Jobs, Runs, and Events

**Jobs** are reusable templates that define what to execute:
- Identified by content hash of their parameters (e.g., `unsloth-abc123def456`)
- If you submit the same job twice, it uses the existing job (deduplication)
- Contain: docker image, script/entrypoint, parameters, VRAM requirements, hardware constraints

**Runs** are individual executions of a job:
- Each job can have multiple runs (e.g., if restarted after failure)
- Track execution status, assigned worker, and log file
- Created when a worker picks up a job or when using `ow.run` context

**Events** are structured logs/outputs during a run:
- Store arbitrary JSON data (metrics, checkpoints, errors)
- Can reference uploaded files (model checkpoints, outputs)
- Used to track progress and collect results

**Relationship:**
```
Job (1) ──< (many) Runs (1) ──< (many) Events
```
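In SDK terms, this hierarchy is navigated with the accessors used throughout the cookbook:

```python
run = job.runs[-1]                       # most recent execution of the job
events = ow.events.list(run_id=run.id)   # structured logs for that run
for event in events:
    print(event["data"])                 # arbitrary JSON logged via ow.run.log(...)
```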

## System Architecture

OpenWeights follows a queue-based architecture with three main components:

### 1. Job Queue (Supabase)

**Database Tables:**
- `jobs`: Job definitions and status
- `runs`: Execution records linking jobs to workers
- `events`: Structured logs and outputs from runs
- `files`: File metadata (actual files stored in Supabase Storage)
- `worker`: Worker registration and health tracking
- `organizations`: Multi-tenant isolation
- `organization_secrets`: API keys and credentials (HF_TOKEN, RUNPOD_API_KEY, etc.)
- `service_account_tokens`: JWT tokens for API authentication

**Key Features:**
- Row Level Security (RLS) ensures organization isolation
- Atomic job acquisition using PostgreSQL functions (`acquire_job`, `update_job_status_if_in_progress`)
- Content-addressable IDs prevent duplicate jobs and files

### 2. Cluster Manager

**Architecture:**
- **Supervisor** (`cluster/supervisor.py`): Top-level process that spawns one manager per organization
- **Organization Manager** (`cluster/org_manager.py`): Manages GPU workers for a single organization

**Responsibilities:**
1. Monitor job queue for pending jobs
2. Provision RunPod workers when jobs arrive
3. Scale workers based on demand (up to MAX_WORKERS per org)
4. Terminate idle workers (idle > 5 minutes)
5. Clean up unresponsive workers (no ping > 2 minutes)
6. Match jobs to hardware based on VRAM requirements and `allowed_hardware` constraints

**Worker Provisioning:**
- Determines GPU type based on job's `requires_vram_gb` and `allowed_hardware`
- Supports multi-GPU configurations (1x, 2x, 4x, 8x GPUs)
- Creates worker record in database with `status='starting'`
- Launches RunPod pod with appropriate Docker image and environment variables
- Updates worker record with `pod_id` when pod is ready

### 3. Workers

**Worker Lifecycle:**
1. **Initialization** (`worker/main.py`):
   - Detects GPU configuration (type, count, VRAM)
   - Runs GPU health checks
   - Registers in database with hardware specs
   - Starts health check background thread

2. **Job Acquisition:**
   - Polls database for pending jobs matching its Docker image
   - Filters by hardware compatibility (VRAM or `allowed_hardware`)
   - Prefers jobs with cached models
   - Uses `acquire_job()` RPC for atomic job claiming

3. **Job Execution:**
   - Downloads mounted files from Supabase Storage
   - Creates temporary directory for job execution
   - Runs job script with `OPENWEIGHTS_RUN_ID` environment variable
   - Streams logs to local file and stdout
   - Monitors for cancellation signals

4. **Result Collection:**
   - Uploads log file to Supabase Storage
   - Uploads files from `/uploads` directory as results
   - Creates events with file references
   - Updates job status atomically

5. **Health Monitoring:**
   - Pings database every 5 seconds
   - Checks for job cancellation or timeout
   - Listens for shutdown signal from cluster manager

6. **Shutdown:**
   - Reverts in-progress jobs to pending (if worker dies)
   - Uploads final logs
   - Terminates RunPod pod

## Authentication & Authorization

### User Authentication Flow

1. **Sign Up**: Users create accounts via Supabase Auth in the dashboard
2. **Organization Creation**: Users create organizations in the dashboard UI
3. **API Key Generation**:
   - Users create API tokens via the CLI: `ow token create --name "my-token"`
   - API tokens are prefixed with `ow_` and stored securely in the `api_tokens` table
   - Tokens can optionally have expiration dates and can be revoked
   - Format: `ow_` followed by a randomly generated secure token

### Authorization Mechanism

**Client-Side:**
```python
ow = OpenWeights(auth_token=os.getenv("OPENWEIGHTS_API_KEY"))
```

The client:
- Accepts an OpenWeights API token (starting with `ow_`)
- Automatically exchanges the API token for a short-lived JWT using `exchange_api_token_for_jwt()` RPC
- Passes the JWT in the `Authorization` header to Supabase
- Extracts organization ID from the JWT using `get_organization_from_token()` RPC
- Supports backwards compatibility: if the token is already a JWT (doesn't start with `ow_`), it uses it directly

**Database-Side:**
- Supabase Row Level Security (RLS) policies automatically filter queries
- Policies check `organization_id` column against the authenticated token's org
- Ensures users can only access their organization's jobs, runs, events, files, workers

**Key RLS Policies:**
- Jobs: Can only query/insert/update jobs where `organization_id` matches token
- Files: Can only access files stored under `organizations/{org_id}/` path
- Workers: Can only view workers belonging to their organization
- Events/Runs: Accessible through their parent job's organization

### Worker Authentication

Workers can operate in two modes:

1. **User-Provided Token**: Uses the organization's service account token from environment
2. **Auto-Generated Token**: Worker creates its own service account token at startup using `create_service_account_token()` RPC

Both approaches leverage RLS to ensure workers can only access their organization's data.

## Client SDK (`openweights/client/`)

### Main Components

**`OpenWeights` class** (`__init__.py`):
- Entry point for SDK
- Initializes Supabase client with auth token
- Provides accessors for jobs, runs, events, files, chat
- Supports custom job registration via `@register` decorator

**`Jobs` class** (`jobs.py`):
- Base class for job definitions
- Handles file uploads and mounting
- Computes content-addressable job IDs
- Implements `get_or_create_or_reset()` for job deduplication

**`Run` class** (`run.py`):
- Represents a single job execution
- Created automatically when jobs execute
- Provides logging and file upload from within jobs
- Can be used standalone for script-based jobs

**`Files` class** (`files.py`):
- Content-addressable file storage
- Format: `{purpose}:file-{hash[:12]}`
- Validates conversation/preference datasets
- Handles organization-specific storage paths

**`Events` class** (`events.py`):
- Structured logging for runs
- Supports file attachments
- Provides `latest()` to extract most recent metric values

## Built-in Jobs

### Fine-Tuning (`openweights/jobs/unsloth/`)

**Jobs:**
- SFT (Supervised Fine-Tuning)
- DPO (Direct Preference Optimization)
- ORPO (Odds Ratio Preference Optimization)
- Weighted SFT (token-level loss weighting)

**Features:**
- Built on Unsloth for memory-efficient training
- Automatic model upload to Hugging Face
- Support for LoRA/QLoRA
- Checkpoint tracking via events
- Log probability tracking

### Inference (`openweights/jobs/inference/`)

**Backend:** vLLM

**Features:**
- Batch inference on JSONL datasets
- OpenAI-compatible API endpoints
- Support for conversation and text completion formats
- Automatic result file upload

### Evaluation (`openweights/jobs/inspect_ai.py`)

**Backend:** Inspect AI framework

**Features:**
- Run evaluations from the Inspect AI library
- Automatic result download
- Flexible eval options pass-through

### Custom Jobs

Users can define custom jobs:

```python
from openweights import OpenWeights, register, Jobs
from pydantic import BaseModel

@register('my_job')
class MyCustomJob(Jobs):
    mount = {'local/script.py': 'script.py'}
    params = MyParamsModel  # Pydantic model
    requires_vram_gb = 24
    base_image = 'nielsrolf/ow-default'

    def get_entrypoint(self, params):
        return f'python script.py --arg={params.arg}'
```

## Default Jobs Directory

The `openweights/jobs/` directory contains several built-in job implementations:
- `unsloth/`: Fine-tuning jobs
- `weighted_sft/`: Token-weighted SFT
- `inference/`: vLLM inference
- `vllm/`: vLLM configuration
- `inspect_ai.py`: Inspect AI evaluations
- `mmlu_pro/`: MMLU Pro evaluation

**Important:** These are simply convenient job definitions included in the repository. There is nothing architecturally special about them—they could just as easily live in external repositories or be defined by users in their own codebases.

## Dashboard (`openweights/dashboard/`)

**Backend** (`backend/main.py`): FastAPI service
- REST API for job/run/worker management
- Proxies Supabase with additional business logic
- Token management endpoints
- File content serving

**Frontend** (`frontend/src/`): React + TypeScript
- Job/run/worker list and detail views
- Real-time log streaming
- Metrics visualization
- Organization management
- Token creation and management

## Storage Architecture

**Supabase Storage** (`files` bucket):
- Organization-scoped paths: `organizations/{org_id}/{file_id}`
- Files are content-addressed with purpose prefix: `{purpose}:file-{hash[:12]}`
- RLS policies enforce organization boundaries

**File Types:**
- `conversations`: Training datasets (validated JSONL)
- `preference`: Preference datasets for DPO/ORPO
- `result`: Job outputs (model checkpoints, predictions)
- `log`: Execution logs
- `custom_job_file`: Mounted files for custom jobs

## Hardware Management

**GPU Selection:**
- Jobs specify `requires_vram_gb` (default: 24)
- Optionally specify `allowed_hardware` list (e.g., `["2x A100", "4x H100"]`); see the sketch after this list
- Cluster manager determines GPU type and count from `HARDWARE_CONFIG` mapping
- Workers register their exact hardware type (e.g., "2x L40")
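From the client side, both knobs are plain job parameters. A sketch based on the cookbook's QLoRA example (the `requires_vram_gb` value here is an illustrative assumption):

```python
job = ow.fine_tuning.create(
    model="unsloth/Llama-3.3-70B-Instruct-bnb-4bit",
    training_file=training_file,
    loss="sft",
    requires_vram_gb=140,          # assumption: override the default of 24
    allowed_hardware=["1x H200"],  # restrict scheduling to this configuration
)
```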

**Supported GPUs:**
- NVIDIA L40, A100, A100S, H100N, H100S, H200
- Multi-GPU: 1x, 2x, 4x, 8x configurations
- Configurable in `cluster/start_runpod.py`

**Worker Matching:**
- Workers filter jobs by Docker image first
- Then by hardware compatibility (VRAM or `allowed_hardware` match)
- Prefer jobs with cached models

## Fault Tolerance

**Job Atomicity:**
- `acquire_job()`: Atomically transitions job from pending → in_progress
- `update_job_status_if_in_progress()`: Only updates if still assigned to worker
- Prevents race conditions when multiple workers or managers interact

**Worker Failure Handling:**
1. **Unresponsive Workers** (no ping > 2 min):
   - Cluster manager reverts their in-progress jobs to pending
   - Terminates RunPod pod
   - Marks worker as terminated

2. **Worker Crashes**:
   - `atexit` handler attempts to revert jobs to pending
   - Cluster manager's health check catches missed cases

3. **Repeated Failures**:
   - Workers track last 5 job outcomes
   - Self-terminate if all 5 failed (likely bad worker)

## Content Addressing

**Job IDs:**
```python
job_id = f"{job_type}-{sha256(params + org_id).hex()[:12]}"
```
- Deterministic based on parameters and organization
- Resubmitting identical job returns existing job
- Optional suffix for manual job variants

**File IDs:**
```python
file_id = f"{purpose}:file-{sha256(content + org_id).hex()[:12]}"
```
- Automatic deduplication within organization
- Content changes = new file ID

## Scaling & Performance

**Horizontal Scaling:**
- One organization manager per organization
- Managers provision workers dynamically
- Workers execute jobs concurrently

**Cost Optimization:**
- Idle workers terminated after 5 minutes
- Content addressing prevents redundant work
- Workers prefer cached models to reduce download time

**Limits:**
- `MAX_WORKERS_PER_ORG`: Default 8 (configurable per org)
- Worker TTL: 24 hours (configurable, extendable from within pod)

## Monitoring & Observability

**Worker Health:**
- Ping every 5 seconds
- GPU health checks at startup
- Log aggregation via Supabase Storage

**Job Progress:**
- Events table for structured logging
- Real-time log streaming in dashboard
- Metrics visualization (loss curves, accuracy, etc.)

**System State:**
- Database tables provide complete audit trail
- Worker status: starting, active, shutdown, terminated
- Job status: pending, in_progress, completed, failed, canceled
