import numpy as np
from numpy.typing import NDArray
import atdata
from atdata.atmosphere import (
Atmosphere,
PDSBlobStore,
SchemaPublisher,
SchemaLoader,
DatasetPublisher,
DatasetLoader,
AtUri,
)
from atdata import BlobSource
Atmosphere Publishing
This tutorial demonstrates how to use the atmosphere module to publish datasets to the AT Protocol network, enabling federated discovery and sharing. This is Layer 3 of atdata’s architecture—decentralized federation that enables cross-organization dataset sharing.
Why Federation?
Managed storage (Index with SQLite/Redis + local disk/S3) works well within an organization, but sharing across organizations introduces new challenges:
- Discovery: How do researchers find relevant datasets across institutions?
- Trust: How do you verify a dataset is what it claims to be?
- Durability: What happens if the original publisher goes offline?
The AT Protocol (ATProto), developed by Bluesky, provides a foundation for decentralized social applications. atdata leverages ATProto’s infrastructure for dataset federation:
| ATProto Feature | atdata Usage |
|---|---|
| DIDs (Decentralized Identifiers) | Publisher identity verification |
| Lexicons | Dataset/schema record schemas |
| PDSes (Personal Data Servers) | Storage for records and blobs |
| Relays & AppViews | Discovery and aggregation |
The key insight: your Bluesky identity (@handle.bsky.social) becomes your dataset publisher identity. Anyone can verify that a dataset was published by you, and can discover your datasets through the federated network.
Prerequisites
- pip install atdata[atmosphere]
- A Bluesky account with an app-specific password
Always use an app-specific password, not your main Bluesky password.
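Since `Atmosphere.from_env()` (shown later in this tutorial) reads credentials from `ATDATA_HANDLE` and `ATDATA_PASSWORD`, one convenient setup is to export them in your shell. The values below are placeholders:

```shell
# Placeholder credentials read by Atmosphere.from_env().
# Use an app-specific password generated in Bluesky's settings,
# never your main account password.
export ATDATA_HANDLE="your.handle.bsky.social"
export ATDATA_PASSWORD="xxxx-xxxx-xxxx-xxxx"
```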
Setup
Define Sample Types
@atdata.packable
class ImageSample:
    """A sample containing image data with metadata."""
    image: NDArray
    label: str
    confidence: float

@atdata.packable
class TextEmbeddingSample:
    """A sample containing text with embedding vectors."""
    text: str
    embedding: NDArray
    source: str
Type Introspection
See what information is available from a PackableSample type:
from dataclasses import fields, is_dataclass
print(f"Sample type: {ImageSample.__name__}")
print(f"Is dataclass: {is_dataclass(ImageSample)}")
print("\nFields:")
for field in fields(ImageSample):
    print(f" - {field.name}: {field.type}")
# Create and serialize a sample
sample = ImageSample(
    image=np.random.rand(224, 224, 3).astype(np.float32),
    label="cat",
    confidence=0.95,
)
packed = sample.packed
print(f"\nSerialized size: {len(packed):,} bytes")
# Round-trip
restored = ImageSample.from_bytes(packed)
print(f"Round-trip successful: {np.allclose(sample.image, restored.image)}")
AT URI Parsing
Every record in ATProto is identified by an AT URI, which encodes:
- Authority: The DID or handle of the record owner
- Collection: The Lexicon type (like a table name)
- Rkey: The record key (unique within the collection)
Understanding AT URIs is essential for working with atmosphere datasets, as they’re how you reference schemas, datasets, and lenses.
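The three-part structure can be seen with a stdlib-only parse before reaching for atdata's `AtUri`. This is an illustrative sketch — `parse_at_uri` is not part of atdata, and the real `AtUri` class adds validation and edge-case handling:

```python
# Minimal AT URI parsing sketch -- NOT atdata's AtUri, just an
# illustration of the authority/collection/rkey layout described above.
def parse_at_uri(uri: str) -> dict[str, str]:
    assert uri.startswith("at://"), "AT URIs begin with the at:// scheme"
    authority, collection, rkey = uri.removeprefix("at://").split("/")
    return {"authority": authority, "collection": collection, "rkey": rkey}

parts = parse_at_uri("at://did:plc:abc123/ac.foundation.dataset.sampleSchema/xyz789")
print(parts["authority"])   # did:plc:abc123
print(parts["collection"])  # ac.foundation.dataset.sampleSchema
print(parts["rkey"])        # xyz789
```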
ATProto records are identified by AT URIs:
uris = [
    "at://did:plc:abc123/ac.foundation.dataset.sampleSchema/xyz789",
    "at://alice.bsky.social/ac.foundation.dataset.record/my-dataset",
]
for uri_str in uris:
    print(f"\nParsing: {uri_str}")
    uri = AtUri.parse(uri_str)
    print(f" Authority: {uri.authority}")
    print(f" Collection: {uri.collection}")
    print(f" Rkey: {uri.rkey}")
Authentication
The Atmosphere class handles ATProto authentication. When you authenticate, you’re proving ownership of your decentralized identity (DID), which gives you permission to create and modify records in your Personal Data Server (PDS).
Connect to ATProto:
# Factory classmethod — returns an authenticated client
client = Atmosphere.login("your.handle.social", "your-app-password")
# Or from environment variables (ATDATA_HANDLE, ATDATA_PASSWORD)
client = Atmosphere.from_env()
print(f"Authenticated as: {client.handle}")
print(f"DID: {client.did}")
Publish a Schema
When you publish a schema to ATProto, it becomes a public, immutable record that others can reference. The schema CID ensures that anyone can verify they’re using exactly the same type definition you published.
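The verification idea behind content addressing can be sketched with a plain hash. Note this is a simplification: real ATProto CIDs are computed over DAG-CBOR with multihash, not JSON with SHA-256, but the principle — identical content yields an identical identifier — is the same:

```python
import hashlib
import json

def content_id(record: dict) -> str:
    # Canonicalize, then hash: same content => same identifier.
    # (ATProto actually uses DAG-CBOR + multihash CIDs; this is a sketch.)
    canonical = json.dumps(record, sort_keys=True).encode()
    return hashlib.sha256(canonical).hexdigest()

schema_a = {"name": "ImageSample", "version": "1.0.0"}
schema_b = {"version": "1.0.0", "name": "ImageSample"}  # same content, reordered
tampered = {"name": "ImageSample", "version": "1.0.1"}

assert content_id(schema_a) == content_id(schema_b)  # order-independent
assert content_id(schema_a) != content_id(tampered)  # any change is detectable
```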
schema_publisher = SchemaPublisher(client)
schema_uri = schema_publisher.publish(
    ImageSample,
    name="ImageSample",
    version="1.0.0",
    description="Demo: Image sample with label and confidence",
)
print(f"Schema URI: {schema_uri}")
List Your Schemas
schema_loader = SchemaLoader(client)
schemas = schema_loader.list_all(limit=10)
print(f"Found {len(schemas)} schema(s)")
for schema in schemas:
    print(f" - {schema.get('name', 'Unknown')}: v{schema.get('version', '?')}")
Publish a Dataset
With PDS Blob Storage (Recommended)
The PDSBlobStore is the fully decentralized option: your dataset shards are stored as ATProto blobs directly in your PDS, alongside your other ATProto records. This means:
- No external dependencies: Data lives in the same infrastructure as your identity
- Content-addressed: Blobs are identified by their CID, ensuring integrity
- Federated replication: Relays can mirror your blobs for availability
For fully decentralized storage, use PDSBlobStore with the unified Index:
# Create index with blob storage and atmosphere backend
store = PDSBlobStore(client)
index = atdata.Index(atmosphere=client, data_store=store)
# Define sample type
@atdata.packable
class FeatureSample:
    features: NDArray
    label: int
# Create and write samples
samples = [FeatureSample(features=np.random.randn(64).astype(np.float32), label=i % 10) for i in range(100)]
ds = atdata.write_samples(samples, "temp.tar")
# Publish - shards are uploaded as blobs automatically
schema_uri = index.publish_schema(FeatureSample, version="1.0.0")
entry = index.insert_dataset(
    ds,
    name="blob-stored-features",
    schema_ref=schema_uri,
    description="Features stored as PDS blobs",
)
print(f"Dataset URI: {entry.uri}")
print(f"Blob URLs: {entry.data_urls}")  # at://did/blob/cid format
Use BlobSource to stream directly from PDS blobs:
# Create source from the blob URLs
source = store.create_source(entry.data_urls)
# Or manually from blob references
source = BlobSource.from_refs([
    {"did": client.did, "cid": "bafyrei..."},
])
# Load and iterate
ds = atdata.Dataset[FeatureSample](source)
for batch in ds.ordered(batch_size=32):
    print(batch.features.shape)
With External URLs
For larger datasets that exceed PDS blob limits, or when you already have data in object storage, you can publish a dataset record that references external URLs. The ATProto record serves as the index entry while the actual data lives elsewhere.
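The `{000000..000009}` pattern is WebDataset-style brace notation naming ten shards. Its expansion can be sketched with the stdlib — `expand_shards` is illustrative, not an atdata function; libraries such as braceexpand handle the general case:

```python
import re

def expand_shards(pattern: str) -> list[str]:
    """Expand a single {lo..hi} brace range into concrete shard URLs (sketch)."""
    m = re.search(r"\{(\d+)\.\.(\d+)\}", pattern)
    if m is None:
        return [pattern]
    lo, hi = m.group(1), m.group(2)
    width = len(lo)  # zero-padding width comes from the pattern itself
    return [
        pattern[:m.start()] + str(i).zfill(width) + pattern[m.end():]
        for i in range(int(lo), int(hi) + 1)
    ]

urls = expand_shards("s3://example-bucket/demo-data-{000000..000009}.tar")
print(len(urls))   # 10
print(urls[0])     # s3://example-bucket/demo-data-000000.tar
```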
Here the dataset record is created with publish_with_urls, pointing the index entry at shards in object storage:
dataset_publisher = DatasetPublisher(client)
dataset_uri = dataset_publisher.publish_with_urls(
    urls=["s3://example-bucket/demo-data-{000000..000009}.tar"],
    schema_uri=str(schema_uri),
    name="Demo Image Dataset",
    description="Example dataset demonstrating atmosphere publishing",
    tags=["demo", "images", "atdata"],
    license="MIT",
)
print(f"Dataset URI: {dataset_uri}")
List and Load Datasets
dataset_loader = DatasetLoader(client)
datasets = dataset_loader.list_all(limit=10)
print(f"Found {len(datasets)} dataset(s)")
for ds in datasets:
    print(f" - {ds.get('name', 'Unknown')}")
    print(f"   Schema: {ds.get('schemaRef', 'N/A')}")
    tags = ds.get('tags', [])
    if tags:
        print(f"   Tags: {', '.join(tags)}")
Load a Dataset
# blob_dataset_uri is the AT URI of a dataset published earlier,
# and DemoSample is the @packable type it was published with
# Check storage type
storage_type = dataset_loader.get_storage_type(str(blob_dataset_uri))
print(f"Storage type: {storage_type}")
if storage_type == "blobs":
    blob_urls = dataset_loader.get_blob_urls(str(blob_dataset_uri))
    print(f"Blob URLs: {len(blob_urls)} blob(s)")
# Load and iterate (works for both storage types)
ds = dataset_loader.to_dataset(str(blob_dataset_uri), DemoSample)
for batch in ds.ordered():
    print(f"Sample id={batch.id}, text={batch.text}")
Complete Publishing Workflow
Here’s the end-to-end workflow for publishing a dataset to the atmosphere:
- Define your sample type with @packable
- Write samples with write_samples()
- Authenticate with your ATProto identity
- Create an index with blob storage (Index + PDSBlobStore)
- Publish the schema and insert the dataset
Notice how similar this is to the local workflow—the same sample types and patterns, just with a different storage backend.
# 1. Define and create samples
@atdata.packable
class FeatureSample:
    features: NDArray
    label: int
    source: str

samples = [
    FeatureSample(
        features=np.random.randn(128).astype(np.float32),
        label=i % 10,
        source="synthetic",
    )
    for i in range(1000)
]
# 2. Write to tar
ds = atdata.write_samples(samples, "features.tar", maxcount=500)
# 3. Authenticate and create index with blob storage
client = Atmosphere.login("myhandle.bsky.social", "app-password")
store = PDSBlobStore(client)
index = atdata.Index(atmosphere=client, data_store=store)
# 4. Publish schema
schema_uri = index.publish_schema(
    FeatureSample,
    version="1.0.0",
    description="Feature vectors with labels",
)
# 5. Publish dataset (shards uploaded as blobs automatically)
entry = index.insert_dataset(
    ds,
    name="synthetic-features-v1",
    schema_ref=schema_uri,
    tags=["features", "synthetic"],
)
print(f"Published: {entry.uri}")
print(f"Data stored at: {entry.data_urls}") # at://did/blob/cid URLs
# 6. Later: load from blobs
source = store.create_source(entry.data_urls)
loaded = atdata.Dataset[FeatureSample](source)
for batch in loaded.ordered(batch_size=32):
    print(f"Loaded batch with {len(batch.label)} samples")
    break
What You’ve Learned
You now understand federated dataset publishing in atdata:
| Concept | Purpose |
|---|---|
| Atmosphere | ATProto authentication and record management |
| Index(atmosphere=client) | Unified index with atmosphere backend |
| PDSBlobStore | PDS blob storage implementing AbstractDataStore |
| BlobSource | Stream datasets from PDS blobs |
| AT URIs | Universal identifiers for schemas and datasets |
The protocol abstractions (AbstractIndex, AbstractDataStore, DataSource) ensure your code works across all three layers of atdata—local files, team storage, and federated sharing.
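That protocol-driven design can be sketched with `typing.Protocol`: any backend that structurally matches the interface is interchangeable, no inheritance required. The method names below (`put_shard`/`get_shard`) are illustrative stand-ins, not atdata's actual `AbstractDataStore` interface:

```python
from typing import Protocol

class DataStore(Protocol):
    """Illustrative stand-in for an atdata-style storage protocol."""
    def put_shard(self, name: str, data: bytes) -> str: ...
    def get_shard(self, url: str) -> bytes: ...

class InMemoryStore:
    """Toy backend; satisfies DataStore structurally, without inheriting from it."""
    def __init__(self) -> None:
        self._shards: dict[str, bytes] = {}
    def put_shard(self, name: str, data: bytes) -> str:
        url = f"mem://{name}"
        self._shards[url] = data
        return url
    def get_shard(self, url: str) -> bytes:
        return self._shards[url]

def roundtrip(store: DataStore) -> bytes:
    # Any object with put_shard/get_shard works here -- local disk, S3,
    # or PDS blobs in the real library.
    url = store.put_shard("shard-0", b"samples")
    return store.get_shard(url)

print(roundtrip(InMemoryStore()))  # b'samples'
```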
The Full Picture
You’ve now seen atdata’s complete architecture:
Local Development       Managed Storage            Federation
─────────────────       ───────────────            ──────────
tar files               Index + SQLite/Redis       Index + Atmosphere
write_samples()         LocalDiskStore / S3        PDSBlobStore
Dataset[T]              load_dataset("@local/")    load_dataset("@handle/")
The same @packable sample types, the same Dataset[T] iteration patterns, and the same lens transformations work at every layer. Only the storage backend changes.
Next Steps
The Promotion Workflow tutorial shows how to migrate existing datasets from local storage to the atmosphere without re-processing your data.
- Promotion Workflow - Migrate from local storage to atmosphere
- Atmosphere Reference - Complete API reference
- Protocols - Abstract interfaces