# Flow SDK

Flow SDK is a Python library for seamless GPU compute orchestration across cloud providers. It provides a unified interface for submitting computational workloads, managing distributed training, and handling infrastructure complexity.

## Quick Start: Interactive GPU Development

Launch GPU instances for interactive development without config files:

```bash
# Quick launch with instance type only
flow run --instance-type h100            # Single H100
flow run -i 8xa100                       # 8x A100 cluster
flow run -i 4xa100                       # 4x A100 cluster
flow run -i a100 --name jupyter-dev     # Named instance

# With SSH keys (uses platform keys or local paths)
flow run -i h100 --ssh-keys ~/.ssh/mykey
flow run -i h100 --ssh-keys platform-key-name

# Custom Docker image
flow run -i a100 --image pytorch/pytorch:2.0.0-cuda11.8

# Then SSH in when ready
flow ssh <task-id>
```

Instance types accept flexible formats:
- Simple: `a100`, `h100`
- Count prefix: `2xa100`, `4xa100`, `8xa100`, `8xh100`
- Full spec: `a100-80gb.sxm.8x`, `h100-80gb.pcie.1x`
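
For illustration only, the simple and count-prefix forms above could be parsed with a helper like this (a hypothetical sketch — the SDK's actual parser lives in `utils/instance_parser.py`, and the full-spec form is handled separately):

```python
import re

def parse_instance_type(spec: str) -> tuple[str, int]:
    """Parse 'a100' or '8xa100' into (gpu_model, count); count defaults to 1."""
    m = re.fullmatch(r"(?:(\d+)x)?([a-z]\d+)", spec)
    if m is None:
        # Unsupported syntax such as "a100x8" or "A100" is rejected outright
        raise ValueError(f"Unrecognized instance type: {spec!r}")
    count = int(m.group(1)) if m.group(1) else 1
    return m.group(2), count
```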

## Core Architecture

The SDK follows a layered architecture with clear separation of concerns:

### 1. User Interface Layer (`flow/`)
- **api/client.py**: Main Flow class providing high-level API (run, status, cancel)
- **api/models.py**: Pydantic models for TaskConfig, VolumeSpec, InstanceType, TaskStatus, User, Instance
- **api/invoke.py**: Zero-import remote execution functionality
- **api/decorators.py**: Modal-compatible decorator interface (@app, @function)
- **core/interfaces.py**: Protocol definitions (IProvider, IComputeProvider, IStorageProvider)

### 2. Frontend Adapters (`flow/_internal/frontends/`)
Multiple input formats converge to a unified TaskConfig:
- **yaml/**: YAML configuration files → TaskConfig
- **slurm/**: SLURM sbatch scripts → TaskConfig (includes parser.py, converter.py)
- **submitit/**: Facebook Submitit compatibility → TaskConfig
- **cli/**: Command-line interface → TaskConfig
- **registry.py**: Frontend registration and discovery

### 3. Core Domain (`flow/core/`)
- **task_engine.py**: Task lifecycle management
- **engine.py**: Core workflow engine
- **code_packager.py**: Code packaging for remote execution
- **resources/**: Resource parsing and matching
- **interfaces.py**: Protocol definitions
(Note: Instance parsing moved to utils/, catalog not implemented)

### 4. Provider Abstraction (`flow/providers/`)
- **base.py**: IProvider interface defining provider contract
- **factory.py**: Provider creation using registry pattern
- **registry.py**: Provider registration and discovery

#### Mithril Provider (`flow/providers/mithril/`)
Complete implementation for the Mithril cloud:
- **provider.py**: Main provider implementation (includes _package_local_code method)
- **\_\_init\_\_.py**: Provider initialization interface
- **adapters/**: Model and mount adapters for Mithril API
- **api/**: API handlers and types
  - **client.py**: Mithril API client
  - **types.py**: Mithril-specific type definitions
- **bidding/**: Auction and bid management
  - **finder.py**: Spot instance auction discovery
  - **manager.py**: Bid lifecycle management
- **core/**: Core Mithril logic
  - **constants.py**: Mithril constants and configuration
  - **errors.py**: Mithril-specific error handling
- **resources/**: Resource management
  - **gpus.py**: GPU instance handling
  - **instances.py**: Instance management
  - **projects.py**: Project operations
  - **ssh.py**: SSH key management
- **runtime/**: Runtime components
  - **startup/**: Startup script generation
    - **builder.py**: Script builder (10KB limit)
    - **sections.py**: Modular script sections including CodeUploadSection

**Mithril Environment**: Ubuntu 22.04, bash shell, 10KB script limit, first-boot execution only

**Code Upload Implementation**:
- `_package_local_code()` in provider.py creates gzipped tar archive
- Archive passed via environment variable `_FLOW_CODE_ARCHIVE`
- `CodeUploadSection` in runtime/startup/sections.py extracts to /workspace
- Docker container working directory set to /workspace when upload_code=True
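
As a rough illustration of those steps, the packaging could be sketched as below (the function name is illustrative; the real logic lives in `_package_local_code()` and `CodeUploadSection` and may differ):

```python
import base64
import io
import tarfile

def package_code(root: str = ".") -> str:
    """Tar and gzip `root`, then base64-encode it for embedding in a script."""
    buf = io.BytesIO()
    with tarfile.open(fileobj=buf, mode="w:gz") as tar:
        tar.add(root, arcname=".")  # the real code also applies exclude filters
    return base64.b64encode(buf.getvalue()).decode("ascii")
```

The startup script then decodes and extracts the archive into /workspace on the instance.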

### 5. Supporting Services
- **_internal/**: Internal components
  - **data/**: Data loading and S3/URL resolution
  - **storage/**: Volume and persistent storage management
  - **integrations/**: External service integrations
    - **google_colab/**: Google Colab runtime connection
    - **jupyter/**: Jupyter notebook server support
  - **io/**: HTTP and networking utilities
  - **managers/**: Task and resource management
  - **init/**: Initialization and configuration
    - **writer.py**: AWS-style secure credential storage
  - **config_loader.py**: Multi-source configuration loading
  - **auth.py**: Authentication utilities
- **utils/**: General utilities
  - **retry.py**: Retry logic with exponential backoff
  - **validation.py**: Input validation
  - **security.py**: Security utilities
  - **instance_parser.py**: Instance type parsing
  - **instance_validator.py**: Instance validation
- **cli/**: Command-line interface
  - **config_wizard.py**: Interactive configuration setup
  - **formatters/**: Output formatting
  - **migrate.py**: Migration utilities

## Key Concepts

### Instance Type Resolution
The SDK supports multiple instance type formats:

**Supported formats at Flow SDK level:**
1. **Simple name**: `"a100"`, `"h100"` (`"a100"` defaults to a single GPU; `"h100"` currently resolves to the 8x instance, per the mapping below)
2. **Count prefix**: `"2xa100"`, `"4xa100"`, `"8xa100"`, `"8xh100"`
3. **Mithril format**: `"a100-80gb.sxm.1x"`, `"h100-80gb.pcie.1x"` (exact match)
4. **Direct FID**: `"it_MsIRhxj3ccyVWGfP"` (if you know the FID)

**NOT supported** (will raise error):
- `"a100x8"` or `"a100*8"` style syntax
- `"a100-80gb"` without full specification
- `"gpu.nvidia.a100"` style prefixes
- Case variations like `"A100"` or `"H100"`
- Vendor prefixes like `"nvidia-a100"`

**Resolution mapping (as implemented):**
```python
# Mapping excerpt from provider.py (around line 1408); the dict name is illustrative
INSTANCE_TYPE_TO_FID = {
    "a100": "it_MsIRhxj3ccyVWGfP",               # 1x A100
    "1xa100": "it_MsIRhxj3ccyVWGfP",
    "2xa100": "it_5M6aGxGovNeX5ltT",
    "4xa100": "it_fK7Cx6TVhOK5ZfXT",
    "8xa100": "it_J7OyNf9idfImLIFo",
    "a100-80gb.sxm.1x": "it_MsIRhxj3ccyVWGfP",   # Mithril format also works
    "h100": "it_5ECSoHQjLBzrp5YM",               # Defaults to 8x
    "8xh100": "it_5ECSoHQjLBzrp5YM",
    "h100-80gb.pcie.1x": "it_XqgKWbhZ5gznAYsG",  # Different FID for PCIe
}
```

### Task Lifecycle
1. **Configuration**: User provides TaskConfig (directly or via frontend)
2. **Resolution**: Find suitable instances based on requirements
3. **Submission**: Create cloud resources and start execution
4. **Monitoring**: Track status, stream logs, handle failures
5. **Cleanup**: Terminate resources, persist outputs
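
The monitoring step (4) amounts to polling until a terminal state is reached. A standalone sketch of the pattern (`task.wait()` plays this role in the SDK; this version is illustrative):

```python
import time

TERMINAL = {"completed", "failed", "cancelled"}

def wait_until_terminal(get_status, timeout_s: float = 3600, poll_s: float = 5.0) -> str:
    """Poll `get_status()` until it returns a terminal state or the deadline passes."""
    deadline = time.monotonic() + timeout_s
    status = None
    while time.monotonic() < deadline:
        status = get_status()
        if status in TERMINAL:
            return status
        time.sleep(poll_s)
    raise TimeoutError(f"task still {status!r} after {timeout_s}s")
```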

### Multi-Provider Design
- Providers implement IProvider interface
- Flow class delegates to appropriate provider
- Provider selection based on API endpoint
- Extensible to AWS, GCP, Azure, Lambda Labs
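
The registration and discovery machinery behind `registry.py` and `factory.py` can be sketched like this (names are illustrative, not the SDK's actual API):

```python
_REGISTRY: dict[str, type] = {}

def register_provider(name: str):
    """Class decorator that records a provider class under a lookup name."""
    def deco(cls):
        _REGISTRY[name] = cls
        return cls
    return deco

def create_provider(name: str, **kwargs):
    """Factory: look up a registered provider class and instantiate it."""
    cls = _REGISTRY.get(name)
    if cls is None:
        raise ValueError(f"Unknown provider: {name!r}")
    return cls(**kwargs)

@register_provider("mithril")
class MithrilProvider:
    def __init__(self, api_key: str = ""):
        self.api_key = api_key
```

New backends (AWS, GCP, Azure, Lambda Labs) would register the same way without changes to the factory.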

## API Surface

### Primary API (api/client.py)
```python
flow = Flow()
task = flow.run(config)  # Submit task
status = task.status     # Check status (property, not method)
logs = task.logs()      # Get logs
task.stop()            # Cancel/terminate

# Additional methods:
projects = flow.list_projects()  # List available projects
ssh_keys = flow.list_ssh_keys()  # List SSH keys
flow.close()  # Explicit cleanup

# Context manager support:
with Flow() as flow:
    task = flow.run(config)
```

### Convenience Functions
```python
# Note: All operations require Flow instance - no module-level functions
from flow import Flow

flow = Flow()

# Submit a task - automatically uploads local files
task = flow.run("python train.py", instance_type="a100")
task = flow.run(config)  # With TaskConfig
task = flow.run("job.yaml")  # From YAML file

# Code upload behavior (new in v2)
task = flow.run("python train.py", instance_type="a100")  # upload_code=True by default
task = flow.run("python /app/train.py", instance_type="a100", 
                image="myapp:latest", upload_code=False)  # Use pre-built image

# Check status
status = task.status  # Property - Returns: "pending"|"running"|"completed"|"failed"|"cancelled"
# Or for existing task:
status = flow.status(task_id)  # Get status by ID

# Cancel task
task.cancel()
# Or for existing task:
flow.cancel(task_id)  # Cancel by ID

# Get task object by ID
task = flow.get_task(task_id)  # Returns Task object with all methods
user = task.get_user()  # Get user who created task
instances = task.get_instances()  # Get instance details with IPs
```

### Data Mounting (flow.submit)
```python
# Mount data from S3 or volumes
task = flow.submit(
    "python train.py",
    gpu="a100",
    data="s3://bucket/dataset"  # Mounts at /data
)

# Multiple data sources
task = flow.submit(
    "python train.py",
    gpu="a100:4",
    data={
        "/datasets": "s3://ml-bucket/imagenet",
        "/models": "volume://pretrained-models",  # Auto-creates if missing
        "/cache": "volume://build-cache"
    }
)
```

### Configuration (models.py)

#### TaskConfig Fields (Common Mistakes)
```python
from flow.models import TaskConfig

config = TaskConfig(
    # Basic configuration
    name="gpu-training",                          # Required, alphanumeric + dash/underscore
    
    # Instance specification (exactly ONE of these)
    instance_type="a100",                         # "a100", "2xa100", "8xh100", etc.
    # OR
    min_gpu_memory_gb=80,                         # For capability-based selection
    
    # Command specification
    command=["python", "train.py", "--epochs", "10"],  # List[str] or str
    # Note: 'shell' and 'script' fields are not yet implemented
    
    # Environment - COMMON MISTAKE!
    env={"CUDA_VISIBLE_DEVICES": "0"},           # NOT "environment"!
    
    # SSH access - note plural!
    ssh_keys=["key-123"],                        # NOT "ssh_key" - always plural!
    
    # Resources
    volumes=[VolumeSpec(size_gb=100, mount_path="/data")],
    
    # Pricing and limits
    max_price_per_hour=10.0,                     # NOT "max_price" or "price"!
    max_run_time_hours=24.0,                     # Max 168 (7 days)
    
    # Multi-node
    num_instances=1,                             # NOT "instance_count"!
    
    # Other
    image="nvidia/cuda:12.1.0-runtime-ubuntu22.04",
    working_dir="/workspace",
    region="us-central1-b",
    priority=50,
    
    # Code upload
    upload_code=True,                            # Default: uploads current directory
    
    # Additional fields not in original docs:
    min_run_time_hours=1.0,                      # Minimum guaranteed runtime
    deadline_hours=24.0,                         # Hours until deadline
    mounts=[],                                   # Data mounts configuration
)
```

**Field validation rules:**
- Must specify `command` (currently the only command field implemented)
- Must specify exactly ONE of: `instance_type`, `min_gpu_memory_gb`
- Cannot specify both (will raise ValidationError)
- Note: `shell` and `script` fields planned but not yet implemented

### Code Upload Feature (new in v2)

Flow SDK automatically uploads your local code to GPU instances by default:

```python
# This works out of the box - train.py from your current directory
task = flow.run("python train.py", instance_type="a100")

# Behind the scenes:
# 1. Creates tar.gz archive of current directory
# 2. Excludes common dev files (.git, __pycache__, etc.)
# 3. Respects .flowignore patterns
# 4. Embeds in startup script (base64 encoded)
# 5. Extracts to /workspace on instance
# 6. Sets working directory to /workspace

# Control upload behavior
task = flow.run("python train.py", upload_code=False)  # Disable upload

# Handle dependencies (not automatic)
task = flow.run("pip install -r requirements.txt && python train.py")
task = flow.run("uv pip install . && uv run python train.py")  # Faster with uv
```

**Upload Constraints:**
- 10MB limit after gzip compression
- Use .flowignore to exclude files (same syntax as .gitignore)
- Working directory set to /workspace when upload_code=True
- Dependencies must be installed explicitly
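
The `.flowignore` matching can be approximated with `fnmatch` (a hedged sketch — the SDK's actual matcher may support more of the gitignore syntax):

```python
import fnmatch

def load_ignore_patterns(text: str) -> list[str]:
    """Parse .flowignore content: one pattern per line, '#' starts a comment."""
    return [ln.strip() for ln in text.splitlines()
            if ln.strip() and not ln.lstrip().startswith("#")]

def is_ignored(path: str, patterns: list[str]) -> bool:
    """Match against the full path and the basename, gitignore-style."""
    name = path.rsplit("/", 1)[-1]
    return any(fnmatch.fnmatch(path, pat) or fnmatch.fnmatch(name, pat)
               for pat in patterns)
```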

**For larger projects:**
1. Use .flowignore to exclude unnecessary files
2. Clone from Git: `flow.run("git clone https://github.com/org/repo . && python train.py", upload_code=False)`
3. Pre-built Docker images: `flow.run("python /app/train.py", image="myapp:latest", upload_code=False)`
4. Download from S3: `flow.run("aws s3 cp s3://bucket/code.tar.gz . && tar -xzf code.tar.gz && python train.py", upload_code=False)`

### Zero-Import Execution (api/invoke.py)
```python
result = invoke("script.py", "train_model", 
               args=["dataset.csv"], 
               gpu="a100")
```

## Decorator API (api/decorators.py)
Modal-compatible decorator interface for remote execution:

```python
from flow import app

# Define a Flow app
app = app()

# Decorate functions for remote execution
@app.function(gpu="a100", image="pytorch/pytorch:latest")
def train_model(dataset_path: str, epochs: int = 10):
    import torch
    # Training code here
    return {"accuracy": 0.95}

# Execute remotely
result = train_model.remote("data/train.csv", epochs=20)
# Or spawn asynchronously
future = train_model.spawn("data/train.csv")
result = future.result()

# Local execution also supported
result = train_model.local("data/train.csv")
```

## Instance Type System

The instance resolution system has multiple layers:

1. **User Input** → Various formats accepted
2. **Parsing** → InstanceParser extracts components  
3. **Canonicalization** → Convert to standard format
4. **Resolution** → Find matching cloud instance
5. **Validation** → Ensure availability and price limits

### Current Issue
The Mithril API returns instance types in the format `a100-80gb.sxm.2x`, which the parser does not yet recognize; a regex pattern for this format needs to be added.
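
A candidate pattern for that format (illustrative only; the actual fix belongs in `utils/instance_parser.py`):

```python
import re

# Matches e.g. "a100-80gb.sxm.2x" -> model, memory, interconnect, GPU count
MITHRIL_SPEC = re.compile(
    r"^(?P<model>[a-z]\d+)-(?P<mem>\d+gb)\.(?P<bus>sxm|pcie)\.(?P<count>\d+)x$"
)
```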

## Testing Infrastructure

### Unit Tests (`tests/unit/`)
- Model validation
- Parser correctness
- Provider interfaces

### Integration Tests (`tests/integration/`)
- Real API calls with test credentials
- Multi-component workflows
- Error handling

### E2E Tests (`tests/e2e/`)
- Full workflow validation
- Real infrastructure provisioning
- Cost-controlled testing

### Examples (`examples/`)
- 01_verify_instance.py: Basic GPU verification
- 02_jupyter_server.py: Interactive notebook server
- 03_multi_node_training.py: Distributed training
- 04_s3_data_access.py: Cloud storage integration

## Detailed Model Reference

### Task Model
```python
from flow.models import Task, TaskStatus
from datetime import datetime
from typing import Dict, List, Optional, Union, Iterator

# Task - Returned by flow.run(), represents a running/completed task
task = Task(
    # Identity:
    task_id="task-abc123",               # str: Unique identifier
    name="training-job",                  # str: Human-readable name
    status=TaskStatus.RUNNING,            # TaskStatus enum
    config=config,                        # Optional[TaskConfig]: Original config
    
    # Timestamps:
    created_at=datetime.utcnow(),         # datetime: Creation time
    started_at=datetime.utcnow(),         # Optional[datetime]: Start time
    completed_at=None,                    # Optional[datetime]: Completion time
    
    # Resources:
    instance_type="gpu.nvidia.a100",      # str: Instance type used
    num_instances=1,                      # int: Number of instances
    region="us-west-2",                   # str: Region
    
    # Cost:
    cost_per_hour="$25.60",               # str: Hourly cost
    total_cost="$12.80",                  # Optional[str]: Total cost so far
    
    # SSH Access:
    ssh_host="54.123.45.67",              # Optional[str]: SSH hostname/IP
    ssh_port=22,                          # int: SSH port
    ssh_user="ubuntu",                    # str: SSH username
    ssh_command="ssh ubuntu@54.123.45.67", # Optional[str]: Full SSH command
    
    # Runtime info:
    endpoints={"jupyter": "http://..."},   # Dict[str, str]: Service URLs
    instances=["i-abc123"],               # List[str]: Instance IDs
    message="Task is running",            # Optional[str]: Status message
    
    # User info:
    created_by="user-123"                 # Optional[str]: User ID who created task
)

# Task methods:
task.is_running           # Property: bool - True if RUNNING
task.is_terminal          # Property: bool - True if COMPLETED/FAILED/CANCELLED

# Get logs (returns str or Iterator[str]):
logs = task.logs()                       # Get last 100 lines
logs = task.logs(tail=1000)              # Get last 1000 lines
for line in task.logs(follow=True):      # Stream logs in real-time
    print(line)

# Wait for completion:
task.wait()                              # Wait indefinitely
task.wait(timeout=3600)                  # Wait up to 1 hour

# Update status:
task.refresh()                           # Refresh from provider

# Stop/cancel:
task.stop()                              # Stop the task
task.cancel()                            # Alias for stop()

# SSH access:
task.ssh()                               # Interactive SSH session
task.ssh(command="nvidia-smi")           # Run command remotely
task.ssh(node=1)                         # SSH to specific node (multi-node)

# Get user information:
user = task.get_user()                   # Get User object for task creator
print(user.username)                     # Username
print(user.email)                        # Email address

# Get instance details:
instances = task.get_instances()         # Get List[Instance] with full details
for inst in instances:
    print(inst.instance_id)              # Instance ID
    print(inst.public_ip)                # Public IP address
    print(inst.private_ip)               # Private IP address
    print(inst.status)                   # Instance status
```

### TaskStatus Enum
```python
from flow.models import TaskStatus
from enum import Enum

class TaskStatus(str, Enum):
    PENDING = "pending"       # Submitted but not started
    RUNNING = "running"       # Actively executing
    COMPLETED = "completed"   # Finished successfully (exit 0)
    FAILED = "failed"         # Finished with error (non-zero exit)
    CANCELLED = "cancelled"   # Terminated by user

# Usage:
if task.status == TaskStatus.RUNNING:
    print("Task is still running")

if task.status in [TaskStatus.COMPLETED, TaskStatus.FAILED]:
    print("Task finished")
```

### User Model
```python
from flow.models import User

# User - Task creator information
user = User(
    user_id="user-123",                   # str: User ID
    username="alice",                     # str: Username
    email="alice@example.com"             # str: Email address
)

# Returned by:
user = task.get_user()                    # Get user who created the task
```

### Instance Model
```python
from flow.models import Instance

# Instance - Detailed instance information
instance = Instance(
    instance_id="inst-abc123",            # str: Instance ID
    task_id="task-xyz789",                # str: Associated task ID
    status="running",                     # str: Instance status
    ssh_host="54.123.45.67",             # Optional[str]: SSH hostname
    private_ip="10.0.1.23",              # Optional[str]: Private IP
    created_at=datetime.utcnow(),        # datetime: Creation time
    terminated_at=None                    # Optional[datetime]: Termination time
)

# Returned by:
instances = task.get_instances()          # Get all instances for a task
```

### Volume and Mount Models
```python
from enum import Enum
from flow.models import Volume, MountSpec, StorageInterface
from datetime import datetime

# Volume - Persistent storage volume
volume = Volume(
    volume_id="vol-abc123",               # str: Unique identifier
    name="training-data",                 # str: Volume name
    size_gb=500,                         # int: Size in GB
    region="us-west-2",                   # str: Region
    interface=StorageInterface.BLOCK,     # StorageInterface enum
    created_at=datetime.utcnow(),         # datetime: Creation time
    attached_to=["i-123", "i-456"]       # List[str]: Attached instances
)

# MountSpec - Data mount specification
mount = MountSpec(
    source="s3://bucket/data",            # str: Source URL or path
    target="/mnt/data",                   # str: Container mount path
    mount_type="s3fs",                    # Literal["bind", "volume", "s3fs"]
    options={"readonly": True},           # Dict[str, Any]: Mount options
    cache_key="data-v1",                  # Optional[str]: Cache key
    size_estimate_gb=100.0                # Optional[float]: Size estimate
)

# StorageInterface enum:
class StorageInterface(str, Enum):
    BLOCK = "block"           # Block storage (EBS-like)
    FILE = "file"             # File storage (EFS-like)
```

## CLI Commands

The CLI (`flow/cli/`) provides:
- `flow init`: Initial setup and configuration with interactive wizard
- `flow run`: Submit tasks or launch interactive instances
  - `flow run config.yaml`: Submit task from YAML config
  - `flow run --instance-type h100`: Launch interactive GPU instance
  - `flow run -i 8xa100 --name dev`: Launch named multi-GPU instance
  - `flow run -i a100 --ssh-keys key-name`: Specify SSH keys
- `flow status`: List tasks (default: last 24h + running/pending tasks)
- `flow logs <task_id|name>`: Stream task logs (accepts task ID or name)
- `flow ssh <task_id|name>`: SSH into running instance
- `flow cancel <task_id|name>`: Cancel running task (accepts task ID or name)
- `flow catalog`: Browse available instances
- `flow example [name]`: Show example configurations (minimal, gpu-test, system-info, training)
- `flow volumes`: Volume management
  - `flow volumes list`: List all volumes
  - `flow volumes create --size SIZE [--name NAME]`: Create volume
  - `flow volumes delete VOLUME_ID`: Delete specific volume
  - `flow volumes delete-all [--pattern PATTERN] [--dry-run]`: Bulk delete

## Error Handling

### Exception Hierarchy
```python
from flow.errors import (
    FlowError,              # Base exception
    AuthenticationError,    # Invalid API key
    ResourceNotFoundError,  # Resource not found
    TaskNotFoundError,      # Task not found (subclass of ResourceNotFoundError)
    ValidationError,        # Invalid configuration
    APIError,               # API request failed
    ValidationAPIError,     # API validation error (422)
    NetworkError,           # Network communication failed
    TimeoutError,           # Request timed out
    ProviderError,          # Provider-specific error
)

# FlowError structure:
try:
    task = flow.run(config)
except FlowError as e:
    print(e.message)        # Error message
    print(e.suggestions)    # List of suggestions
    print(e.error_code)     # Optional error code

# ValidationAPIError (422 responses):
try:
    task = flow.run(config)
except ValidationAPIError as e:
    print(e.status_code)    # 422
    print(e.validation_errors)  # List of field errors
    # Formatted message includes field-specific help

# APIError:
try:
    task = flow.run(config)
except APIError as e:
    print(e.status_code)    # HTTP status code
    print(e.response_body)  # Raw response body
```

## Advanced Features

### Google Colab Integration
Connect Google Colab to Flow GPU instances:

```python
from flow._internal.integrations.google_colab import GoogleColabIntegration

# In your Flow task:
integration = GoogleColabIntegration()
integration.connect()  # Sets up Jupyter server with WebSocket support
```

### Jupyter Integration
Native Jupyter notebook server support with persistent kernels:

```python
from flow._internal.integrations.jupyter import JupyterIntegration

jupyter = JupyterIntegration()
jupyter.start_server()  # Launches Jupyter on Flow instance
```

### Configuration Wizard
Interactive setup for new users:

```bash
flow init  # Launches interactive configuration wizard
```

The wizard guides through:
- API key setup
- Project selection
- Provider configuration
- SSH key generation

### Volume Management CLI
Comprehensive volume management:

```bash
# List all volumes
flow volumes list

# Create a volume
flow volumes create --size 100 --name my-data

# Delete specific volume
flow volumes delete vol-abc123

# Bulk delete with pattern matching
flow volumes delete-all --pattern "temp-*" --dry-run
flow volumes delete-all --pattern "temp-*"  # Actually delete
```

### SSH Access
Direct SSH access through Task objects:

```python
# Interactive SSH session
task.ssh()

# Run command remotely
output = task.ssh(command="nvidia-smi")

# Multi-node support
task.ssh(node=1)  # SSH to specific node
```

## Configuration System

### FlowConfig Model
```python
from flow.models import FlowConfig

# SDK configuration
config = FlowConfig(
    api_key="mithril-...",                    # str: API key (required)
    project="my-project",                 # str: Project name (required)
    region="us-central1-b",               # str: Default region
    api_url="https://api.mithril.ai"   # str: API endpoint
)
```

### Configuration Hierarchy
Configuration precedence (highest to lowest):
1. Command-line arguments
2. Environment variables (`FLOW_*` and `MITHRIL_*`)
3. Local .flow/config.yaml
4. Global ~/.flow/config.yaml
5. Provider-specific credentials (~/.flow/credentials.{provider})
6. Default values
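
These rules can be illustrated with a small merge helper (a sketch, not the SDK's `config_loader.py`): sources are consulted highest-priority first, and the first source that defines a key wins.

```python
def resolve_config(*sources: dict) -> dict:
    """Merge config sources given in priority order (highest first)."""
    merged: dict = {}
    for source in sources:
        for key, value in source.items():
            merged.setdefault(key, value)  # keep the higher-priority value
    return merged

cli_args = {"region": "us-west-2"}
env_vars = {"region": "us-central1-b", "project": "my-project"}
defaults = {"region": "us-east-1", "api_url": "https://api.mithril.ai"}
config = resolve_config(cli_args, env_vars, defaults)
# "region" comes from the CLI; "project" falls through to the env vars
```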

### Credential Storage
Flow uses AWS-style secure credential storage:

```bash
# Config file (non-sensitive)
~/.flow/config.yaml

# Provider credentials (sensitive, chmod 0600)
~/.flow/credentials.mithril
~/.flow/credentials.aws  # Future
```

Credentials are stored separately from configuration for security.
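
A simplified stand-in for that kind of 0600-permission write (the SDK's version lives in `_internal/init/writer.py` and may differ):

```python
import os

def write_credentials(path: str, contents: str) -> None:
    """Write a credentials file readable and writable only by the owner."""
    fd = os.open(path, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
    with os.fdopen(fd, "w") as f:
        f.write(contents)
    os.chmod(path, 0o600)  # enforce 0600 even if the file already existed
```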

### Environment Variables
```bash
export FLOW_API_KEY="mithril-..."
export FLOW_PROJECT="my-project"
export FLOW_REGION="us-west-2"
export FLOW_API_URL="https://api.mithril.ai"
```

## Security Model

- API keys stored securely (never in code)
- SSH keys auto-generated per project
- Temporary credentials for cloud storage
- Network isolation between tasks
- No sudo access in containers

## Provider Interface (IProvider Protocol)

```python
from flow.core.interfaces import IProvider, IComputeProvider, IStorageProvider
from flow.providers.interfaces import IProviderInit
from flow.models import Task, TaskConfig, TaskStatus, Instance, Volume
from typing import List, Dict, Any, Optional, Iterator, Protocol

# IComputeProvider methods:
class IComputeProvider(Protocol):
    def find_instances(
        self,
        requirements: Dict[str, Any],  # instance_type, min_gpu_count, max_price, etc.
        limit: int = 10
    ) -> List[Instance]:
        """Find available instances matching requirements."""
        ...
    
    def submit_task(
        self,
        instance_id: str,
        config: TaskConfig,
        volume_ids: Optional[List[str]] = None
    ) -> Task:
        """Submit task to instance."""
        ...
    
    def get_task(self, task_id: str) -> Task:
        """Get full Task object."""
        ...
    
    def get_task_status(self, task_id: str) -> TaskStatus:
        """Get task status (lightweight)."""
        ...
    
    def stop_task(self, task_id: str) -> bool:
        """Stop a running task."""
        ...
    
    def get_task_logs(
        self,
        task_id: str,
        tail: int = 100,
        log_type: str = "stdout"  # "stdout" or "stderr"
    ) -> str:
        """Get last N lines of logs."""
        ...
    
    def stream_task_logs(
        self,
        task_id: str,
        log_type: str = "stdout"
    ) -> Iterator[str]:
        """Stream logs in real-time."""
        ...
    
    def list_tasks(
        self,
        status: Optional[TaskStatus] = None,
        limit: int = 100
    ) -> List[Task]:
        """List tasks with optional filter."""
        ...
    
    def prepare_task_config(self, config: TaskConfig) -> TaskConfig:
        """Add provider-specific defaults."""
        ...

# IStorageProvider methods:
class IStorageProvider(Protocol):
    def create_volume(
        self,
        size_gb: int,
        name: Optional[str] = None
    ) -> Volume:
        """Create a new volume."""
        ...
    
    def delete_volume(self, volume_id: str) -> bool:
        """Delete a volume."""
        ...
    
    def list_volumes(self, limit: int = 100) -> List[Volume]:
        """List all volumes."""
        ...
    
    def is_volume_id(self, identifier: str) -> bool:
        """Check if string is volume ID vs name."""
        ...

# IProvider combines both:
class IProvider(IComputeProvider, IStorageProvider):
    def get_capabilities(self) -> ProviderCapabilities:
        """Get provider capabilities."""
        ...

# IProviderInit for provider setup:
class IProviderInit(Protocol):
    def list_projects(self) -> List[Project]:
        """List available projects."""
        ...
    
    def list_ssh_keys(self, project_id: Optional[str] = None) -> List[SSHKey]:
        """List SSH keys."""
        ...
    
    def get_config_fields(self) -> List[ConfigField]:
        """Get provider-specific config fields."""
        ...
```

## Common Patterns and Best Practices

### 1. Command Specification
```python
# CORRECT - Use list for command:
config = TaskConfig(
    command=["python", "train.py", "--epochs", "10"],
    ...
)

# PLANNED - Shell commands ('shell' field not yet implemented):
config = TaskConfig(
    shell="cd /app && python train.py --epochs 10",
    ...
)

# PLANNED - Script content ('script' field not yet implemented):
config = TaskConfig(
    script="""
import torch
model = torch.nn.Linear(10, 1)
print('Model created')
""",
    ...
)

# WRONG - String command (will be wrapped in list):
config = TaskConfig(
    command="python train.py",  # Becomes ["python train.py"]
    ...
)
```

### 2. Volume Specification
```python
# Create new volume:
volume = VolumeSpec(
    size_gb=100,
    mount_path="/data",
    name="training-data",
    iops=3000  # Optional performance tuning
)

# Attach existing volume:
volume = VolumeSpec(
    volume_id="vol-abc123",
    mount_path="/data"
    # Cannot specify size_gb or performance options
)

# Docker cache optimization:
cache_volume = VolumeSpec(
    size_gb=50,
    mount_path="/var/lib/docker",
    name="docker-cache"
)

# IMPORTANT: Volumes are empty by default!
# To use a volume for code, you must populate it first:
# Step 1: Create and populate volume
flow.run("git clone https://github.com/org/repo /code", 
         volumes=[{"name": "my-code", "mount_path": "/code"}])
# Step 2: Use pre-populated volume
flow.run("python /code/train.py", 
         volumes=[{"name": "my-code", "mount_path": "/code"}],
         upload_code=False)
```

### 3. Instance Selection
```python
# Specific instance type:
config = TaskConfig(instance_type="gpu.nvidia.a100", ...)

# Capability-based:
config = TaskConfig(
    min_gpu_memory_gb=80,
    max_price_per_hour=20.0,
    ...
)

# WRONG - Cannot specify both:
config = TaskConfig(
    instance_type="gpu.nvidia.a100",
    min_gpu_memory_gb=80,  # Will raise ValidationError
    ...
)
```

### 4. Error Handling
```python
import logging
from flow.errors import FlowError, TaskNotFoundError, ValidationError

try:
    task = flow.run(config)
    task.wait(timeout=3600)
except ValidationError as e:
    # Configuration was invalid
    logging.error(f"Invalid config: {e.message}")
    for suggestion in e.suggestions:
        logging.info(f"Try: {suggestion}")
except TaskNotFoundError:
    # Task disappeared (rare)
    logging.error("Task was terminated externally")
except FlowError as e:
    # General Flow error
    logging.error(f"Flow error: {e}")
except Exception as e:
    # Unexpected error
    logging.exception("Unexpected error")
```

### 5. Type Annotations
```python
from flow import Flow
from flow.models import Task, TaskConfig, TaskStatus, VolumeSpec
from typing import List, Optional, Iterator

def submit_training_job(
    dataset_path: str,
    epochs: int = 10,
    gpu_type: str = "a100"
) -> Task:
    """Submit a training job."""
    config = TaskConfig(
        name=f"training-{dataset_path}",
        instance_type=gpu_type,
        command=[
            "python", "train.py",
            "--dataset", dataset_path,
            "--epochs", str(epochs)
        ],
        volumes=[
            VolumeSpec(size_gb=100, mount_path="/data")
        ]
    )
    
    flow = Flow()
    return flow.run(config)

def stream_task_output(task: Task) -> Iterator[str]:
    """Stream task logs until completion."""
    for line in task.logs(follow=True):
        yield line
        if task.is_terminal:
            break
```

## API Documentation

### Mithril API Reference

Full documentation: https://docs.mithril.ai/compute-api/compute-api-reference
API Overview: https://docs.mithril.ai/compute-api/api-overview-and-quickstart
Startup Scripts: https://docs.mithril.ai/compute-and-storage/startup-scripts
Instance Types: https://docs.mithril.ai/compute-and-storage/instance-types-and-specifications
OpenAPI Specification: https://api.mithril.ai/openapi.json

### API Overview

The Mithril API provides programmatic access to Mithril's compute infrastructure:
- Base URL: `https://api.mithril.ai/v2/`
- Currently supports: Spot Instances, Storage Volumes, SSH Keys
- Requires API key authentication
- API keys created at https://app.mithril.ai/account/apikeys

### Prerequisites
- Billing must be configured in Mithril Console before API usage
- Project must exist for most operations

### Key API Endpoints

#### Authentication
All endpoints require Bearer authentication with Mithril API Key:
```python
headers = {"Authorization": f"Bearer {api_key}"}
```

#### Core Resource Endpoints

##### Projects
- `GET /v2/projects` - List all projects

##### Instance Types
- `GET /v2/instance-types` - Get available instance types and specifications

##### SSH Keys
- `GET /v2/ssh-keys` - List SSH keys
- `POST /v2/ssh-keys` - Create new SSH key
- `DELETE /v2/ssh-keys/{key_id}` - Delete SSH key

##### Storage Volumes
- `GET /v2/volumes` - List volumes
- `POST /v2/volumes` - Create volume (Block or File storage)
- `DELETE /v2/volumes/{volume_id}` - Delete volume

##### Spot Instances (Bids)
- `GET /v2/spot/availability` - Check current spot capacity and pricing
- `POST /v2/spot/bids` - Create bid for spot instances
- `GET /v2/spot/bids` - List bids (filter by project, instance type, region, status)
- `PATCH /v2/spot/bids/{bid_fid}` - Update bid (e.g., change limit price)
- `DELETE /v2/spot/bids/{bid_fid}` - Cancel bid (NOT POST to /cancel endpoint!)

##### Instances
- `GET /v2/instances` - List instances (filter by project, instance type, region, status)

##### API Keys
- Manage API keys for authentication

##### Profile
- User profile management
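
Putting these endpoints together, here is a thin client sketch using `requests`. The `MithrilClient` class name, its structure, and the injectable `session` are illustrative assumptions for this document, not the SDK's actual client:

```python
import requests

BASE_URL = "https://api.mithril.ai"

class MithrilClient:
    """Illustrative wrapper over the endpoints listed above (not SDK code)."""

    def __init__(self, api_key, session=None):
        self.headers = {"Authorization": f"Bearer {api_key}"}
        self.session = session or requests.Session()

    def list_projects(self):
        # Bare array response (see "List Response Formats" below)
        resp = self.session.get(f"{BASE_URL}/v2/projects", headers=self.headers)
        resp.raise_for_status()
        return resp.json()

    def list_bids(self, project):
        # Paginated response: bids come back wrapped in a "data" key
        resp = self.session.get(f"{BASE_URL}/v2/spot/bids",
                                params={"project": project}, headers=self.headers)
        resp.raise_for_status()
        return resp.json()["data"]

    def cancel_bid(self, bid_fid):
        # RESTful DELETE -- there is no POST /cancel endpoint
        resp = self.session.delete(f"{BASE_URL}/v2/spot/bids/{bid_fid}",
                                   headers=self.headers)
        resp.raise_for_status()
```

Injecting the session makes the sketch easy to test without network access; it also mirrors the distinction drawn below between bare-array and `"data"`-wrapped responses.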

### Common API Mistakes to Avoid

1. **Cancelling Bids**: Use `DELETE /v2/spot/bids/{bid_fid}`, NOT `POST /v2/spot/bids/{bid_fid}/cancel`
   - The API uses RESTful DELETE for cancellation, not a separate /cancel endpoint
   - This applies to both v1 (`/bids/{id}`) and v2 (`/v2/spot/bids/{id}`) APIs

2. **Volume Defaults**: Default volume size should be minimal (1GB) for tests to avoid quota exhaustion
   - Production uses can specify larger sizes as needed
   - Colab persistence volumes were causing quota issues at 100GB default

3. **Region Availability**: Not all instance types are available in all regions
   - Prefer us-central1-b for H100s (it's typically less congested)
   - A100s may not be available in all regions
   - Always check availability before hardcoding region/instance combinations

#### Request/Response Models (Based on Mithril API Types)

##### Instance Type Format
Mithril API instance types ALWAYS include memory specification in the format: `{gpu}-{memory}gb.{interconnect}.{count}x`

Verified examples from actual API response:
- `a100-80gb.sxm.1x` - 1x A100 80GB with SXM4 interconnect
- `a100-80gb.sxm.2x` - 2x A100 80GB with SXM4 interconnect
- `a100-80gb.sxm.4x` - 4x A100 80GB with SXM4 interconnect
- `a100-80gb.sxm.8x` - 8x A100 80GB with SXM4 interconnect
- `h100-80gb.sxm.8x` - 8x H100 80GB with SXM5 interconnect
- `h100-80gb.pcie.1x` - 1x H100 80GB with PCIe (different FID than SXM)

Note: ALL instance types include memory (e.g., "80gb") in their names.

##### IMPORTANT: Instance Type FIDs
The Mithril API uses opaque FIDs (e.g., "it_MsIRhxj3ccyVWGfP") in requests/responses.
The SDK automatically translates between user-friendly names and FIDs:
- User provides: "a100" or "2xa100" 
- SDK resolves to: "it_MsIRhxj3ccyVWGfP" (for a100-80gb.sxm.1x)
- API returns: "it_MsIRhxj3ccyVWGfP"
- SDK displays as: "a100-80gb.sxm.1x"

Current FID mappings (as of 2025-07-02):
- `it_MsIRhxj3ccyVWGfP` → a100-80gb.sxm.1x
- `it_5M6aGxGovNeX5ltT` → a100-80gb.sxm.2x
- `it_fK7Cx6TVhOK5ZfXT` → a100-80gb.sxm.4x
- `it_J7OyNf9idfImLIFo` → a100-80gb.sxm.8x
- `it_5ECSoHQjLBzrp5YM` → h100-80gb.sxm.8x
- `it_XqgKWbhZ5gznAYsG` → h100-80gb.pcie.1x

##### InstanceTypeModel Response (Actual Mithril API Response)
```json
{
  "fid": "it_5M6aGxGovNeX5ltT",
  "name": "a100-80gb.sxm.2x",
  "num_cpus": 48,
  "cpu_type": "Intel(R) Xeon(R) Platinum 8468",
  "ram_gb": 450,
  "num_gpus": 2,
  "gpu_type": "NVIDIA A100",
  "gpu_memory_gb": 80,
  "gpu_socket": "SXM4",
  "local_storage_gb": 3500
}
```

Note the actual field names from Mithril API:
- `ram_gb` (not `ram`)
- `num_gpus` and `gpu_type` (not nested `gpus` array)
- `gpu_memory_gb` (separate field)
- `gpu_socket` (e.g., "SXM4", "SXM5")
- `cpu_type` (includes full CPU model name)

##### ProjectModel Response
```json
{
  "fid": "proj-abc123def456",
  "name": "my-ml-project",
  "created_at": "2024-01-01T00:00:00Z"
}
```

##### CreateBidRequest
```json
{
  "project": "proj-abc123def456",           // Project FID (required)
  "region": "us-central1-a",                // Region (required)
  "instance_type": "it_MsIRhxj3ccyVWGfP",   // Instance type FID, NOT name! (required)
  "limit_price": "$15.00",                  // Dollar string format (required)
  "ssh_keys": ["ssh-key-abc123"],           // SSH key FIDs (required)
  "startup_script": "#!/bin/bash\n...",     // Cloud-init script (required)
  "volumes": ["vol-abc123"],                // Volume FIDs (optional)
  "name": "my-training-job",                // Display name (optional but recommended)
  "instance_quantity": 1                     // Number of instances (defaults to 1)
}
```

IMPORTANT bid creation notes:
- `instance_type` MUST be the FID (e.g., "it_MsIRhxj3ccyVWGfP"), not the name ("a100")
- Field is `limit_price` not `price`
- Price MUST include dollar sign (e.g., "$15.00")
- `startup_script` is cloud-init format and is required

##### BidModel Response (Actual Mithril API Response)
```json
{
  "fid": "bid_xyz789abc123",                 // Bid FID
  "name": "my-training-job",                 // Display name
  "project": "proj-abc123def456",            // Project FID
  "created_by": "user-123",                  // User FID
  "region": "us-central1-a",                 // Region
  "instance_type": "it_MsIRhxj3ccyVWGfP",    // Instance type FID (NOT name!)
  "instance_quantity": 1,                    // Number of instances
  "limit_price": "$15.00",                   // Dollar string format
  "status": "pending",                       // pending|provisioning|allocated|running|completed|failed|cancelled|terminated
  "ssh_keys": ["ssh-key-abc123"],            // SSH key FIDs
  "startup_script": "#!/bin/bash\n...",      // Cloud-init script
  "volumes": ["vol-abc123"],                 // Volume FIDs
  "created_at": "2024-01-15T10:30:00Z",      // ISO timestamp
  "updated_at": "2024-01-15T10:35:00Z"       // ISO timestamp
}
```

Status progression:
- `pending` → Bid created, waiting for auction
- `provisioning` → Instance being created
- `allocated` → Instance ready, startup script running
- `running` → Startup script completed (if trackable)
- `completed`/`terminated` → Instance stopped
- `failed` → Error occurred
- `cancelled` → User cancelled
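
When polling, it is convenient to know when a bid can no longer change state. A minimal helper based on the progression above (a hypothetical utility, not part of the SDK; the terminal set is inferred from the listed states):

```python
# Terminal states from the status progression above (assumed set)
TERMINAL_STATUSES = {"completed", "terminated", "failed", "cancelled"}

def is_terminal(status: str) -> bool:
    """True once a bid has reached a state it cannot leave."""
    return status.lower() in TERMINAL_STATUSES
```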

##### AuctionModel (Spot Availability) - Actual Mithril API Response
```json
{
  "fid": "auc_rECU5s87CABp37aB",
  "instance_type": "it_XqgKWbhZ5gznAYsG",  // FID reference, not name
  "region": "us-central1-a",
  "capacity": 11,
  "last_instance_price": "$10.00"  // Dollar string format
}
```

Note: The spot availability response uses instance type FIDs (e.g., "it_XqgKWbhZ5gznAYsG") 
rather than human-readable names. To get the actual instance type details, you need to 
cross-reference with the instance types endpoint.
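
That cross-reference can be a simple join over the two parsed responses. A sketch (the `availability_by_name` helper is hypothetical; field names follow the response examples above):

```python
def availability_by_name(auctions, instance_types):
    """Map spot availability (FID references) to human-readable names
    by joining against the /v2/instance-types listing."""
    fid_to_name = {it["fid"]: it["name"] for it in instance_types}
    return {
        # Fall back to the raw FID if it is not in the catalog
        fid_to_name.get(a["instance_type"], a["instance_type"]): {
            "region": a["region"],
            "capacity": a["capacity"],
            "last_price": a["last_instance_price"],
        }
        for a in auctions
    }
```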

##### InstanceModel Response
```json
{
  "fid": "inst-abc123xyz789",
  "bid": "bid-xyz789abc123",
  "status": "running",  // pending|running|stopped|terminated
  "ssh_destination": "34.125.67.89:22",  // SSH connection string (host:port)
  "public_ip": "34.125.67.89",  // DEPRECATED - use ssh_destination
  "private_ip": "10.128.0.5",
  "created_at": "2024-01-15T10:35:00Z",
  "terminated_at": null
}
```

Note: The Mithril API now uses `ssh_destination` field instead of `public_ip`. The SDK 
automatically handles both fields for backwards compatibility, preferring `ssh_destination` 
when available and falling back to `public_ip` if needed.

##### VolumeModel Response
```json
{
  "fid": "vol-abc123def456",
  "name": "training-data",
  "project": "proj-abc123def456",
  "region": "us-central1-a",
  "capacity_gb": 500,
  "interface": "block",  // block|file
  "status": "available",  // available|attached|deleting
  "created_at": "2024-01-15T09:00:00Z",
  "updated_at": "2024-01-15T09:05:00Z"
}
```

##### SSHKeyModel Response
```json
{
  "fid": "ssh-key-abc123",
  "name": "my-dev-key",
  "project": "proj-abc123def456",
  "public_key": "ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABAQ...",
  "created_at": "2024-01-15T08:00:00Z"
}
```

##### Paginated Response Format (BidsResponse)
```json
{
  "data": [
    {
      "fid": "bid-xyz789abc123",
      "project": "proj-abc123def456",
      "region": "us-central1-a",
      "instance_type": "it_5M6aGxGovNeX5ltT",
      "limit_price": "$15.00",
      "status": "running",
      "ssh_keys": ["ssh-key-abc123"],
      "startup_script": "#!/bin/bash\n...",
      "volumes": ["vol-abc123"],
      "created_at": "2024-01-15T10:30:00Z",
      "updated_at": "2024-01-15T10:35:00Z"
    }
  ],
  "next_cursor": "cursor-abc123"  // Optional, for pagination
}
```

##### List Response Formats
IMPORTANT: Response format varies by endpoint!

Endpoints that return arrays directly:
- `GET /v2/projects` → `ProjectModel[]`
- `GET /v2/ssh-keys` → `SSHKeyModel[]`
- `GET /v2/instance-types` → `InstanceTypeModel[]`
- `GET /v2/spot/availability` → `AuctionModel[]`
- `GET /v2/volumes` → `VolumeModel[]`

Endpoints that return paginated objects with "data" key:
- `GET /v2/spot/bids` → `{"data": BidModel[], "next_cursor": string?}`
- Note: Individual bid GET is NOT supported - must list and filter
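
One way to paper over this inconsistency is a small normalizer that accepts both shapes (a hypothetical helper, not SDK code):

```python
def extract_items(payload):
    """Normalize the two list-response shapes: a bare JSON array, or a
    paginated object with "data" and an optional "next_cursor"."""
    if isinstance(payload, dict):
        return payload.get("data", []), payload.get("next_cursor")
    return payload, None
```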

#### Important Format Details and Common Gotchas

##### Price Format
- Always a string, not a number
- Always includes the dollar sign and two decimal places: "$25.00", not "25.00" or 25.0
- Used in the `limit_price` field for bids
- Returned as `last_instance_price` in spot availability

##### Common API Gotchas
1. **Instance Type FIDs**: API uses FIDs everywhere, not human names
   - Wrong: `{"instance_type": "a100"}`
   - Right: `{"instance_type": "it_MsIRhxj3ccyVWGfP"}`

2. **Bid Listing**: No individual GET endpoint
   - Wrong: `GET /v2/spot/bids/{bid_id}`
   - Right: `GET /v2/spot/bids?project={project_id}` then filter

3. **Response Formats**: Inconsistent between endpoints
   - Arrays: projects, ssh-keys, instance-types, spot/availability
   - Objects with "data": spot/bids

4. **Field Names**: Not always intuitive
   - `limit_price` not `price` in bid creation
   - `instance_quantity` not `num_instances`
   - `ram_gb` not `memory_gb` in instance types

5. **Status Values**: Many possible states
   - Bids: pending, provisioning, allocated, running, completed, failed, cancelled, terminated, deactivated, terminating
   - Map appropriately to your domain model

6. **Common Field Name Mistakes in TaskConfig**:
   - `env` NOT `environment` for environment variables
   - `command` as list NOT string (though string is auto-converted)
   - `ssh_keys` NOT `ssh_key` (always plural)
   - `max_price_per_hour` NOT `max_price` or `price`
   - `num_instances` NOT `instance_count` or `instances`

##### FID Format (Verified from API)
All resource IDs use FID (Mithril ID) format:
- Instance Types: `it_` prefix (e.g., `it_5M6aGxGovNeX5ltT`)
- Auctions: `auc_` prefix (e.g., `auc_rECU5s87CABp37aB`)
- Projects: Expected format `proj_{random}` 
- Bids: Expected format `bid_{random}`
- Volumes: Expected format `vol_{random}`
- SSH Keys: Expected format `ssh-key_{random}`

##### Instance Type String Format (Verified)
Pattern: `{gpu_type}-{memory}gb.{interconnect}.{count}x`
- GPU type: `a100`, `h100`, etc.
- Memory: ALWAYS specified (e.g., `80gb`)
- Interconnect: `sxm` (shown without version in name, but API returns socket type like "SXM4", "SXM5")
- Count: `1x`, `2x`, `4x`, `8x`

Examples from actual API:
- `a100-80gb.sxm.1x` (uses SXM4 socket)
- `a100-80gb.sxm.8x` (uses SXM4 socket)
- `h100-80gb.sxm.8x` (uses SXM5 socket)
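
The pattern above is regular enough to parse mechanically. A sketch of such a parser (hypothetical; the SDK's own `instance_parser.py` may differ):

```python
import re

# Pattern from above: {gpu_type}-{memory}gb.{interconnect}.{count}x
_INSTANCE_RE = re.compile(
    r"^(?P<gpu>[a-z0-9]+)-(?P<memory>\d+)gb\.(?P<interconnect>sxm|pcie)\.(?P<count>\d+)x$"
)

def parse_instance_type(name: str) -> dict:
    """Split a full instance-type spec into its components."""
    m = _INSTANCE_RE.match(name)
    if m is None:
        raise ValueError(f"not a full instance-type spec: {name!r}")
    return {
        "gpu": m["gpu"],
        "memory_gb": int(m["memory"]),
        "interconnect": m["interconnect"],
        "count": int(m["count"]),
    }
```

Note this only covers the full spec; short forms like `a100` or `2xa100` resolve through the FID mappings described earlier.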

#### Status Codes
- 201 Created - Resource successfully created
- 200 OK - Successful retrieval
- 400 Bad Request - Invalid parameters
- 401 Unauthorized - Invalid API key
- 422 Unprocessable Entity - Validation error
- 404 Not Found - Resource not found

#### Error Response Format
```json
{
  "error": {
    "code": "VALIDATION_ERROR",
    "message": "Invalid instance type",
    "details": {
      "field": "instance_type",
      "value": "invalid-gpu",
      "allowed_values": ["a100-80gb.sxm.2x", "h100-80gb.sxm.8x"]
    }
  },
  "request_id": "req-abc123def456"
}
```
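
A small helper can flatten that envelope into a log-friendly line (illustrative; field names follow the example response above):

```python
def summarize_error(payload):
    """Flatten the error response envelope into a single string."""
    err = payload.get("error", {})
    parts = [err.get("code", "UNKNOWN"), err.get("message", "")]
    details = err.get("details")
    if details:
        parts.append(f"{details.get('field')}={details.get('value')!r}")
    # Include request_id so failures can be reported to support
    if payload.get("request_id"):
        parts.append(f"request_id={payload['request_id']}")
    return " | ".join(p for p in parts if p)
```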

### Best Practices

1. **Authentication Security**
   - Store API keys securely (environment variables, secret managers)
   - Never commit API keys to version control

2. **Error Handling**
   - Implement retry logic for transient failures
   - Handle 422 validation errors with detailed field-level messages
   - Monitor rate limits

3. **Resource Management**
   - Always check `/v2/spot/availability` before creating bids
   - Verify project exists before resource creation
   - Clean up unused volumes and instances

4. **Startup Scripts**
   - Use cloud-init format for instance initialization
   - Include error handling in startup scripts
   - Log startup progress for debugging

### Common Workflows

#### 1. Create Spot Instance
```python
# Check availability
response = requests.get(f"{base_url}/v2/spot/availability", headers=headers)
availability = response.json()

# Create bid -- note the instance type FID (not "a100"), the dollar-string
# price, and the required ssh_keys/startup_script fields
bid_data = {
    "project": "proj-abc123def456",           # Project FID
    "instance_type": "it_MsIRhxj3ccyVWGfP",   # FID, not the name
    "region": "us-central1-a",
    "limit_price": "$10.00",                  # Dollar string, not a number
    "instance_quantity": 1,
    "ssh_keys": ["ssh-key-abc123"],
    "startup_script": "#!/bin/bash\n..."
}
response = requests.post(f"{base_url}/v2/spot/bids", json=bid_data, headers=headers)
```

#### 2. Manage Storage
```python
# Create volume
volume_data = {
    "name": "training-data",
    "size_gb": 100,
    "interface": "block",  # or "file"
    "project": "my-project",
    "region": "us-central1-a"
}
response = requests.post(f"{base_url}/v2/volumes", json=volume_data, headers=headers)
```

## Codebase Structure

```
src/
└── flow/
    ├── api/                    # User-facing API
    │   ├── client.py          # Main Flow class and high-level API
    │   ├── models.py          # Pydantic models (TaskConfig, Task, etc.)
    │   ├── invoke.py          # Zero-import remote execution
    │   └── decorators.py      # Decorator interface (@app, @function)
    │
    ├── core/                   # Core domain logic
    │   ├── task_engine.py     # Task lifecycle management
    │   ├── engine.py          # Core workflow engine
    │   ├── code_packager.py   # Code packaging for remote execution
    │   ├── interfaces.py      # Protocol definitions (IProvider, etc.)
    │   └── resources/         # Resource parsing and matching
    │
    ├── providers/              # Cloud provider implementations
    │   ├── base.py            # Base provider interface
    │   ├── factory.py         # Provider factory
    │   ├── registry.py        # Provider registration
    │   ├── mithril/           # Mithril provider
    │   │   ├── provider.py    # Main Mithril provider
    │   │   ├── init.py        # Provider initialization
    │   │   ├── adapters/      # Model and mount adapters
    │   │   ├── api/           # API client and types
    │   │   ├── bidding/       # Auction and bid management
    │   │   ├── core/          # Core Mithril logic and constants
    │   │   ├── resources/     # Resource management (GPU, SSH, etc.)
    │   │   └── runtime/       # Runtime components and startup scripts
    │   └── local/             # Local provider for testing
    │
    ├── _internal/             # Internal components
    │   ├── frontends/         # Input format adapters
    │   ├── data/              # Data loading and resolution
    │   ├── storage/           # Volume and persistent storage
    │   ├── integrations/      # External service integrations
    │   ├── managers/          # Task and resource management
    │   ├── init/              # Initialization and config writing
    │   ├── io/                # HTTP and networking utilities
    │   ├── config_loader.py   # Multi-source configuration loading
    │   └── auth.py            # Authentication utilities
    │
    ├── cli/                   # CLI application
    │   ├── app.py             # Main CLI app
    │   ├── commands/          # CLI commands
    │   ├── config_wizard.py   # Interactive configuration setup
    │   ├── formatters/        # Output formatting
    │   └── migrate.py         # Migration utilities
    │
    ├── utils/                 # General utilities
    │   ├── retry.py           # Retry logic
    │   ├── validation.py      # Input validation
    │   ├── security.py        # Security utilities
    │   ├── instance_parser.py # Instance type parsing
    │   └── instance_validator.py # Instance validation
    │
    └── errors.py              # Exception hierarchy
```

## System Architecture

```
┌─────────────────────────────────────────────────────────────────────┐
│                           User Interface Layer                      │
├─────────────────────────┬───────────────┬───────────────────────────┤
│   Python API (flow.py)  │  CLI (cli/)   │  Decorators (@flow.run)   │
├─────────────────────────┴───────────────┴───────────────────────────┤
│                         Frontend Adapters                           │
├──────────┬──────────┬──────────┬──────────┬─────────────────────────┤
│   YAML   │  SLURM   │ Submitit │   CLI    │  Direct TaskConfig      │
├──────────┴──────────┴──────────┴──────────┴─────────────────────────┤
│                    Unified Task Model (models.py)                   │
│                        TaskConfig → Task                            │
├─────────────────────────────────────────────────────────────────────┤
│                      Core Domain Layer                              │
├─────────────────────┬──────────────────┬────────────────────────────┤
│  Instance Resolution│  Task Management │    Data Loading            │
│  - Parser           │  - Lifecycle     │    - S3 Resolution         │
│  - Catalog          │  - Status        │    - URL Handling          │
│  - Validation       │  - Monitoring    │    - Mount Specs           │
├─────────────────────┴──────────────────┴────────────────────────────┤
│                    Provider Abstraction Layer                       │
│                         IProvider Protocol                          │
├─────────────────────────────────────────────────────────────────────┤
│                      Provider Implementations                       │
├─────────────────────┬──────────────────┬────────────────────────────┤
│  Mithril Provider  │  Local Provider  │   Future: AWS/GCP/Azure    │
│  - Spot Bidding     │  - Testing       │                            │
│  - SSH Management   │  - Development   │                            │
│  - Volume Storage   │                  │                            │
└─────────────────────┴──────────────────┴────────────────────────────┘
```

## Data Flow

```
1. User Input
   ↓
2. Frontend Adapter (parse/validate)
   ↓
3. TaskConfig (unified model)
   ↓
4. Provider Selection (based on API endpoint)
   ↓
5. Instance Resolution
   - Parse instance type
   - Query catalog/API
   - Find best match
   ↓
6. Resource Creation
   - Create volumes
   - Generate SSH keys
   - Build startup script
   ↓
7. Task Submission
   - Create spot bid
   - Monitor status
   - Handle lifecycle
   ↓
8. Task Object (returned to user)
   - Status tracking
   - Log streaming
   - SSH access
```

## Instance Type System Updates

### Instance Type Mapping Configuration
The SDK supports flexible instance type mapping through multiple sources:

1. **Built-in defaults** (when MITHRIL_USE_BUILTIN_INSTANCE_MAPPINGS=true)
2. **Configuration file** (~/.flow/instance_types.json)
3. **Environment variable** (MITHRIL_INSTANCE_MAPPINGS)

Example configuration file:
```json
{
  "a100": "it_MsIRhxj3ccyVWGfP",
  "2xa100": "it_5M6aGxGovNeX5ltT",
  "custom-gpu": "it_CustomFID123"
}
```
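
A loader over those three sources might look like the following sketch. The precedence (built-in defaults overridden by the config file, overridden by the environment variable) is an assumption here, as is the function name:

```python
import json
import os

def load_instance_mappings(config_path="~/.flow/instance_types.json",
                           env_var="MITHRIL_INSTANCE_MAPPINGS",
                           builtin=None):
    """Merge instance-type mappings, later sources overriding earlier:
    built-in defaults < config file < environment variable."""
    mappings = dict(builtin or {})
    path = os.path.expanduser(config_path)
    if os.path.exists(path):
        with open(path) as f:
            mappings.update(json.load(f))
    raw = os.environ.get(env_var)
    if raw:
        # Environment variable holds the same JSON object format
        mappings.update(json.loads(raw))
    return mappings
```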

## Caching and Performance

The SDK implements several caching strategies:

### User Cache
- 5-minute TTL for user information
- Reduces API calls for user data
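
A TTL cache of this kind can be sketched in a few lines (illustrative only, not the SDK's internal implementation; the injectable `clock` exists so the sketch is testable):

```python
import time

class TTLCache:
    """Minimal time-to-live cache: entries expire after `ttl_seconds`."""

    def __init__(self, ttl_seconds, clock=time.monotonic):
        self.ttl = ttl_seconds
        self.clock = clock
        self._store = {}  # key -> (expires_at, value)

    def get(self, key, default=None):
        entry = self._store.get(key)
        if entry is None or entry[0] < self.clock():
            # Expired or missing: evict and fall back to default
            self._store.pop(key, None)
            return default
        return entry[1]

    def set(self, key, value):
        self._store[key] = (self.clock() + self.ttl, value)
```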

### Log Cache
- 5-second TTL with size limits
- Prevents duplicate log entries
- Automatic cleanup on overflow

### SSH Connection Retry
- Exponential backoff for SSH connections
- Configurable retry attempts
- Automatic fallback strategies
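
The backoff strategy above can be sketched generically as follows (the helper name and defaults are assumptions; the SDK exposes its own configurable retry settings):

```python
import random
import time

def retry_with_backoff(fn, attempts=5, base_delay=1.0, max_delay=30.0,
                       sleep=time.sleep):
    """Call fn(), retrying on failure with capped exponential backoff."""
    for attempt in range(attempts):
        try:
            return fn()
        except Exception:
            if attempt == attempts - 1:
                raise  # out of attempts: surface the last error
            delay = min(max_delay, base_delay * (2 ** attempt))
            # Jitter spreads retries out when many clients reconnect at once
            sleep(delay * (0.5 + random.random() / 2))
```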

## Future Directions

- Additional providers (AWS, GCP, Azure)
- Kubernetes backend
- Advanced scheduling (gang scheduling, priorities)
- Cost optimization algorithms
- Workflow orchestration (DAGs)
- Enhanced instance catalog system
- OpenTelemetry observability integration