# GraphReFly — graphrefly (Python)

Describe automations in plain language. Review them visually. Run them persistently. Trace every decision back to its source.

GraphReFly is a reactive graph engine for human and LLM cooperation. An LLM composes a reactive graph from a natural-language description (like SQL for data flows). The graph runs persistently, checkpoints its state, and traces every decision through a causal chain, which enables explainability ("why was this flagged?"), auditability, and progressive trust accumulation. This package, `graphrefly`, is the Python implementation. Zero dependencies.

## Authoritative behavior

- Shared normative spec: the `GRAPHREFLY-SPEC.md` file in the **graphrefly** spec repository (sibling to this package in the GraphReFly workspace). Implementations must match that document, not legacy callbag-only behavior.

## Where to read in this repo

- `docs/roadmap.md` — phased implementation status
- `docs/docs-guidance.md` — how API docs and examples are maintained
- `docs/test-guidance.md` — how tests are organized
- `docs/optimizations.md` — performance optimizations
- `docs/ADAPTER-CONTRACT.md` — checkpoint adapter contract
- `examples/` — small runnable sketches (verify against current exports before copy-paste)

## Imports

- Main entry: `from graphrefly import ...` (re-exports core, extra, graph, patterns)
- Submodules: `from graphrefly.core import ...`, `from graphrefly.extra import ...`, `from graphrefly.graph import ...`
- Patterns: `from graphrefly.patterns import ...`
- Compat: `from graphrefly.compat import ...`
- Integrations: `from graphrefly.integrations import ...`

## Public API

### core — node primitive + sugar constructors (`src/graphrefly/core/`)

- `node(options)` — low-level reactive node factory; accepts `initial`, `fn`, `guard`, `meta`, `name`, `deps`
- `state(initial)` — sugar: a writable state node
- `producer(fn)` — sugar: a push-source node driven by its `fn`
- `derived(deps, fn)` — sugar: a computed node that re-runs `fn` when deps change
- `effect(deps, fn)` — sugar: a side-effect node (no value consumers)
- `pipe(source, *operators)` — sugar: chains operator functions onto a node
- `dynamic_node(tracking_fn)` — node that tracks its own reactive dependencies at runtime
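
The relationship between a writable state node and a computed node can be sketched with the standard library alone. `TinyState` and `TinyDerived` are hypothetical stand-ins written for this illustration; they are not the graphrefly classes, and they skip the message protocol entirely.

```python
from typing import Any, Callable, List


class TinyState:
    """Hypothetical writable node: holds a value and notifies subscribers."""

    def __init__(self, initial: Any) -> None:
        self._value = initial
        self._subscribers: List[Callable[[], None]] = []

    def get(self) -> Any:
        return self._value

    def set(self, value: Any) -> None:
        if value == self._value:
            return  # unchanged value: nothing to propagate
        self._value = value
        for notify in list(self._subscribers):
            notify()

    def subscribe(self, callback: Callable[[], None]) -> None:
        self._subscribers.append(callback)


class TinyDerived:
    """Hypothetical computed node: re-runs fn whenever a dependency changes."""

    def __init__(self, deps: List[TinyState], fn: Callable[..., Any]) -> None:
        self._deps = deps
        self._fn = fn
        self._value = fn(*(dep.get() for dep in deps))
        for dep in deps:
            dep.subscribe(self._recompute)

    def _recompute(self) -> None:
        self._value = self._fn(*(dep.get() for dep in self._deps))

    def get(self) -> Any:
        return self._value


price = TinyState(10)
quantity = TinyState(3)
total = TinyDerived([price, quantity], lambda p, q: p * q)
initial_total = total.get()   # computed once at construction
quantity.set(4)               # triggers a recompute of total
updated_total = total.get()
```

The sketch recomputes eagerly on every change; how the real nodes schedule recomputation is defined by the spec, not by this example.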

### core — batch (`src/graphrefly/core/`)

- `batch(fn)` — runs `fn` in a batch scope; defers DATA/RESOLVED until the outermost batch returns
- `is_batching()` — returns `True` while inside `batch()` or while the drain loop is running
- `partition_for_batch(messages)` — splits a message list into immediate / deferred / terminal groups
- `down_with_batch(sink, messages)` — delivers messages downstream respecting batch semantics and tier ordering
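
The deferral rule described above (nothing is delivered until the outermost batch returns) can be illustrated with a depth counter and a queue. This is a minimal sketch under assumptions: `tiny_batch`, `tiny_emit`, and `tiny_is_batching` are hypothetical names, and the sketch defers every emission rather than distinguishing message types or tiers.

```python
from typing import Callable, List

_depth = 0                      # current nesting level of tiny_batch calls
_draining = False               # True while deferred work is being flushed
_deferred: List[Callable[[], None]] = []
delivered: List[str] = []       # what downstream consumers have actually seen


def tiny_is_batching() -> bool:
    """True inside a batch scope or while the drain loop is running."""
    return _depth > 0 or _draining


def tiny_emit(label: str) -> None:
    """Deliver immediately, or queue the delivery if a batch is active."""
    if tiny_is_batching():
        _deferred.append(lambda: delivered.append(label))
    else:
        delivered.append(label)


def tiny_batch(fn: Callable[[], None]) -> None:
    """Run fn in a batch scope; flush only when the outermost scope exits."""
    global _depth, _draining
    _depth += 1
    try:
        fn()
    finally:
        _depth -= 1
    # Error handling is out of scope here: if fn raises, nothing is flushed.
    if _depth == 0 and not _draining:
        _draining = True
        try:
            while _deferred:
                _deferred.pop(0)()
        finally:
            _draining = False


observed_inside: List[int] = []


def inner() -> None:
    tiny_emit("b")


def outer() -> None:
    tiny_emit("a")
    tiny_batch(inner)                        # nested scope: still deferred
    observed_inside.append(len(delivered))   # records how much was delivered


tiny_batch(outer)
```

After `tiny_batch(outer)` returns, both emissions have been delivered in order, while the count recorded inside the batch is zero.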

### core — guard (`src/graphrefly/core/`)

- `policy(build)` — declarative guard builder; registers `allow()`/`deny()` rules; deny wins on conflict
- `policy_from_rules(rules)` — builds a guard from a list of rule dicts
- `compose_guards(*guards)` — composes multiple guard functions into one
- `GuardDenied` — error class raised when a guard denies an action
- `access_hint_for_guard(guard)` — probes a guard with standard actor types and returns a human-readable access hint string
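
The "deny wins on conflict" rule can be sketched as follows. Everything here is hypothetical: the tuple-based rule format, `tiny_policy`, `enforce`, and `TinyGuardDenied` are illustration-only names, and the default of denying when no rule matches is an assumption of this sketch rather than documented behavior.

```python
from typing import Callable, Dict, List, Tuple

Actor = Dict[str, str]                            # e.g. {"type": "human", "id": "alice"}
Rule = Tuple[str, str, Callable[[Actor], bool]]   # (effect, action, predicate)
Guard = Callable[[Actor, str], bool]


class TinyGuardDenied(Exception):
    """Raised by enforce() when the guard rejects an action."""


def tiny_policy(rules: List[Rule]) -> Guard:
    """Build a guard in which any matching deny overrides every allow."""

    def guard(actor: Actor, action: str) -> bool:
        effects = {
            effect
            for effect, rule_action, applies in rules
            if rule_action == action and applies(actor)
        }
        if "deny" in effects:
            return False            # deny wins on conflict
        return "allow" in effects   # assumption: no matching rule means denied

    return guard


def enforce(guard: Guard, actor: Actor, action: str) -> None:
    if not guard(actor, action):
        raise TinyGuardDenied(f"{actor['type']}:{actor['id']} may not {action}")


guard = tiny_policy([
    ("allow", "write", lambda actor: True),
    ("deny", "write", lambda actor: actor["type"] == "llm"),
    ("allow", "observe", lambda actor: True),
])
human = {"type": "human", "id": "alice"}
llm = {"type": "llm", "id": "planner"}
```

With these rules the human actor may write, while the LLM actor matches both the allow and the deny rule for `write` and is therefore rejected.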

### core — meta (`src/graphrefly/core/`)

- `meta_snapshot(node)` — reads current cached values of all meta companion nodes; returns plain `{ key: value }` dict
- `describe_node(node)` — builds a describe-node output slice (type, status, deps, meta, value) for one node

### core — messages (`src/graphrefly/core/`)

- `MessageType` — enum with message type constants: `DATA`, `DIRTY`, `RESOLVED`, `COMPLETE`, `ERROR`, `TEARDOWN`, `INVALIDATE`, `PAUSE`, `RESUME`
- `dispatch_messages(messages)` — delivers a list of messages
- `is_phase2_message(msg)` — returns `True` for messages at tier 2+

### core — actor (`src/graphrefly/core/`)

- `Actor` — `{ type, id }` identity used by guards
- `system_actor` — the built-in system actor constant
- `normalize_actor(actor)` — coerces various actor representations to `Actor`

### core — clock (`src/graphrefly/core/`)

- `monotonic_ns()` — monotonic nanosecond timestamp
- `wall_clock_ns()` — wall-clock nanosecond timestamp
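
The standard library exposes both kinds of clock, so the distinction can be shown without graphrefly. Whether `monotonic_ns()` and `wall_clock_ns()` wrap these exact calls is an assumption; this README does not say.

```python
import time

start_ns = time.monotonic_ns()   # monotonic: suitable for measuring intervals
stamp_ns = time.time_ns()        # wall clock: nanoseconds since the Unix epoch
total = sum(range(1000))         # stand-in for real work
elapsed_ns = time.monotonic_ns() - start_ns
```

A monotonic reading never goes backwards, so `elapsed_ns` cannot be negative; a wall-clock reading can jump if the system time is adjusted, which is why it is better suited to timestamps than to durations.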

### core — versioning (`src/graphrefly/core/`)

- `create_versioning(level)` — creates a versioning context at the given level
- `advance_version(versioning)` — advances the version counter
- `default_hash(value)` — default hash function for value-based versioning
- `is_v1(node)` — returns `True` if a node uses V1 versioning

### core — runner (`src/graphrefly/core/`)

- `Runner` — protocol for async scheduling backends
- `get_default_runner()` — returns the current default runner
- `set_default_runner(runner)` — sets the default runner
- `resolve_runner(runner)` — resolves a runner argument to a concrete runner

### graph — Graph container (`src/graphrefly/graph/`)

- `Graph` — reactive node registry; methods: `register`, `resolve`, `describe`, `observe`, `snapshot`, `diff`, `diagram`, `spy`, `mount`, `unmount`, `teardown`
- `reachable(graph, start_path, options=None)` — returns the set of node paths reachable from `start_path` via dependency edges
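
Reachability over dependency edges is a plain graph traversal. The sketch below uses a breadth-first search over a dict of edges; `tiny_reachable` is a hypothetical helper, and both the edge direction and the choice to include the start path in the result are assumptions of this example.

```python
from collections import deque
from typing import Dict, List, Set


def tiny_reachable(edges: Dict[str, List[str]], start: str) -> Set[str]:
    """Return every path reachable from start, including start itself."""
    seen = {start}
    queue = deque([start])
    while queue:
        current = queue.popleft()
        for neighbour in edges.get(current, []):
            if neighbour not in seen:   # guards against cycles
                seen.add(neighbour)
                queue.append(neighbour)
    return seen


# Hypothetical node paths; each key lists the nodes that depend on it.
edges = {
    "input": ["parsed"],
    "parsed": ["score", "audit"],
    "score": ["alert"],
    "alert": ["parsed"],   # a cycle, handled by the seen set
    "unrelated": ["other"],
}
from_input = tiny_reachable(edges, "input")
from_score = tiny_reachable(edges, "score")
```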

### extra — tier 1 operators (`src/graphrefly/extra/tier1.py`)

- `map(fn)`, `filter(pred)`, `scan(fn, seed)`, `reduce(fn, seed)` — transform
- `take(n)`, `skip(n)`, `take_while(pred)`, `take_until(signal)` — limiting
- `first()`, `last(default=...)`, `find(pred)`, `element_at(i)` — selection
- `start_with(value)`, `tap(fn)`, `distinct_until_changed(eq=None)` — utility
- `pairwise()` — emits `[prev, curr]` pairs
- `combine(*sources)`, `with_latest_from(other)` — combination
- `merge(*sources)`, `zip(*sources)`, `concat(second)`, `race(*sources)` — merging
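
The semantics of a few of these operators are easy to see on plain iterables. The generator functions below are hypothetical, stdlib-only stand-ins; they operate on finite sequences rather than reactive nodes and are not the graphrefly operators.

```python
from typing import Any, Callable, Iterable, Iterator, Tuple


def scan_values(values: Iterable[Any], fn: Callable[[Any, Any], Any], seed: Any) -> Iterator[Any]:
    """Like a fold, but yields every intermediate accumulator."""
    acc = seed
    for value in values:
        acc = fn(acc, value)
        yield acc


def pairwise_values(values: Iterable[Any]) -> Iterator[Tuple[Any, Any]]:
    """Yield (previous, current) pairs; the first value only primes the pair."""
    iterator = iter(values)
    try:
        prev = next(iterator)
    except StopIteration:
        return
    for curr in iterator:
        yield (prev, curr)
        prev = curr


def distinct_until_changed_values(values: Iterable[Any]) -> Iterator[Any]:
    """Drop a value when it equals the one immediately before it."""
    sentinel = object()
    last = sentinel
    for value in values:
        if last is sentinel or value != last:
            yield value
        last = value


readings = [1, 1, 2, 2, 2, 3, 1]
distinct = list(distinct_until_changed_values(readings))
running_sum = list(scan_values(distinct, lambda acc, v: acc + v, 0))
pairs = list(pairwise_values(distinct))
```

Here `distinct` is `[1, 2, 3, 1]`: only consecutive repeats are removed, so the trailing `1` survives.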

### extra — tier 2 operators (`src/graphrefly/extra/tier2.py`)

- `switch_map(fn)`, `concat_map(fn)`, `flat_map(fn)`, `exhaust_map(fn)` — higher-order
- `debounce(seconds)`, `throttle(seconds)`, `delay(seconds)` — timing
- `sample(notifier)`, `audit(seconds)`, `timeout(seconds)` — sampling / timeout
- `buffer(notifier)`, `buffer_count(n)`, `buffer_time(seconds)` — buffering
- `window(notifier)`, `window_count(n)`, `window_time(seconds)` — windowing
- `interval(seconds)` — emits incrementing integers on a timer
- `repeat(times)` — resubscribes on COMPLETE
- `gate(control)` — passes values only while control node is truthy
- `pausable()` — gates emission on a pause signal
- `rescue(recover)` — error recovery
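
The difference between debouncing and throttling is easiest to see on a recorded timeline. The sketch below replays a list of `(time, value)` events instead of using real timers. Its assumptions: times are integer milliseconds for exact arithmetic (the operators listed above take seconds), throttling is leading-edge, the stream stays open after the last event, and both function names are hypothetical.

```python
from typing import List, Optional, Tuple

Event = Tuple[int, str]   # (time in milliseconds, value)


def debounce_events(events: List[Event], quiet_ms: int) -> List[Event]:
    """Emit a value only after quiet_ms pass with no newer value."""
    out: List[Event] = []
    for index, (at, value) in enumerate(events):
        is_last = index == len(events) - 1
        if is_last or events[index + 1][0] - at >= quiet_ms:
            out.append((at + quiet_ms, value))   # emitted when the quiet period ends
    return out


def throttle_events(events: List[Event], window_ms: int) -> List[Event]:
    """Emit a value, then ignore everything until window_ms has elapsed."""
    out: List[Event] = []
    next_allowed: Optional[int] = None
    for at, value in events:
        if next_allowed is None or at >= next_allowed:
            out.append((at, value))
            next_allowed = at + window_ms
    return out


timeline = [(0, "a"), (100, "b"), (200, "c"), (1000, "d"), (1050, "e")]
debounced = debounce_events(timeline, 500)
throttled = throttle_events(timeline, 500)
```

Debouncing keeps the last value of each burst (`c`, then `e`), delayed by the quiet period; throttling keeps the first value of each window (`a`, then `d`) with no delay.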

### extra — sources (`src/graphrefly/extra/sources.py`)

- `of(*values)` — emits given values then completes
- `empty()` — completes immediately
- `never()` — never emits or completes
- `throw_error(err)` — emits an error immediately
- `from_iter(iterable)` — synchronously emits all values from an iterable
- `from_timer(seconds)` — emits on a recurring timer
- `from_cron(expr)` — emits on a cron schedule
- `from_awaitable(awaitable)` — wraps an awaitable as a node
- `from_async_iter(aiter)` — emits values from an async iterable
- `from_any(value)` — normalises any source (awaitable, iterable, node, etc.) into a node
- `from_http(url, **opts)` — wraps an HTTP request as a node
- `from_event_emitter(emitter, event)` — wraps an event emitter
- `from_fs_watch(path, **opts)` — watches filesystem for changes
- `from_webhook(path, **opts)` — creates a webhook-backed source node
- `from_websocket(url, **opts)` — wraps a WebSocket connection as a source node
- `from_mcp(transport, **opts)` — wraps an MCP (Model Context Protocol) connection
- `from_git_hook(hook, **opts)` — wraps a git hook as a source node
- `share(source)` — multicast operator; shares a single subscription
- `cached(source)` — memoised producer; shares a single subscription
- `replay(source, buffer_size=1)` — replays the last `buffer_size` values to new subscribers
- `for_each(source, fn)` — subscribes and calls `fn` for each DATA value
- `to_list(source)` — collects all values into a list
- `to_array(source)` — alias: collects all values into a list node
- `first_value_from(source)` — returns the first DATA value (async)
- `to_sse(source, **opts)` — converts a node to Server-Sent Events frames
- `to_websocket(source, url, **opts)` — pipes node values to a WebSocket
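
Replaying recent values to late subscribers comes down to a bounded buffer. `TinyReplay` below is a hypothetical stdlib sketch of that idea, not the graphrefly `replay` implementation; it ignores completion and errors.

```python
from collections import deque
from typing import Any, Callable, Deque, List


class TinyReplay:
    """Hypothetical multicast source that replays its most recent values."""

    def __init__(self, buffer_size: int = 1) -> None:
        self._buffer: Deque[Any] = deque(maxlen=buffer_size)   # oldest values fall off
        self._subscribers: List[Callable[[Any], None]] = []

    def emit(self, value: Any) -> None:
        self._buffer.append(value)
        for callback in list(self._subscribers):
            callback(value)

    def subscribe(self, callback: Callable[[Any], None]) -> None:
        for value in self._buffer:   # catch the newcomer up first
            callback(value)
        self._subscribers.append(callback)


source = TinyReplay(buffer_size=2)
early: List[int] = []
late: List[int] = []

source.subscribe(early.append)
for value in (1, 2, 3):
    source.emit(value)
source.subscribe(late.append)   # receives the buffered 2 and 3 immediately
source.emit(4)
```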

### extra — data structures (`src/graphrefly/extra/data_structures.py`)

- `reactive_map(initial=None)` — reactive key-value map node
- `reactive_log(options=None)` — append-only reactive log node
- `log_slice(log, start, end=None)` — derived slice of a reactive log
- `reactive_index(name=None)` — reactive inverted index over a reactive map
- `reactive_list(initial=None)` — reactive ordered list node
- `pubsub()` — topic-based publish/subscribe primitive

### extra — resilience (`src/graphrefly/extra/resilience.py`)

- `retry(source, options)` — retries a failing source with configurable backoff
- `circuit_breaker(options)` — circuit-breaker operator; opens after threshold failures
- `token_bucket(capacity, refill_per_second)` — token-bucket rate-limiter node
- `token_tracker(capacity, refill_per_second)` — tracks token usage against a budget
- `rate_limiter(source, max_events, window_ns)` — rate-limiter operator
- `with_breaker(breaker)` — operator that feeds into a circuit breaker
- `with_status(status_node)` — operator that writes breaker/limiter status to a node
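
A token bucket takes a capacity and a refill rate, as the `token_bucket(capacity, refill_per_second)` signature suggests. The sketch below shows the algorithm with an injected clock so it can be exercised deterministically. `TinyTokenBucket`, its `try_consume` method, and the `now` parameter are hypothetical and not part of the graphrefly API.

```python
from typing import Callable, List


class TinyTokenBucket:
    """Hypothetical token bucket with an injectable clock (seconds)."""

    def __init__(self, capacity: float, refill_per_second: float,
                 now: Callable[[], float]) -> None:
        self.capacity = capacity
        self.refill_per_second = refill_per_second
        self._now = now
        self._tokens = capacity   # start full
        self._last = now()

    def _refill(self) -> None:
        current = self._now()
        elapsed = current - self._last
        # Add tokens for the elapsed time, never exceeding capacity.
        self._tokens = min(self.capacity,
                           self._tokens + elapsed * self.refill_per_second)
        self._last = current

    def try_consume(self, cost: float = 1.0) -> bool:
        self._refill()
        if self._tokens >= cost:
            self._tokens -= cost
            return True
        return False


fake_time: List[float] = [0.0]   # mutable cell acting as a controllable clock
bucket = TinyTokenBucket(capacity=2, refill_per_second=1,
                         now=lambda: fake_time[0])

burst = [bucket.try_consume() for _ in range(3)]          # third call is rejected
fake_time[0] += 1.0                                       # one second passes
after_one_second = [bucket.try_consume() for _ in range(2)]
fake_time[0] += 10.0                                      # long idle period
after_idle = [bucket.try_consume() for _ in range(3)]     # refill capped at capacity
```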

### extra — backoff (`src/graphrefly/extra/backoff.py`)

- `constant(delay_ns)` — strategy that always returns the same delay
- `linear(base_ns, step_ns=None)` — linearly increasing delay
- `exponential(options=None)` — exponentially increasing delay with optional jitter and cap
- `fibonacci(base_ns=..., max_delay_ns=...)` — Fibonacci-scaled delays
- `decorrelated_jitter(base_ns=..., max_ns=...)` — AWS decorrelated-jitter strategy
- `with_max_attempts(strategy, max_attempts)` — caps any strategy at a maximum attempt count
- `resolve_backoff_preset(name)` — maps a preset name string to a concrete `BackoffStrategy`
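
To compare how these strategies grow, the sketch below models a strategy as a function from a zero-based attempt number to a delay in nanoseconds, returning `None` to signal "stop retrying". That callable shape, the `tiny_` names, the parameter names, and the default linear step are all assumptions of this example; the actual `BackoffStrategy` type may differ.

```python
from typing import Callable, List, Optional

Strategy = Callable[[int], Optional[int]]   # attempt -> delay in ns, or None
MS = 1_000_000                              # nanoseconds per millisecond


def tiny_linear(base_ns: int, step_ns: Optional[int] = None) -> Strategy:
    step = base_ns if step_ns is None else step_ns   # assumed default step
    return lambda attempt: base_ns + attempt * step


def tiny_exponential(base_ns: int, factor: int, max_delay_ns: int) -> Strategy:
    return lambda attempt: min(base_ns * factor ** attempt, max_delay_ns)


def tiny_fibonacci(base_ns: int, max_delay_ns: int) -> Strategy:
    def strategy(attempt: int) -> Optional[int]:
        a, b = 1, 1
        for _ in range(attempt):
            a, b = b, a + b   # multipliers 1, 1, 2, 3, 5, 8, ...
        return min(base_ns * a, max_delay_ns)

    return strategy


def tiny_with_max_attempts(strategy: Strategy, max_attempts: int) -> Strategy:
    return lambda attempt: strategy(attempt) if attempt < max_attempts else None


def delays_ms(strategy: Strategy, attempts: int) -> List[Optional[int]]:
    """Evaluate a strategy for the first few attempts, in milliseconds."""
    results: List[Optional[int]] = []
    for attempt in range(attempts):
        delay = strategy(attempt)
        results.append(None if delay is None else delay // MS)
    return results


linear_ms = delays_ms(tiny_linear(100 * MS), 4)
exponential_ms = delays_ms(tiny_exponential(100 * MS, 2, 500 * MS), 5)
fibonacci_ms = delays_ms(tiny_fibonacci(100 * MS, 10_000 * MS), 6)
capped_ms = delays_ms(tiny_with_max_attempts(tiny_linear(100 * MS), 3), 5)
```

Jittered strategies are left out so the delays stay deterministic.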

### extra — cron (`src/graphrefly/extra/cron.py`)

- `parse_cron(expr)` — parses a 5-field cron expression into a `CronSchedule`
- `matches_cron(schedule, dt)` — returns `True` if `dt` matches the schedule
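
Parsing and matching a 5-field expression (minute, hour, day of month, month, day of week) can be sketched as below. This is a hypothetical stdlib version, not the graphrefly parser: it supports `*`, single numbers, comma lists, ranges, and steps on `*` or ranges, represents the schedule as a list of sets rather than a `CronSchedule`, and requires all five fields to match (some cron implementations instead treat day of month and day of week as alternatives when both are restricted).

```python
from datetime import datetime
from typing import List, Set

# Inclusive bounds for minute, hour, day of month, month, day of week.
_FIELD_RANGES = [(0, 59), (0, 23), (1, 31), (1, 12), (0, 6)]


def _parse_field(field: str, low: int, high: int) -> Set[int]:
    values: Set[int] = set()
    for part in field.split(","):
        step = 1
        if "/" in part:
            part, step_text = part.split("/", 1)
            step = int(step_text)
        if part == "*":
            start, end = low, high
        elif "-" in part:
            start_text, end_text = part.split("-", 1)
            start, end = int(start_text), int(end_text)
        else:
            start = end = int(part)
        if step < 1 or not (low <= start <= end <= high):
            raise ValueError(f"invalid cron field: {field!r}")
        values.update(range(start, end + 1, step))
    return values


def tiny_parse_cron(expr: str) -> List[Set[int]]:
    fields = expr.split()
    if len(fields) != 5:
        raise ValueError("expected exactly 5 fields")
    return [_parse_field(text, low, high)
            for text, (low, high) in zip(fields, _FIELD_RANGES)]


def tiny_matches_cron(schedule: List[Set[int]], dt: datetime) -> bool:
    minute, hour, day, month, weekday = schedule
    cron_weekday = (dt.weekday() + 1) % 7   # cron: 0 = Sunday; Python: 0 = Monday
    return (dt.minute in minute and dt.hour in hour and dt.day in day
            and dt.month in month and cron_weekday in weekday)


# Every 15 minutes, 09:00-17:59, Monday to Friday.
office_hours = tiny_parse_cron("*/15 9-17 * * 1-5")
monday_match = tiny_matches_cron(office_hours, datetime(2024, 1, 1, 9, 30))
saturday_match = tiny_matches_cron(office_hours, datetime(2024, 1, 6, 9, 30))
off_minute_match = tiny_matches_cron(office_hours, datetime(2024, 1, 1, 9, 31))
```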

### extra — checkpoint (`src/graphrefly/extra/checkpoint.py`)

- `MemoryCheckpointAdapter` — in-memory checkpoint storage
- `DictCheckpointAdapter` — plain-dict checkpoint storage
- `FileCheckpointAdapter` — file-system checkpoint storage
- `SqliteCheckpointAdapter` — SQLite checkpoint storage
- `save_graph_checkpoint(graph, adapter)` — serialises graph state to an adapter
- `restore_graph_checkpoint(graph, adapter)` — restores graph state from an adapter
- `checkpoint_node_value(node)` — checkpoints a single node value
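
A file-backed adapter has to survive a crash in the middle of a write. One common technique is to write to a temporary file and then rename it over the target, sketched below. `TinyFileCheckpoint` and its `save`/`load` method names are placeholders invented for this example; the real adapter contract is described in `docs/ADAPTER-CONTRACT.md` and may look different.

```python
import json
import os
import tempfile
from typing import Any, Dict, Optional


class TinyFileCheckpoint:
    """Hypothetical JSON checkpoint store with atomic replacement."""

    def __init__(self, path: str) -> None:
        self._path = path

    def save(self, snapshot: Dict[str, Any]) -> None:
        directory = os.path.dirname(self._path) or "."
        # Write to a temp file in the same directory, then swap it in.
        fd, tmp_path = tempfile.mkstemp(dir=directory, suffix=".tmp")
        try:
            with os.fdopen(fd, "w", encoding="utf-8") as handle:
                json.dump(snapshot, handle, sort_keys=True)
            os.replace(tmp_path, self._path)   # atomic rename over the target
        except BaseException:
            if os.path.exists(tmp_path):
                os.remove(tmp_path)
            raise

    def load(self) -> Optional[Dict[str, Any]]:
        try:
            with open(self._path, encoding="utf-8") as handle:
                return json.load(handle)
        except FileNotFoundError:
            return None   # no checkpoint has been written yet


with tempfile.TemporaryDirectory() as workdir:
    adapter = TinyFileCheckpoint(os.path.join(workdir, "graph.json"))
    missing = adapter.load()
    adapter.save({"counter": 3, "status": "running"})
    restored = adapter.load()
    leftover_files = sorted(os.listdir(workdir))
```

Because the rename replaces the file in one step, a reader sees either the previous checkpoint or the new one, never a half-written file.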

### extra — composite (`src/graphrefly/extra/composite.py`)

- `verifiable(source, options)` — wraps a node with verification tracking (checks, assertions)
- `distill(raw, extract, options)` — memory-efficient extraction: compacts raw data into a budget-bounded summary

### extra — backpressure (`src/graphrefly/extra/backpressure.py`)

- `create_watermark_controller(send_up, options)` — creates a PAUSE/RESUME watermark controller for flow control
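
Watermark-based flow control uses two thresholds so that PAUSE and RESUME do not flap. The sketch below is a hypothetical illustration: `TinyWatermark`, its `high`/`low` parameters, and the `on_enqueue`/`on_dequeue` hooks are assumed names, and the signals are plain strings rather than graphrefly messages.

```python
from typing import Callable, List


class TinyWatermark:
    """Hypothetical controller: PAUSE at the high mark, RESUME at the low mark."""

    def __init__(self, send_up: Callable[[str], None], high: int, low: int) -> None:
        if low >= high:
            raise ValueError("low watermark must be below high watermark")
        self._send_up = send_up
        self._high = high
        self._low = low
        self._pending = 0
        self._paused = False

    def on_enqueue(self) -> None:
        self._pending += 1
        if not self._paused and self._pending >= self._high:
            self._paused = True
            self._send_up("PAUSE")

    def on_dequeue(self) -> None:
        self._pending = max(0, self._pending - 1)
        if self._paused and self._pending <= self._low:
            self._paused = False
            self._send_up("RESUME")


signals: List[str] = []
controller = TinyWatermark(signals.append, high=3, low=1)

for _ in range(4):
    controller.on_enqueue()    # PAUSE is sent once, when pending reaches 3
after_fill = list(signals)
for _ in range(3):
    controller.on_dequeue()    # RESUME is sent when pending drops to 1
after_drain = list(signals)
```

The gap between the two thresholds means a consumer hovering around a single limit does not generate a PAUSE/RESUME pair for every item.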

### patterns — orchestration (`src/graphrefly/patterns/orchestration.py`)

- `pipeline(name, opts=None)` — creates an orchestration pipeline Graph
- `task(name, fn, opts=None)` — adds a processing step to a pipeline
- `branch(name, condition, opts=None)` — conditional routing in a pipeline
- `gate(name, signal, opts=None)` — pauses pipeline flow until signal is truthy
- `approval(name, opts=None)` — human-in-the-loop approval gate
- `for_each(name, fn)` — iterates over pipeline items
- `join(name, *sources)` — merges multiple pipeline branches
- `loop(name, condition, opts=None)` — repeats pipeline steps while the condition holds
- `sub_pipeline(name, parent)` — nests a child pipeline
- `sensor(name, fn)` — monitoring node in a pipeline
- `wait(name, fn)` — async wait step
- `on_failure(name, fn)` — error handler for pipeline failures

### patterns — messaging (`src/graphrefly/patterns/messaging.py`)

- `topic(name, opts=None)` — creates a pub/sub topic Graph
- `subscription(topic, name, opts=None)` — subscribes to a topic
- `job_queue(name, opts=None)` — creates a work queue Graph with workers
- `job_flow(name, opts=None)` — creates a job flow Graph with stages
- `topic_bridge(from_topic, to_topic, opts=None)` — bridges two topics with optional transform

### patterns — memory (`src/graphrefly/patterns/memory.py`)

- `collection(name, opts=None)` — reactive document collection with CRUD operations
- `light_collection(opts=None)` — lightweight in-memory collection bundle
- `vector_index(opts=None)` — reactive vector similarity index
- `knowledge_graph(opts=None)` — reactive knowledge graph with entities and relations
- `decay(source, opts)` — time-based value decay

### patterns — AI (`src/graphrefly/patterns/ai.py`)

- `from_llm(opts)` — wraps an LLM API call as a reactive node
- `chat_stream(name, opts=None)` — creates a chat-style streaming Graph
- `tool_registry(name, opts=None)` — reactive tool/function registry for agents
- `system_prompt_builder(opts)` — reactive system prompt composition
- `llm_extractor(opts)` — LLM-powered extraction pipeline
- `llm_consolidator(opts)` — LLM-powered consolidation pipeline
- `admission_filter_3d(opts)` — 3-dimensional admission filter (novelty, salience, relevance)
- `agent_memory(opts)` — reactive agent memory with retrieval and consolidation
- `agent_loop(name, opts)` — full agent loop Graph (observe, think, act, memory)
- `knobs_as_tools(graph, actor=None)` — exposes graph knobs as LLM-callable tools
- `gauges_as_context(graph, opts=None)` — formats graph gauges as LLM context
- `validate_graph_def(definition)` — validates a graph definition object
- `graph_from_spec(spec)` — builds a Graph from a declarative spec dict
- `suggest_strategy(opts)` — LLM-powered strategy suggestion

### patterns — CQRS (`src/graphrefly/patterns/cqrs.py`)

- `cqrs(name, opts=None)` — creates a CQRS Graph (commands, events, projections, sagas)
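
For readers new to the pattern, the command, event, and projection roles can be shown in a few lines. This is a generic, stdlib-only sketch of CQRS with an append-only event log; the function names and event shape are hypothetical, sagas are omitted, and nothing here reflects how the `cqrs` Graph is built internally.

```python
from typing import Any, Dict, List

event_log: List[Dict[str, Any]] = []   # append-only source of truth


def handle_deposit(account: str, amount: int) -> Dict[str, Any]:
    """Command side: validate the request, then record an event."""
    if amount <= 0:
        raise ValueError("amount must be positive")
    event = {"type": "deposited", "account": account, "amount": amount}
    event_log.append(event)
    return event


def project_balances(log: List[Dict[str, Any]]) -> Dict[str, int]:
    """Query side: fold the event log into a read model."""
    balances: Dict[str, int] = {}
    for event in log:
        if event["type"] == "deposited":
            account = event["account"]
            balances[account] = balances.get(account, 0) + event["amount"]
    return balances


handle_deposit("alice", 50)
handle_deposit("bob", 20)
handle_deposit("alice", 25)
balances = project_balances(event_log)
```

Commands never touch the read model directly; the projection can be rebuilt at any time by replaying the log.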

### compat — async runners (`src/graphrefly/compat/`)

- `AsyncioRunner` — runner backed by asyncio event loop
- `TrioRunner` — runner backed by Trio nursery

### integrations — FastAPI (`src/graphrefly/integrations/fastapi.py`)

- `graphrefly_lifespan(app, **opts)` — ASGI lifespan context manager for graph lifecycle
- `get_graph(name="default")` — FastAPI dependency to retrieve a named Graph
- `sse_response(source, **opts)` — returns a streaming SSE response from a node
- `ObserveGateway` — WebSocket gateway for real-time graph observation
- `graphrefly_router(**opts)` — returns a FastAPI APIRouter with graph REST/SSE/WS endpoints

## Design invariants (spec sections 5.8-5.12)

- No polling — use reactive timer sources, never busy-wait on node values
- No imperative triggers — all coordination via reactive signals and message flow
- No raw awaits in node functions — async boundaries belong in sources and runner layer
- Use the central timer (`core/clock.py`) and the `message_tier` utilities — never hardcode type checks
- Phase 4+ APIs are developer-friendly — protocol internals never surface in primary APIs
