Local Workflow

Store and manage datasets with Index

This tutorial demonstrates how to use the Index class to store and manage datasets locally. This is Layer 2 of atdata’s architecture—managed storage that bridges local development and federated sharing.

Why Managed Storage?

Local tar files work well for individual experiments, but growing projects need:

  • Discovery: “What datasets do we have? What schema does this one use?”
  • Consistency: “Is everyone using the same version of this dataset?”
  • Durability: “Where’s the canonical copy of our training data?”

atdata’s Index class addresses these needs with a two-component architecture:

  • Index Provider: metadata queries, schema registry, and dataset discovery (SQLite by default; Redis and PostgreSQL also supported)
  • Data Store: persistent shard storage (LocalDiskStore or S3DataStore)

This separation means metadata operations (listing datasets, resolving schemas) are fast and don’t touch large data files, while the data itself lives in persistent storage.
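Concretely, the two components meet in the Index constructor. A minimal sketch combining the path and data_store options that appear individually later in this tutorial (that they can be combined this way is an assumption, and the paths are illustrative):

import atdata

# Index provider: a SQLite database on disk (metadata only)
# Data store: shard files under a local root
# Assumes path= and data_store= can be passed together; each is shown separately below
index = atdata.Index(
    path="~/.atdata/index.db",
    data_store=atdata.LocalDiskStore(root="~/.atdata/data/"),
)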

Prerequisites

For the simplest setup, no external services are needed—Index() uses SQLite and LocalDiskStore by default.

For team-scale deployments you can optionally use:

  • Redis or PostgreSQL as the index provider
  • S3-compatible storage (MinIO, AWS S3, etc.) as the data store

Setup

import numpy as np
from numpy.typing import NDArray
import atdata

Define Sample Types

@atdata.packable
class TrainingSample:
    """A sample containing features and label for training."""
    features: NDArray
    label: int

@atdata.packable
class TextSample:
    """A sample containing text data."""
    text: str
    category: str

LocalDatasetEntry

Every dataset in the index is represented by a LocalDatasetEntry. A key design decision: entries use content-addressable CIDs (Content Identifiers) as their identity. This means:

  • Identical content always has the same CID
  • You can verify data integrity by checking the CID
  • Deduplication happens automatically

CIDs are computed from the entry’s schema reference and data URLs, so the same logical dataset (same schema, same shard URLs) always yields the same CID.

Create entries with content-addressable CIDs:

# Create an entry manually
entry = atdata.LocalDatasetEntry(
    _name="my-dataset",
    _schema_ref="local://schemas/examples.TrainingSample@1.0.0",
    _data_urls=["s3://bucket/data-000000.tar", "s3://bucket/data-000001.tar"],
    _metadata={"source": "example", "samples": 10000},
)

print(f"Entry name: {entry.name}")
print(f"Schema ref: {entry.schema_ref}")
print(f"Data URLs: {entry.data_urls}")
print(f"Metadata: {entry.metadata}")
print(f"CID: {entry.cid}")

Note: CIDs are generated from content (schema_ref + data_urls), so identical data produces identical CIDs.
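
To see this in practice, the sketch below builds two entries that differ only in name and metadata; since the CID covers only schema_ref and data_urls, both should report the same CID (assuming the constructor shown above):

entry_a = atdata.LocalDatasetEntry(
    _name="dataset-a",
    _schema_ref="local://schemas/examples.TrainingSample@1.0.0",
    _data_urls=["s3://bucket/data-000000.tar"],
    _metadata={},
)
entry_b = atdata.LocalDatasetEntry(
    _name="dataset-b",  # a different name, otherwise identical content...
    _schema_ref="local://schemas/examples.TrainingSample@1.0.0",
    _data_urls=["s3://bucket/data-000000.tar"],
    _metadata={},
)
assert entry_a.cid == entry_b.cid  # ...yields the same CID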

The Index

The Index is your dataset registry. It implements the AbstractIndex protocol, meaning code written against Index will also work with atmosphere backends when you’re ready for federated sharing.
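That portability means helper code can be written once against the protocol. A minimal sketch, assuming AbstractIndex is importable from the top-level atdata namespace:

from atdata import AbstractIndex  # assumed import path for the protocol

def describe_schemas(index: AbstractIndex) -> None:
    """Print every registered schema; works with any AbstractIndex backend."""
    for schema in index.list_schemas():
        print(f"  - {schema.get('name', 'Unknown')} v{schema.get('version', '?')}")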

By default, Index() uses SQLite (zero external dependencies):

# Zero-config: SQLite in memory
index = atdata.Index()

# Or persist to disk
index = atdata.Index(path="~/.atdata/index.db")

# With a data store for shard storage
index = atdata.Index(
    data_store=atdata.LocalDiskStore(root="~/.atdata/data/"),
)

For team deployments, swap in Redis or PostgreSQL:

from redis import Redis

index = atdata.Index(redis=Redis(host="localhost", port=6379))
# or
index = atdata.Index(provider="postgres", dsn="postgresql://user:pass@host/db")

Schema Management

Schema publishing is how you ensure type consistency across your team. When you publish a schema, atdata stores the complete type definition (field names, types, metadata) so anyone can reconstruct the Python class from just the schema reference.

When you use index.write(), schemas are persisted automatically—no separate publish_schema() call needed. Consumers can load datasets by name and the sample type is reconstructed from the stored schema.

# Explicit schema publishing (optional — index.write() does this for you)
schema_ref = index.publish_schema(TrainingSample, version="1.0.0")
print(f"Published schema: {schema_ref}")

# List all schemas
for schema in index.list_schemas():
    print(f"  - {schema.get('name', 'Unknown')} v{schema.get('version', '?')}")

# Decode schema back to a PackableSample class
decoded_type = index.decode_schema(schema_ref)
print(f"Decoded type: {decoded_type.__name__}")

Data Stores

Data stores implement the AbstractDataStore protocol for persistent shard storage:

LocalDiskStore

The simplest option—stores shards on the local filesystem:

store = atdata.LocalDiskStore(root="~/.atdata/data/")

# Or accept the default root (~/.atdata/data/)
store = atdata.LocalDiskStore()

S3DataStore

For team-scale storage with S3-compatible backends (AWS S3, MinIO, Cloudflare R2):

from atdata.stores import S3DataStore

store = S3DataStore(
    credentials={
        "AWS_ENDPOINT": "http://localhost:9000",
        "AWS_ACCESS_KEY_ID": "minioadmin",
        "AWS_SECRET_ACCESS_KEY": "minioadmin",
    },
    bucket="my-bucket",
)

Complete Index Workflow

The typical workflow has just three steps:

  1. Create samples using your @packable type
  2. Write through the index (index.write() handles sharding, storage, schema persistence, and indexing)
  3. Load by name (load_dataset() auto-resolves schema and data URLs)

# 1. Create sample data
samples = [
    TrainingSample(
        features=np.random.randn(128).astype(np.float32),
        label=i % 10
    )
    for i in range(1000)
]

# 2. Write through the index (schema is persisted automatically)
index = atdata.Index(data_store=atdata.LocalDiskStore())
entry = index.write(samples, name="training-v1", maxcount=500)
print(f"CID: {entry.cid}")
print(f"Shards: {entry.data_urls}")

# 3. Load by name — no sample_type needed
atdata.set_default_index(index)
ds = atdata.load_dataset("@local/training-v1", split="train")

for batch in ds.ordered(batch_size=32):
    print(f"Batch features shape: {batch.features.shape}")
    break
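
As a sanity check, you can stream the dataset back and count what was written. A small sketch, assuming batches stack features along the first axis (as the shape printout above suggests) and that all samples land in the train split:

# Stream everything back and count the samples
total = sum(batch.features.shape[0] for batch in ds.ordered(batch_size=32))
print(f"Samples streamed back: {total}")  # expect 1000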

For S3-backed team storage, just swap the data store:

from atdata.stores import S3DataStore

store = S3DataStore(
    credentials={
        "AWS_ENDPOINT": "http://localhost:9000",
        "AWS_ACCESS_KEY_ID": "minioadmin",
        "AWS_SECRET_ACCESS_KEY": "minioadmin",
    },
    bucket="team-datasets",
)
index = atdata.Index(data_store=store)
entry = index.write(samples, name="training-v1")

Using load_dataset with Index

The load_dataset() function provides a HuggingFace-style API that abstracts away the details of where data lives. It resolves @local/ prefixed paths to data URLs and auto-resolves the schema so you don’t need the original Python class.

from atdata import load_dataset

# Requires a default index to be registered, e.g. atdata.set_default_index(index)
# Schema is auto-resolved from the index; no sample_type needed
ds = load_dataset("@local/training-v1", split="train")

for batch in ds.shuffled(batch_size=32):
    print(f"Features: {batch.features.shape}")
    break

What You’ve Learned

You now understand managed storage in atdata:

  • Index: dataset registry with pluggable providers (SQLite, Redis, PostgreSQL)
  • index.write(): writes samples, persists the schema, and creates an entry in one call
  • LocalDiskStore / S3DataStore: persistent shard storage implementing AbstractDataStore
  • load_dataset("@local/..."): loads by name with an auto-resolved schema
  • LocalDatasetEntry: content-addressed dataset entries with CIDs

The same sample types you defined in the Quick Start work seamlessly here—the only change is where the data lives.

Next Steps

Ready for Public Sharing?

The Atmosphere Publishing tutorial shows how to publish datasets to the ATProto network for decentralized, cross-organization discovery.