Lens Graph Explorer

Build a network of related types connected by lenses, visualize the graph, and trace data through multi-hop conversions.

A lens maps one sample type to another and pairs that forward transform with a reverse transform, so a round trip through the view leaves the source intact. Chaining several lenses gives a lens graph: a network of types that can be converted into one another.
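
To make the idea concrete without relying on any library, here is a minimal sketch of a lens as a pair of plain functions. The `Lens` dataclass and the `to_fahrenheit` example are hypothetical illustrations and are not part of atdata.

```python
from dataclasses import dataclass
from typing import Callable, Generic, TypeVar

S = TypeVar("S")
V = TypeVar("V")


@dataclass(frozen=True)
class Lens(Generic[S, V]):
    """Hypothetical stand-in for a lens: a getter plus a putter."""
    get: Callable[[S], V]       # source -> view
    put: Callable[[V, S], S]    # (edited view, original source) -> updated source


# Example: view a (celsius, location) pair as a Fahrenheit number only.
to_fahrenheit = Lens(
    get=lambda src: src[0] * 9 / 5 + 32,
    # The location is not in the view, so the putter copies it from the source.
    put=lambda view, src: ((view - 32) * 5 / 9, src[1]),
)

source = (100.0, "lab")
view = to_fahrenheit.get(source)            # 212.0
updated = to_fahrenheit.put(50.0, source)   # (10.0, 'lab')
print(view, updated)
```

The putter needs the original source because the view dropped the location; that second argument is what distinguishes a lens from a plain inverse function.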

This example models a sensor data pipeline with four types and three lenses, then visualizes the transformation network and traces data through it.

1 — The type hierarchy

flowchart LR
    A["<b>RawSensor</b><br/><small>voltage &middot; timestamp<br/>sensor_id &middot; location</small>"]
    B["<b>Calibrated</b><br/><small>celsius &middot; timestamp<br/>sensor_id &middot; location<br/>cal_factor</small>"]
    C["<b>Features</b><br/><small>celsius &middot; hour<br/>is_daytime &middot; sensor_id</small>"]
    D["<b>Prediction</b><br/><small>classification<br/>sensor_id</small>"]
    A -->|calibrate| B
    B -->|featurize| C
    C -->|classify| D
    style A fill:#e1f5fe,stroke:#0288d1
    style B fill:#fff9c4,stroke:#f9a825
    style C fill:#f3e5f5,stroke:#7b1fa2
    style D fill:#c8e6c9,stroke:#388e3c

Raw voltage readings get calibrated into Celsius, then distilled into time-of-day features, and finally classified. Each arrow is a lens with a well-behaved round-trip.
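
Chaining arrows like these corresponds to the textbook composition of lenses. The sketch below shows that composition on toy dictionaries; the `(get, put)` tuple encoding and the `compose` helper are assumptions made for illustration, and the source does not say whether atdata composes lenses this way internally.

```python
from typing import Callable, Tuple

# A lens here is just a (get, put) tuple of plain functions (hypothetical encoding).
LensPair = Tuple[Callable, Callable]


def compose(first: LensPair, second: LensPair) -> LensPair:
    """Chain two lenses A->B and B->C into a single lens A->C."""
    get1, put1 = first
    get2, put2 = second

    def get(a):
        return get2(get1(a))

    def put(c, a):
        b = get1(a)                  # recover the intermediate value from the source
        return put1(put2(c, b), a)   # push the edit back one hop at a time

    return get, put


# Toy lenses on dicts: the first rescales a field, the second keeps one field.
scale = (
    lambda a: {"celsius": a["voltage"] * 0.5, "site": a["site"]},
    lambda b, a: {"voltage": b["celsius"] / 0.5, "site": b["site"]},
)
only_temp = (
    lambda b: b["celsius"],
    lambda c, b: {**b, "celsius": c},
)

get_ac, put_ac = compose(scale, only_temp)
src = {"voltage": 40.0, "site": "lab"}
print(get_ac(src))         # 20.0
print(put_ac(30.0, src))   # {'voltage': 60.0, 'site': 'lab'}
```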

2 — Define the four sample types

import numpy as np
import atdata


@atdata.packable
class RawSensor:
    """Raw sensor observation."""
    voltage: float
    timestamp: str
    sensor_id: str
    location: str


@atdata.packable
class Calibrated:
    """Sensor reading after calibration to Celsius."""
    celsius: float
    timestamp: str
    sensor_id: str
    location: str
    cal_factor: float


@atdata.packable
class Features:
    """Derived features for ML."""
    celsius: float
    hour: int
    is_daytime: bool
    sensor_id: str


@atdata.packable
class Prediction:
    """Classification output."""
    classification: str
    sensor_id: str

3 — Register the lens chain

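Each lens defines a getter (the forward transform) and a putter (the reverse transform). The putter receives the edited view together with the original source, so it can restore fields that the view does not carry.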

CAL_FACTOR = 0.489  # voltage-to-celsius constant


@atdata.lens
def calibrate(raw: RawSensor) -> Calibrated:
    return Calibrated(
        celsius=raw.voltage * CAL_FACTOR,
        timestamp=raw.timestamp,
        sensor_id=raw.sensor_id,
        location=raw.location,
        cal_factor=CAL_FACTOR,
    )


@calibrate.putter
def calibrate_put(view: Calibrated, source: RawSensor) -> RawSensor:
    return RawSensor(
        voltage=view.celsius / view.cal_factor,
        timestamp=view.timestamp,
        sensor_id=view.sensor_id,
        location=view.location,
    )


@atdata.lens
def featurize(cal: Calibrated) -> Features:
    hour = int(cal.timestamp.split("T")[1].split(":")[0])
    return Features(
        celsius=cal.celsius,
        hour=hour,
        is_daytime=(6 <= hour < 20),
        sensor_id=cal.sensor_id,
    )


@featurize.putter
def featurize_put(view: Features, source: Calibrated) -> Calibrated:
    return Calibrated(
        celsius=view.celsius,
        timestamp=source.timestamp,
        sensor_id=view.sensor_id,
        location=source.location,
        cal_factor=source.cal_factor,
    )


@atdata.lens
def classify(feat: Features) -> Prediction:
    if feat.celsius >= 35:
        label = "hot"
    elif feat.celsius >= 15:
        label = "warm"
    else:
        label = "cold"
    return Prediction(classification=label, sensor_id=feat.sensor_id)


@classify.putter
def classify_put(view: Prediction, source: Features) -> Features:
    return Features(
        celsius=source.celsius,
        hour=source.hour,
        is_daytime=source.is_daytime,
        sensor_id=view.sensor_id,
    )


print("Registered 3 lenses: calibrate, featurize, classify")
Registered 3 lenses: calibrate, featurize, classify
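
The decorator pattern used above, where a function becomes a lens object that later accepts a putter, can be sketched in a few lines of plain Python. `SketchLens` and `sketch_lens` are hypothetical names; this is one way such a decorator could work, not atdata's actual implementation.

```python
from typing import Callable, Optional


class SketchLens:
    """Hypothetical lens object: wraps a getter and accepts a putter later."""

    def __init__(self, getter: Callable):
        self._getter = getter
        self._putter: Optional[Callable] = None

    def get(self, source):
        return self._getter(source)

    def put(self, view, source):
        if self._putter is None:
            raise NotImplementedError("no putter registered for this lens")
        return self._putter(view, source)

    def putter(self, fn: Callable) -> Callable:
        """Decorator that attaches the reverse transform."""
        self._putter = fn
        return fn


def sketch_lens(getter: Callable) -> SketchLens:
    return SketchLens(getter)


@sketch_lens
def halve(x: float) -> float:
    return x / 2


@halve.putter
def halve_put(view: float, source: float) -> float:
    return view * 2


print(halve.get(10.0))        # 5.0
print(halve.put(7.0, 10.0))   # 14.0
```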

4 — Trace a single sample through the chain

raw = RawSensor(
    voltage=72.5,
    timestamp="2024-07-15T14:30:00Z",
    sensor_id="sensor_03",
    location="rooftop-north",
)

cal = calibrate.get(raw)

print("INPUT  (RawSensor):")
print(f"  voltage    = {raw.voltage}")
print(f"  timestamp  = {raw.timestamp}")
print(f"  sensor_id  = {raw.sensor_id}")
print(f"  location   = {raw.location}")
print()
print("OUTPUT (Calibrated):")
print(f"  celsius    = {cal.celsius:.2f}")
print(f"  timestamp  = {cal.timestamp}")
print(f"  sensor_id  = {cal.sensor_id}")
print(f"  location   = {cal.location}")
print(f"  cal_factor = {cal.cal_factor}")
INPUT  (RawSensor):
  voltage    = 72.5
  timestamp  = 2024-07-15T14:30:00Z
  sensor_id  = sensor_03
  location   = rooftop-north

OUTPUT (Calibrated):
  celsius    = 35.45
  timestamp  = 2024-07-15T14:30:00Z
  sensor_id  = sensor_03
  location   = rooftop-north
  cal_factor = 0.489
feat = featurize.get(cal)

print("INPUT  (Calibrated):")
print(f"  celsius    = {cal.celsius:.2f}")
print(f"  timestamp  = {cal.timestamp}")
print()
print("OUTPUT (Features):")
print(f"  celsius    = {feat.celsius:.2f}")
print(f"  hour       = {feat.hour}")
print(f"  is_daytime = {feat.is_daytime}")
print(f"  sensor_id  = {feat.sensor_id}")
INPUT  (Calibrated):
  celsius    = 35.45
  timestamp  = 2024-07-15T14:30:00Z

OUTPUT (Features):
  celsius    = 35.45
  hour       = 14
  is_daytime = True
  sensor_id  = sensor_03
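
The hour above is extracted by splitting the timestamp string. As a possible alternative, the standard library can parse the same format and reject malformed input with a clear error. The `hour_of` helper is a hypothetical name introduced for this sketch.

```python
from datetime import datetime


def hour_of(timestamp: str) -> int:
    """Parse 'YYYY-MM-DDTHH:MM:SSZ' and return the hour (0-23)."""
    # strptime raises ValueError on malformed input, rather than the
    # IndexError that string splitting would produce.
    return datetime.strptime(timestamp, "%Y-%m-%dT%H:%M:%SZ").hour


hour = hour_of("2024-07-15T14:30:00Z")
is_daytime = 6 <= hour < 20
print(hour, is_daytime)   # 14 True
```
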
pred = classify.get(feat)

print("INPUT  (Features):")
print(f"  celsius    = {feat.celsius:.2f}")
print(f"  hour       = {feat.hour}")
print(f"  is_daytime = {feat.is_daytime}")
print()
print("OUTPUT (Prediction):")
print(f"  classification = {pred.classification}")
print(f"  sensor_id      = {pred.sensor_id}")
INPUT  (Features):
  celsius    = 35.45
  hour       = 14
  is_daytime = True

OUTPUT (Prediction):
  classification = hot
  sensor_id      = sensor_03
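
The three hops traced above follow one pattern: feed each getter the previous result and keep every intermediate value. A small helper can express that; `trace` and the stand-in getters below are hypothetical and work on plain numbers so the block runs on its own.

```python
from typing import Any, Callable, List


def trace(value: Any, getters: List[Callable]) -> List[Any]:
    """Apply getters left to right and return every intermediate value."""
    steps = [value]
    for get in getters:
        steps.append(get(steps[-1]))
    return steps


# Stand-in getters on plain values (illustrative only).
def to_celsius(voltage: float) -> float:
    return voltage * 0.5


def to_label(celsius: float) -> str:
    return "hot" if celsius >= 35 else "warm" if celsius >= 15 else "cold"


steps = trace(80.0, [to_celsius, to_label])
print(steps)   # [80.0, 40.0, 'hot']
```

With the lenses defined earlier, the same helper could be called as `trace(raw, [calibrate.get, featurize.get, classify.get])`.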

5 — Batch transformation on a dataset

Write a dataset of RawSensor readings, then iterate over it as Calibrated samples using ds.as_type().

import tempfile
from pathlib import Path

tmpdir = Path(tempfile.mkdtemp(prefix="atdata_lens_graph_"))
rng = np.random.default_rng(42)

raw_samples = [
    RawSensor(
        voltage=round(float(rng.uniform(20, 90)), 2),
        timestamp=f"2024-08-{10 + i % 20:02d}T{rng.integers(0, 24):02d}:{rng.integers(0, 60):02d}:00Z",
        sensor_id=f"sensor_{i % 5:02d}",
        location=rng.choice(["rooftop", "basement", "outdoor", "lab"]),
    )
    for i in range(500)
]

ds = atdata.write_samples(raw_samples, tmpdir / "sensors.tar", maxcount=250)
print(f"Wrote {len(raw_samples)} RawSensor samples across {len(ds.list_shards())} shards")
# writing /var/folders/hx/9l078dds5z945qcv8j1hsnr00000gn/T/atdata_lens_graph__bva6ddu/sensors-000000.tar 0 0.0 GB 0
# writing /var/folders/hx/9l078dds5z945qcv8j1hsnr00000gn/T/atdata_lens_graph__bva6ddu/sensors-000001.tar 250 0.0 GB 250
Wrote 500 RawSensor samples across 2 shards
ds_cal = ds.as_type(Calibrated)

batch = next(iter(ds_cal.ordered(batch_size=8)))
print("Batch sample type: Calibrated")
print(f"  celsius values:    {[round(c, 2) for c in batch.celsius[:4]]} ...")
print(f"  cal_factor values: {batch.cal_factor[:4]} ...")
print(f"  sensor_ids:        {batch.sensor_id[:4]} ...")
Batch sample type: Calibrated
  celsius values:    [36.27, 33.65, 43.17, 14.17] ...
  cal_factor values: [0.489, 0.489, 0.489, 0.489] ...
  sensor_ids:        ['sensor_00', 'sensor_01', 'sensor_02', 'sensor_03'] ...
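
The batch above exposes one list per field. The general pattern behind this kind of view, applying a getter lazily to each sample and collating rows into columns, can be sketched as follows. How atdata implements `as_type` and batching is not stated in the source; `batched_views` and the dictionary samples are assumptions for illustration.

```python
from itertools import islice
from typing import Callable, Dict, Iterable, Iterator, List


def batched_views(
    samples: Iterable[dict],
    getter: Callable[[dict], dict],
    batch_size: int,
) -> Iterator[Dict[str, List]]:
    """Lazily apply `getter` and yield column-wise batches."""
    views = (getter(s) for s in samples)   # generator: nothing is computed yet
    while True:
        chunk = list(islice(views, batch_size))
        if not chunk:
            return
        # Collate a list of row dicts into a dict of columns.
        yield {key: [row[key] for row in chunk] for key in chunk[0]}


rows = [{"voltage": float(v), "sensor_id": f"sensor_{v % 2:02d}"} for v in range(5)]


def calibrate_dict(s: dict) -> dict:
    return {"celsius": s["voltage"] * 0.5, "sensor_id": s["sensor_id"]}


first = next(batched_views(rows, calibrate_dict, batch_size=2))
print(first)   # {'celsius': [0.0, 0.5], 'sensor_id': ['sensor_00', 'sensor_01']}
```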

6 — Verify lens laws

Well-behaved lenses satisfy two laws:

flowchart TB
    subgraph gp ["GetPut: put(get(s), s) == s"]
        direction LR
        S1["source"] -->|get| V1["view"]
        V1 -->|"put(view, source)"| S2["source'"]
    end
    subgraph pg ["PutGet: get(put(v, s)) == v"]
        direction LR
        V2["view"] -->|"put(view, source)"| S3["source'"]
        S3 -->|get| V3["view'"]
    end
    style gp fill:#e8f5e9,stroke:#388e3c
    style pg fill:#e3f2fd,stroke:#1565c0

# GetPut: put(get(source), source) == source
raw_roundtrip = calibrate.put(calibrate.get(raw), raw)
assert raw_roundtrip.voltage == raw.voltage
assert raw_roundtrip.timestamp == raw.timestamp
assert raw_roundtrip.sensor_id == raw.sensor_id
assert raw_roundtrip.location == raw.location
print("GetPut (calibrate): PASS")

cal_roundtrip = featurize.put(featurize.get(cal), cal)
assert cal_roundtrip.celsius == cal.celsius
assert cal_roundtrip.sensor_id == cal.sensor_id
assert cal_roundtrip.location == cal.location
print("GetPut (featurize): PASS")

feat_roundtrip = classify.put(classify.get(feat), feat)
assert feat_roundtrip.celsius == feat.celsius
assert feat_roundtrip.hour == feat.hour
assert feat_roundtrip.is_daytime == feat.is_daytime
print("GetPut (classify):  PASS")
GetPut (calibrate): PASS
GetPut (featurize): PASS
GetPut (classify):  PASS
# PutGet: get(put(view, source)) == view
modified_cal = Calibrated(
    celsius=25.0, timestamp=raw.timestamp,
    sensor_id=raw.sensor_id, location=raw.location, cal_factor=CAL_FACTOR,
)
assert calibrate.get(calibrate.put(modified_cal, raw)).celsius == modified_cal.celsius
print("PutGet (calibrate): PASS")

modified_feat = Features(celsius=25.0, hour=14, is_daytime=True, sensor_id="sensor_03")
result = featurize.get(featurize.put(modified_feat, cal))
assert result.celsius == modified_feat.celsius
assert result.sensor_id == modified_feat.sensor_id
print("PutGet (featurize): PASS")

modified_pred = Prediction(classification="hot", sensor_id="sensor_03")
assert classify.get(classify.put(modified_pred, feat)).classification == modified_pred.classification
print("PutGet (classify):  PASS")
PutGet (calibrate): PASS
PutGet (featurize): PASS
PutGet (classify):  PASS
What are lens laws?

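GetPut guarantees that if you read a view and write it back unchanged, the source is unchanged. PutGet guarantees that if you write a view and read it back, you get the same view. A lens that satisfies both is called well-behaved: edits made through the view round-trip without losing or inventing information.

Note that classify is a lossy lens: its putter keeps the source's celsius value and ignores the view's classification, so PutGet only holds when the classification is consistent with the source temperature. featurize has the same limitation for hour and is_daytime, which are recomputed from the source timestamp rather than taken from the view. This is a common pattern for summary/derived lenses. The checks above also compare floats with ==, which passes for these particular values but is not guaranteed for arbitrary inputs, because calibrate multiplies by CAL_FACTOR and its putter divides by it.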
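
A standalone sketch makes the lossy case visible. It mirrors the classify thresholds from this example on a hypothetical `Temp` type, shows PutGet failing when the view disagrees with the source, and compares a float round-trip with a tolerance instead of ==.

```python
import math
from dataclasses import dataclass


@dataclass(frozen=True)
class Temp:
    celsius: float


def classify_get(t: Temp) -> str:
    return "hot" if t.celsius >= 35 else "warm" if t.celsius >= 15 else "cold"


def classify_put(label: str, source: Temp) -> Temp:
    # The label carries too little information to rebuild celsius,
    # so the source is returned unchanged.
    return source


def put_get_holds(view: str, source: Temp) -> bool:
    return classify_get(classify_put(view, source)) == view


src = Temp(celsius=35.45)
print(put_get_holds("hot", src))    # True: the label agrees with the source
print(put_get_holds("cold", src))   # False: the edit is silently dropped

# Float round-trips are safer to compare with a tolerance than with ==.
factor = 0.489
round_trip = (72.5 * factor) / factor
print(math.isclose(round_trip, 72.5, rel_tol=1e-12))   # True
```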

7 — Inspect the LensNetwork

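All lenses registered with @atdata.lens are tracked in a global registry. The listing below reads it through the private _registry attribute, which may change between releases. The DictSample entries in the output were not defined in this example; they appear to be added automatically for each @atdata.packable type.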

network = atdata.LensNetwork()

print("Registered lenses:")
for (src, tgt), lens_obj in network._registry.items():
    print(f"  {src.__name__:15s} -> {tgt.__name__}")
Registered lenses:
  DictSample      -> RawSensor
  DictSample      -> Calibrated
  DictSample      -> Features
  DictSample      -> Prediction
  RawSensor       -> Calibrated
  Calibrated      -> Features
  Features        -> Prediction
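
Given a registry keyed by (source, target) pairs like the one listed above, a multi-hop conversion path can be found with a breadth-first search. The sketch uses type names as strings and a hypothetical `find_path` helper; the source does not say whether LensNetwork offers path finding itself.

```python
from collections import deque
from typing import Dict, List, Optional, Tuple

# Edges keyed by (source, target) type names, mirroring the listing above.
edges: Dict[Tuple[str, str], str] = {
    ("RawSensor", "Calibrated"): "calibrate",
    ("Calibrated", "Features"): "featurize",
    ("Features", "Prediction"): "classify",
}


def find_path(start: str, goal: str) -> Optional[List[str]]:
    """Breadth-first search for the shortest chain of lens names."""
    queue = deque([(start, [])])
    seen = {start}
    while queue:
        node, path = queue.popleft()
        if node == goal:
            return path
        for (src, tgt), name in edges.items():
            if src == node and tgt not in seen:
                seen.add(tgt)
                queue.append((tgt, path + [name]))
    return None   # no chain of lenses connects the two types


print(find_path("RawSensor", "Prediction"))   # ['calibrate', 'featurize', 'classify']
print(find_path("Prediction", "RawSensor"))   # None
```

The search only follows getters, so it finds no path from Prediction back to RawSensor; going backwards requires the putters and an original source at each hop.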

8 — Clean up

import shutil

shutil.rmtree(tmpdir, ignore_errors=True)
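
As an alternative to a manual `shutil.rmtree`, the standard library's `tempfile.TemporaryDirectory` removes the directory automatically when its `with` block exits, even if an exception is raised. The empty placeholder file below stands in for real dataset shards.

```python
import tempfile
from pathlib import Path

with tempfile.TemporaryDirectory(prefix="atdata_lens_graph_") as tmp:
    tmpdir = Path(tmp)
    shard = tmpdir / "sensors.tar"
    shard.write_bytes(b"")          # placeholder for real dataset files
    existed_inside = shard.exists()

# The directory and everything in it is removed when the block exits.
print(existed_inside, tmpdir.exists())   # True False
```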

Key takeaways

| Concept | API |
|---|---|
| Define a forward transform | @atdata.lens decorator on getter function |
| Define a reverse transform | @my_lens.putter decorator |
| Apply lens to a single sample | my_lens.get(source) / my_lens.put(view, source) |
| Transform an entire dataset | ds.as_type(TargetType) |
| Inspect registered lenses | LensNetwork()._registry |
| Verify round-trip correctness | Assert GetPut and PutGet laws |