Metadata-Version: 2.4
Name: quartermaster-engine
Version: 0.1.5
Summary: Execution engine for AI agent graphs — traversal, branching, merging, memory, message passing, and error handling
Project-URL: Homepage, https://github.com/MindMadeLab/quartermaster-sdk-py
Project-URL: Repository, https://github.com/MindMadeLab/quartermaster-sdk-py
Project-URL: Issues, https://github.com/MindMadeLab/quartermaster-sdk-py/issues
Author-email: MindMade <info@mindmade.io>
License-Expression: Apache-2.0
License-File: LICENSE
Keywords: ai-agents,execution-engine,flow-runner,graph,orchestration
Classifier: Development Status :: 3 - Alpha
Classifier: Intended Audience :: Developers
Classifier: License :: OSI Approved :: Apache Software License
Classifier: Programming Language :: Python :: 3
Classifier: Programming Language :: Python :: 3.11
Classifier: Programming Language :: Python :: 3.12
Classifier: Programming Language :: Python :: 3.13
Classifier: Topic :: Scientific/Engineering :: Artificial Intelligence
Classifier: Topic :: Software Development :: Libraries :: Application Frameworks
Requires-Python: >=3.11
Requires-Dist: quartermaster-graph>=0.1.5
Requires-Dist: quartermaster-nodes>=0.1.5
Requires-Dist: quartermaster-providers>=0.1.5
Provides-Extra: all
Requires-Dist: aiosqlite>=0.19; extra == 'all'
Requires-Dist: redis>=5.0; extra == 'all'
Provides-Extra: dev
Requires-Dist: mypy>=1.5; extra == 'dev'
Requires-Dist: pytest-asyncio>=0.23; extra == 'dev'
Requires-Dist: pytest-cov>=4.1; extra == 'dev'
Requires-Dist: pytest>=7.4; extra == 'dev'
Requires-Dist: ruff>=0.1; extra == 'dev'
Provides-Extra: redis
Requires-Dist: redis>=5.0; extra == 'redis'
Provides-Extra: sqlite
Requires-Dist: aiosqlite>=0.19; extra == 'sqlite'
Description-Content-Type: text/markdown

# quartermaster-engine

Execution engine for AI agent graphs with pluggable storage, dispatching, and memory.

[![PyPI version](https://img.shields.io/pypi/v/quartermaster-engine)](https://pypi.org/project/quartermaster-engine/)
[![Python 3.11+](https://img.shields.io/badge/python-3.11%2B-blue)](https://www.python.org/)
[![License: Apache 2.0](https://img.shields.io/badge/license-Apache%202.0-green)](../LICENSE)

## Features

- **FlowRunner** orchestrates graph execution: traversal, branching, merging, and error handling
- **Pluggable dispatchers**: SyncDispatcher, ThreadDispatcher, AsyncDispatcher
- **Pluggable storage**: InMemoryStore, SQLiteStore, or implement your own ExecutionStore
- **Two memory layers**: FlowMemory (per-execution) and PersistentMemory (cross-execution)
- **Real-time event streaming**: NodeStarted, TokenGenerated, NodeFinished, FlowError, UserInputRequired
- **Per-node error strategies**: Stop, Retry (with configurable max retries), Skip
- **Flow pause/resume** for user interaction nodes
- **Sync and async execution** modes with `run()` and `run_async()`

## Installation

```bash
pip install quartermaster-engine

# With SQLite persistent store
pip install quartermaster-engine[sqlite]
```

## Quick Start

### Run a Simple Graph

```python
from uuid import uuid4
from quartermaster_engine import FlowRunner, InMemoryStore
from quartermaster_engine.nodes import SimpleNodeRegistry, NodeResult
from quartermaster_graph import GraphSpec, GraphNode, GraphEdge, NodeType

# 1. Define the graph
start_id, process_id, end_id = uuid4(), uuid4(), uuid4()

graph = GraphSpec(
    id=uuid4(),
    agent_id=uuid4(),
    start_node_id=start_id,
    nodes=[
        GraphNode(id=start_id, type=NodeType.START, name="Start"),
        GraphNode(
            id=process_id,
            type=NodeType.INSTRUCTION,
            name="Process",
            metadata={"llm_system_instruction": "Summarize the input.", "llm_model": "gpt-4o"},
        ),
        GraphNode(id=end_id, type=NodeType.END, name="End"),
    ],
    edges=[
        GraphEdge(source_id=start_id, target_id=process_id),
        GraphEdge(source_id=process_id, target_id=end_id),
    ],
)

# 2. Register node executors
registry = SimpleNodeRegistry()
# registry.register("Instruction1", my_instruction_executor)

# 3. Run the flow
runner = FlowRunner(graph=graph, node_registry=registry)
result = runner.run("Please summarize this article about AI safety.")

print(result.success)          # True/False
print(result.final_output)     # The final text output
print(result.duration_seconds)
```

### Using GraphBuilder from quartermaster-graph

```python
from quartermaster_graph import GraphBuilder
from quartermaster_engine import FlowRunner
from quartermaster_engine.nodes import SimpleNodeRegistry

graph = (
    GraphBuilder("Support Agent")
    .start()
    .instruction("Classify", model="gpt-4o")
    .decision("Route", options=["billing", "technical"])
    .on("billing").instruction("Handle billing").end()
    .on("technical").instruction("Handle technical").end()
    .build()
)

registry = SimpleNodeRegistry()
runner = FlowRunner(graph=graph, node_registry=registry)
result = runner.run("I need help with my invoice")
```

### Stream Events in Real Time

```python
from quartermaster_engine import FlowRunner, FlowEvent, NodeStarted, NodeFinished, TokenGenerated, FlowError

def handle_event(event: FlowEvent):
    if isinstance(event, NodeStarted):
        print(f"[START] {event.node_name} ({event.node_type})")
    elif isinstance(event, TokenGenerated):
        print(event.token, end="", flush=True)
    elif isinstance(event, NodeFinished):
        print(f"\n[DONE] Node finished: {event.result[:50]}...")
    elif isinstance(event, FlowError):
        print(f"[ERROR] {event.error} (recoverable={event.recoverable})")

runner = FlowRunner(graph=graph, node_registry=registry, on_event=handle_event)
result = runner.run("Hello!")
```

### Async Execution

```python
import asyncio
from quartermaster_engine import FlowRunner, TokenGenerated

async def run_flow():
    runner = FlowRunner(graph=graph, node_registry=registry)
    async for event in runner.run_async("Hello!"):
        if isinstance(event, TokenGenerated):
            print(event.token, end="", flush=True)

asyncio.run(run_flow())
```

### Quick Execution with `run_graph()`

For rapid prototyping, use the convenience function:

```python
from quartermaster_engine import run_graph

# Non-interactive (provide input)
run_graph(agent, user_input="Explain quantum computing")

# Interactive (prompts stdin at User nodes)
run_graph(agent)

# Force provider
run_graph(agent, user_input="Hello", provider="openai")
```

`run_graph()` handles provider detection, node registration, streaming output,
and the pause/resume cycle for interactive User nodes.

## API Reference

### FlowRunner

The core orchestration class. Accepts a `GraphSpec` from `quartermaster-graph` (`AgentGraph` still works as a deprecated alias).

```python
from quartermaster_engine import FlowRunner
from quartermaster_engine.dispatchers.sync_dispatcher import SyncDispatcher
from quartermaster_engine.messaging.context_manager import ContextManager

runner = FlowRunner(
    graph=spec,                      # GraphSpec from quartermaster-graph
    node_registry=registry,          # Maps node types to executors
    store=InMemoryStore(),           # Execution state storage
    dispatcher=SyncDispatcher(),     # How branches are dispatched
    context_manager=ContextManager(),  # LLM context window management
    on_event=handle_event,           # Real-time event callback
)
```

| Method | Description |
|--------|-------------|
| `run(input_message, flow_id=None) -> FlowResult` | Execute synchronously |
| `run_async(input_message, flow_id=None) -> AsyncIterator[FlowEvent]` | Execute asynchronously, yielding events |
| `resume(flow_id, user_input) -> FlowResult` | Resume a paused flow with user input |
| `stop(flow_id)` | Stop a running flow |

### FlowResult

| Field | Type | Description |
|-------|------|-------------|
| `flow_id` | `UUID` | Unique execution identifier |
| `success` | `bool` | Whether all nodes completed successfully |
| `final_output` | `str` | Text output from the End node |
| `output_data` | `dict` | Structured output data |
| `error` | `str \| None` | Error message if failed |
| `node_results` | `dict[UUID, NodeResult]` | Per-node results |
| `duration_seconds` | `float` | Total execution time |

### Dispatchers

Control how successor nodes are executed.

| Dispatcher | Description |
|------------|-------------|
| `SyncDispatcher` | Execute nodes sequentially in the calling thread. Simple and predictable. |
| `ThreadDispatcher(max_workers=4)` | Execute branches in parallel using a thread pool. Good for I/O-bound nodes. |
| `AsyncDispatcher` | Execute branches concurrently using asyncio tasks. For async web applications. |

All dispatchers implement the `TaskDispatcher` protocol:

```python
class TaskDispatcher(Protocol):
    def dispatch(self, flow_id, node_id, execute_fn) -> None: ...
    def wait_all(self) -> None: ...
    def shutdown(self) -> None: ...
```
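
To make the protocol concrete, here is a minimal sequential dispatcher that satisfies it. This is an illustrative sketch (the name `SequentialDispatcher` and its bookkeeping are inventions for this example), not the library's actual `SyncDispatcher`:

```python
from typing import Callable
from uuid import UUID


class SequentialDispatcher:
    """Sketch: runs each dispatched task immediately in the calling thread."""

    def __init__(self) -> None:
        self._dispatched = 0  # simple bookkeeping, for illustration only

    def dispatch(self, flow_id: UUID, node_id: UUID, execute_fn: Callable[[], None]) -> None:
        self._dispatched += 1
        execute_fn()  # no queue, no threads: run inline

    def wait_all(self) -> None:
        pass  # nothing is ever pending; all work ran inline

    def shutdown(self) -> None:
        pass  # no pool to tear down
```

A thread- or asyncio-based dispatcher differs only in *where* `execute_fn` runs; the protocol surface stays the same.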

### Execution Stores

Pluggable storage for flow state, memory, and messages.

| Store | Description |
|-------|-------------|
| `InMemoryStore` | Dict-backed, no persistence. Great for testing. |
| `SQLiteStore(db_path)` | SQLite-backed with WAL mode. For local development. |

Implement `ExecutionStore` for custom backends (Redis, PostgreSQL, etc.):

```python
from quartermaster_engine import ExecutionStore

class RedisStore:  # duck-typed: implements the ExecutionStore methods
    def save_node_execution(self, flow_id, node_id, execution) -> None: ...
    def get_node_execution(self, flow_id, node_id) -> NodeExecution | None: ...
    def get_all_node_executions(self, flow_id) -> dict[UUID, NodeExecution]: ...
    def save_memory(self, flow_id, key, value) -> None: ...
    def get_memory(self, flow_id, key) -> Any: ...
    def get_all_memory(self, flow_id) -> dict[str, Any]: ...
    def delete_memory(self, flow_id, key) -> None: ...
    def save_messages(self, flow_id, node_id, messages) -> None: ...
    def get_messages(self, flow_id, node_id) -> list[Message]: ...
    def append_message(self, flow_id, node_id, message) -> None: ...
    def clear_flow(self, flow_id) -> None: ...
```
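
To show the shape of an implementation, here is a self-contained sketch of the memory portion of such a store, backed by a plain dict. The class name `DictMemoryStore` is hypothetical; the real `InMemoryStore` may be organized differently:

```python
from typing import Any
from uuid import UUID, uuid4


class DictMemoryStore:
    """Sketch of the save/get/delete memory methods from the protocol above."""

    def __init__(self) -> None:
        self._memory: dict[UUID, dict[str, Any]] = {}

    def save_memory(self, flow_id: UUID, key: str, value: Any) -> None:
        self._memory.setdefault(flow_id, {})[key] = value

    def get_memory(self, flow_id: UUID, key: str) -> Any:
        return self._memory.get(flow_id, {}).get(key)

    def get_all_memory(self, flow_id: UUID) -> dict[str, Any]:
        return dict(self._memory.get(flow_id, {}))  # copy, so callers can't mutate state

    def delete_memory(self, flow_id: UUID, key: str) -> None:
        self._memory.get(flow_id, {}).pop(key, None)
```

A Redis or PostgreSQL backend would keep the same signatures and swap the dict for network calls, keying rows by `(flow_id, key)`.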

### Memory System

**FlowMemory** -- scoped to a single flow execution:

```python
from quartermaster_engine import FlowMemory, InMemoryStore

store = InMemoryStore()
memory = FlowMemory(flow_id=my_flow_id, store=store)

memory.set("user_name", "Alice")
memory.set("preferences", {"language": "en"})

name = memory.get("user_name")            # "Alice"
all_data = memory.get_all()                # {"user_name": "Alice", ...}
keys = memory.list_keys()                  # ["user_name", "preferences"]
memory.delete("preferences")
memory.clear()
```

**PersistentMemory** -- cross-flow memory that survives between executions:

```python
from quartermaster_engine import PersistentMemory, InMemoryPersistence

persistence = InMemoryPersistence()

persistence.write(agent_id, "user_pref", "dark_mode")
value = persistence.read(agent_id, "user_pref")     # "dark_mode"
persistence.update(agent_id, "user_pref", "light_mode")

results = persistence.search(agent_id, "pref")      # Substring search
keys = persistence.list_keys(agent_id)               # ["user_pref"]
persistence.delete(agent_id, "user_pref")
```

### Error Handling

Per-node error strategies configured via `GraphNode.error_handling`:

| Strategy | Behavior |
|----------|----------|
| `ErrorStrategy.STOP` | Halt entire flow on error (default) |
| `ErrorStrategy.RETRY` | Retry up to `max_retries` times with `retry_delay` backoff |
| `ErrorStrategy.SKIP` | Skip failed node, continue to successors |

```python
from uuid import uuid4

from quartermaster_engine.types import GraphNode, NodeType, ErrorStrategy

node = GraphNode(
    id=uuid4(),
    type=NodeType.INSTRUCTION,
    name="Unreliable API",
    error_handling=ErrorStrategy.RETRY,
    max_retries=3,
    retry_delay=2.0,
    timeout=30.0,
)
```
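
The RETRY strategy's semantics can be approximated by a simple loop: attempt the node, and on failure wait `retry_delay` before trying again, up to `max_retries` additional attempts. This is a sketch of the observable behavior (the function `run_with_retry` is invented for illustration; whether the engine's delay is fixed or grows between attempts is an internal detail):

```python
import time
from typing import Callable, TypeVar

T = TypeVar("T")


def run_with_retry(fn: Callable[[], T], max_retries: int = 3, retry_delay: float = 2.0) -> T:
    """Attempt fn once, then retry up to max_retries more times, sleeping between attempts."""
    last_error: Exception | None = None
    for attempt in range(max_retries + 1):
        try:
            return fn()
        except Exception as exc:  # a real engine would likely catch more selectively
            last_error = exc
            if attempt < max_retries:
                time.sleep(retry_delay)
    raise last_error  # all attempts exhausted
```

Under SKIP, the same failure would instead be recorded and traversal would continue to the node's successors.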

### Events

Real-time events emitted during flow execution:

| Event | Fields | Description |
|-------|--------|-------------|
| `NodeStarted` | `flow_id`, `node_id`, `node_type`, `node_name` | Node begins execution |
| `TokenGenerated` | `flow_id`, `node_id`, `token` | Streaming token from LLM |
| `NodeFinished` | `flow_id`, `node_id`, `result`, `output_data` | Node completed |
| `FlowFinished` | `flow_id`, `final_output`, `output_data` | Entire flow completed |
| `UserInputRequired` | `flow_id`, `node_id`, `prompt`, `options` | Flow paused for user input |
| `FlowError` | `flow_id`, `node_id`, `error`, `recoverable` | Node failed |

`run_graph()` uses streaming by default -- `TokenGenerated` events are printed
as they arrive, giving real-time output from LLM nodes without extra setup.

### ExecutionContext

The runtime context passed to each node executor:

| Field | Type | Description |
|-------|------|-------------|
| `flow_id` | `UUID` | Flow execution identifier |
| `node_id` | `UUID` | Current node identifier |
| `graph` | `GraphSpec` | Full graph definition |
| `current_node` | `GraphNode` | Current node definition |
| `messages` | `list[Message]` | Conversation history |
| `memory` | `dict[str, Any]` | Flow-scoped memory snapshot |
| `metadata` | `dict[str, Any]` | Node metadata |
| `on_token` | `Callable[[str], None] \| None` | Callback for streaming tokens |

Helper methods:

| Method | Description |
|--------|-------------|
| `get_meta(key, default=None)` | Get value from node metadata, falling back to context metadata |
| `set_meta(key, value)` | Set a metadata value on this context |
| `emit_token(token)` | Emit a streaming token via callback |
| `emit_message(content)` | Emit a complete message via callback |
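
The `get_meta` fallback rule is a two-level lookup: node metadata wins, context metadata fills the gaps. A minimal stand-in (the class `MetaLookup` is invented here, not the engine's `ExecutionContext`) illustrates the precedence:

```python
from typing import Any


class MetaLookup:
    """Stand-in showing node-metadata-first lookup with context fallback."""

    def __init__(self, node_metadata: dict[str, Any], context_metadata: dict[str, Any]) -> None:
        self.node_metadata = node_metadata
        self.context_metadata = context_metadata

    def get_meta(self, key: str, default: Any = None) -> Any:
        if key in self.node_metadata:
            return self.node_metadata[key]  # per-node value takes precedence
        return self.context_metadata.get(key, default)
```

This lets a single node override, say, `llm_model` while the rest of the flow inherits a context-wide setting.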

### Node Execution Protocol

Implement `NodeExecutor` to add custom node types:

```python
from quartermaster_engine.nodes import NodeExecutor, NodeResult
from quartermaster_engine.context.execution_context import ExecutionContext

class MyInstructionExecutor:
    async def execute(self, context: ExecutionContext) -> NodeResult:
        system_instruction = context.get_meta("llm_system_instruction", "")
        # Call your LLM here...
        response_text = "Generated response"

        # Stream tokens in real time
        for token in response_text.split():
            context.emit_token(token + " ")

        return NodeResult(
            success=True,
            data={"model": "gpt-4o"},
            output_text=response_text,
        )
```

Register executors with `SimpleNodeRegistry`:

```python
from quartermaster_engine.nodes import SimpleNodeRegistry

registry = SimpleNodeRegistry()
registry.register("Instruction1", MyInstructionExecutor())
registry.register("Decision1", MyDecisionExecutor())

# List registered types
registry.list_types()  # ["Instruction1", "Decision1"]
```

### NodeResult

Returned by node executors after execution:

| Field | Type | Description |
|-------|------|-------------|
| `success` | `bool` | Whether execution succeeded |
| `data` | `dict[str, Any]` | Structured output data |
| `error` | `str \| None` | Error message if failed |
| `picked_node` | `str \| None` | For decision nodes: which successor to trigger |
| `output_text` | `str \| None` | Main text output |
| `wait_for_user` | `bool` | If True, flow pauses for user input |
| `user_prompt` | `str \| None` | Prompt to show the user |
| `user_options` | `list[str] \| None` | Options for user selection |

## Configuration

### SQLiteStore

```python
from quartermaster_engine.stores.sqlite_store import SQLiteStore

store = SQLiteStore(db_path="my_agent.db")
runner = FlowRunner(graph=graph, node_registry=registry, store=store)
```

Tables are created automatically on first use. Uses WAL mode for concurrent read access.

### ThreadDispatcher

```python
from quartermaster_engine.dispatchers.thread_dispatcher import ThreadDispatcher

dispatcher = ThreadDispatcher(max_workers=8)
runner = FlowRunner(graph=graph, node_registry=registry, dispatcher=dispatcher)
result = runner.run("Process this in parallel")
dispatcher.shutdown()  # Clean up thread pool
```

## Contributing

See [CONTRIBUTING.md](../CONTRIBUTING.md) for guidelines.

## License

Apache License 2.0 -- see [LICENSE](../LICENSE) for details.
