Lens Graph Explorer
A lens maps one sample type to another through its getter, and writes an edited view back into the original sample through its putter. When you chain several lenses together, you get a lens graph: a directed network of sample types connected by lenses.
This example models a sensor data pipeline with four types and three lenses, diagrams the transformation network, traces data through it, and inspects the lens registry.
1 — The type hierarchy
Raw voltage readings are calibrated into Celsius, then distilled into time-of-day features, and finally classified. Each arrow is a lens whose putter carries edits back against the direction of the arrow.
```mermaid
flowchart LR
    A["<b>RawSensor</b><br/><small>voltage · timestamp<br/>sensor_id · location</small>"]
    B["<b>Calibrated</b><br/><small>celsius · timestamp<br/>sensor_id · location<br/>cal_factor</small>"]
    C["<b>Features</b><br/><small>celsius · hour<br/>is_daytime · sensor_id</small>"]
    D["<b>Prediction</b><br/><small>classification<br/>sensor_id</small>"]
    A -->|calibrate| B
    B -->|featurize| C
    C -->|classify| D
    style A fill:#e1f5fe,stroke:#0288d1
    style B fill:#fff9c4,stroke:#f9a825
    style C fill:#f3e5f5,stroke:#7b1fa2
    style D fill:#c8e6c9,stroke:#388e3c
```
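The field lists in the diagram already show what each lens discards on the way forward, and therefore what each putter must recover from its source. A short sketch that diffs those field sets; the schemas are typed in by hand as plain Python sets, and `SCHEMAS` and `edge_diff` are illustrative names, not atdata API:

```python
# Field sets copied by hand from the diagram (plain sets, not atdata types).
SCHEMAS = {
    "RawSensor": {"voltage", "timestamp", "sensor_id", "location"},
    "Calibrated": {"celsius", "timestamp", "sensor_id", "location", "cal_factor"},
    "Features": {"celsius", "hour", "is_daytime", "sensor_id"},
    "Prediction": {"classification", "sensor_id"},
}
CHAIN = ["RawSensor", "Calibrated", "Features", "Prediction"]


def edge_diff(src: str, tgt: str):
    """Return (fields the view drops, fields the view adds) for src -> tgt."""
    s, t = SCHEMAS[src], SCHEMAS[tgt]
    return sorted(s - t), sorted(t - s)


# Walk consecutive pairs of the chain: RawSensor->Calibrated, and so on.
for src, tgt in zip(CHAIN, CHAIN[1:]):
    dropped, added = edge_diff(src, tgt)
    print(f"{src} -> {tgt}: drops {dropped}, adds {added}")
```

Anything under "drops" cannot be rebuilt from the view alone. That is why, in step 3, `featurize_put` copies `timestamp`, `location`, and `cal_factor` from its `source` argument, while `calibrate_put` can recompute `voltage` from `celsius` and `cal_factor`.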
2 — Define the four sample types
```python
import numpy as np
import atdata


@atdata.packable
class RawSensor:
    """Raw sensor observation."""
    voltage: float
    timestamp: str
    sensor_id: str
    location: str


@atdata.packable
class Calibrated:
    """Sensor reading after calibration to Celsius."""
    celsius: float
    timestamp: str
    sensor_id: str
    location: str
    cal_factor: float


@atdata.packable
class Features:
    """Derived features for ML."""
    celsius: float
    hour: int
    is_daytime: bool
    sensor_id: str


@atdata.packable
class Prediction:
    """Classification output."""
    classification: str
    sensor_id: str
```
3 — Register the lens chain
Each lens defines a getter (forward transform) and a putter (reverse transform). The putter receives the edited view together with the original source, so it can restore any fields the view does not carry.
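The getter/putter split can be illustrated without atdata. A minimal sketch, assuming a hypothetical `Lens` record and a toy `Reading` type (neither is part of atdata): the putter takes the edited view plus the original source and uses `dataclasses.replace`, so every field the view does not cover is copied from the source unchanged.

```python
from dataclasses import dataclass, replace
from typing import Callable, Generic, TypeVar

S = TypeVar("S")
V = TypeVar("V")


@dataclass(frozen=True)
class Lens(Generic[S, V]):
    """Hypothetical getter/putter pair; a stand-in, not atdata's lens type."""
    get: Callable[[S], V]
    put: Callable[[V, S], S]


@dataclass(frozen=True)
class Reading:
    """Toy sample type used only for this illustration."""
    celsius: float
    timestamp: str
    location: str


# Focus on one field: get reads it, put copies the source with only that
# field replaced, so timestamp and location survive untouched.
celsius_lens: Lens[Reading, float] = Lens(
    get=lambda r: r.celsius,
    put=lambda value, r: replace(r, celsius=value),
)

src = Reading(celsius=35.45, timestamp="2024-07-15T14:30:00Z", location="rooftop-north")
updated = celsius_lens.put(20.0, src)
print(updated)
# Reading(celsius=20.0, timestamp='2024-07-15T14:30:00Z', location='rooftop-north')
```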
```python
CAL_FACTOR = 0.489  # voltage-to-celsius constant


@atdata.lens
def calibrate(raw: RawSensor) -> Calibrated:
    return Calibrated(
        celsius=raw.voltage * CAL_FACTOR,
        timestamp=raw.timestamp,
        sensor_id=raw.sensor_id,
        location=raw.location,
        cal_factor=CAL_FACTOR,
    )


@calibrate.putter
def calibrate_put(view: Calibrated, source: RawSensor) -> RawSensor:
    # Invert the calibration; all other fields are carried by the view.
    return RawSensor(
        voltage=view.celsius / view.cal_factor,
        timestamp=view.timestamp,
        sensor_id=view.sensor_id,
        location=view.location,
    )


@atdata.lens
def featurize(cal: Calibrated) -> Features:
    # "2024-07-15T14:30:00Z" -> "14:30:00Z" -> "14" -> 14
    hour = int(cal.timestamp.split("T")[1].split(":")[0])
    return Features(
        celsius=cal.celsius,
        hour=hour,
        is_daytime=(6 <= hour < 20),
        sensor_id=cal.sensor_id,
    )


@featurize.putter
def featurize_put(view: Features, source: Calibrated) -> Calibrated:
    # hour and is_daytime are derived from the timestamp, so the source's
    # timestamp, location, and cal_factor are kept as-is.
    return Calibrated(
        celsius=view.celsius,
        timestamp=source.timestamp,
        sensor_id=view.sensor_id,
        location=source.location,
        cal_factor=source.cal_factor,
    )


@atdata.lens
def classify(feat: Features) -> Prediction:
    if feat.celsius >= 35:
        label = "hot"
    elif feat.celsius >= 15:
        label = "warm"
    else:
        label = "cold"
    return Prediction(classification=label, sensor_id=feat.sensor_id)


@classify.putter
def classify_put(view: Prediction, source: Features) -> Features:
    # A label cannot determine a temperature, so keep the source's numeric fields.
    return Features(
        celsius=source.celsius,
        hour=source.hour,
        is_daytime=source.is_daytime,
        sensor_id=view.sensor_id,
    )


print("Registered 3 lenses: calibrate, featurize, classify")
```
```
Registered 3 lenses: calibrate, featurize, classify
```
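`featurize` extracts the hour by string splitting, which assumes the timestamp always contains a `T` separator; a space-separated timestamp would raise an `IndexError`. If inputs may vary, the standard-library parser is a sturdier option. A sketch, where `hour_of` and `in_daytime_window` are hypothetical helpers not used elsewhere in this example:

```python
from datetime import datetime


def hour_of(timestamp: str) -> int:
    """Hour of day as written in an ISO 8601 timestamp (no UTC conversion).

    The trailing "Z" is rewritten to "+00:00" because datetime.fromisoformat
    only accepts "Z" from Python 3.11 onward.
    """
    return datetime.fromisoformat(timestamp.replace("Z", "+00:00")).hour


def in_daytime_window(hour: int) -> bool:
    """Same 06:00 to 19:59 window that featurize uses."""
    return 6 <= hour < 20


print(hour_of("2024-07-15T14:30:00Z"))  # 14
print(hour_of("2024-07-15 05:59:00"))   # 5 (a space separator also parses)
```

Malformed strings raise a `ValueError` from `fromisoformat`, which is easier to diagnose than an index error from a failed split.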
4 — Trace a single sample through the chain
```python
raw = RawSensor(
    voltage=72.5,
    timestamp="2024-07-15T14:30:00Z",
    sensor_id="sensor_03",
    location="rooftop-north",
)
cal = calibrate.get(raw)

print("INPUT (RawSensor):")
print(f" voltage = {raw.voltage}")
print(f" timestamp = {raw.timestamp}")
print(f" sensor_id = {raw.sensor_id}")
print(f" location = {raw.location}")
print()
print("OUTPUT (Calibrated):")
print(f" celsius = {cal.celsius:.2f}")
print(f" timestamp = {cal.timestamp}")
print(f" sensor_id = {cal.sensor_id}")
print(f" location = {cal.location}")
print(f" cal_factor = {cal.cal_factor}")
```
```
INPUT (RawSensor):
 voltage = 72.5
 timestamp = 2024-07-15T14:30:00Z
 sensor_id = sensor_03
 location = rooftop-north

OUTPUT (Calibrated):
 celsius = 35.45
 timestamp = 2024-07-15T14:30:00Z
 sensor_id = sensor_03
 location = rooftop-north
 cal_factor = 0.489
```
```python
feat = featurize.get(cal)

print("INPUT (Calibrated):")
print(f" celsius = {cal.celsius:.2f}")
print(f" timestamp = {cal.timestamp}")
print()
print("OUTPUT (Features):")
print(f" celsius = {feat.celsius:.2f}")
print(f" hour = {feat.hour}")
print(f" is_daytime = {feat.is_daytime}")
print(f" sensor_id = {feat.sensor_id}")
```
```
INPUT (Calibrated):
 celsius = 35.45
 timestamp = 2024-07-15T14:30:00Z

OUTPUT (Features):
 celsius = 35.45
 hour = 14
 is_daytime = True
 sensor_id = sensor_03
```
```python
pred = classify.get(feat)

print("INPUT (Features):")
print(f" celsius = {feat.celsius:.2f}")
print(f" hour = {feat.hour}")
print(f" is_daytime = {feat.is_daytime}")
print()
print("OUTPUT (Prediction):")
print(f" classification = {pred.classification}")
print(f" sensor_id = {pred.sensor_id}")
```
```
INPUT (Features):
 celsius = 35.45
 hour = 14
 is_daytime = True

OUTPUT (Prediction):
 classification = hot
 sensor_id = sensor_03
```
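The trace above applies the getters one at a time. The same chain can be packaged as a single lens from the first type to the last using the standard lens-composition rule: the composed getter runs the getters in order, and the composed putter writes the new view into the intermediate value and then writes that intermediate back into the source. A plain-Python sketch with hypothetical toy types (`Lens`, `ToyRaw`, `ToyTemp`); whether atdata offers its own composition operator is not covered in this example.

```python
from dataclasses import dataclass, replace
from typing import Any, Callable


@dataclass(frozen=True)
class Lens:
    """Hypothetical getter/putter pair; not atdata's own lens class."""
    get: Callable[[Any], Any]
    put: Callable[[Any, Any], Any]

    def then(self, other: "Lens") -> "Lens":
        """Compose self (S -> A) with other (A -> V) into one lens S -> V."""
        def get(s):
            return other.get(self.get(s))

        def put(v, s):
            # Write v into the intermediate view of s, then write that
            # updated intermediate back into s itself.
            return self.put(other.put(v, self.get(s)), s)

        return Lens(get, put)


@dataclass(frozen=True)
class ToyRaw:
    voltage: float
    location: str


@dataclass(frozen=True)
class ToyTemp:
    celsius: float


def label_for(celsius: float) -> str:
    # Same thresholds as classify in step 3.
    if celsius >= 35:
        return "hot"
    return "warm" if celsius >= 15 else "cold"


toy_calibrate = Lens(
    get=lambda r: ToyTemp(r.voltage * 0.489),
    put=lambda t, r: replace(r, voltage=t.celsius / 0.489),
)
toy_classify = Lens(
    get=lambda t: label_for(t.celsius),
    put=lambda label, t: t,  # lossy: a label cannot change the temperature
)

raw_to_label = toy_calibrate.then(toy_classify)
toy_raw = ToyRaw(voltage=72.5, location="rooftop-north")
print(raw_to_label.get(toy_raw))                  # hot
print(raw_to_label.put("hot", toy_raw).location)  # rooftop-north
```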
5 — Batch transformation on a dataset
Write a dataset of RawSensor readings, then read it back as Calibrated samples with ds.as_type(Calibrated), which applies the registered calibrate lens to each sample.
```python
import tempfile
from pathlib import Path

tmpdir = Path(tempfile.mkdtemp(prefix="atdata_lens_graph_"))
rng = np.random.default_rng(42)

raw_samples = [
    RawSensor(
        voltage=round(float(rng.uniform(20, 90)), 2),
        timestamp=f"2024-08-{10 + i % 20:02d}T{rng.integers(0, 24):02d}:{rng.integers(0, 60):02d}:00Z",
        sensor_id=f"sensor_{i % 5:02d}",
        # rng.choice returns numpy.str_; convert to a plain str
        location=str(rng.choice(["rooftop", "basement", "outdoor", "lab"])),
    )
    for i in range(500)
]

# maxcount=250 caps each shard at 250 samples, so 500 samples give 2 shards.
ds = atdata.write_samples(raw_samples, tmpdir / "sensors.tar", maxcount=250)
print(f"Wrote {len(raw_samples)} RawSensor samples across {len(ds.list_shards())} shards")
```
```
# writing /var/folders/hx/9l078dds5z945qcv8j1hsnr00000gn/T/atdata_lens_graph__bva6ddu/sensors-000000.tar 0 0.0 GB 0
# writing /var/folders/hx/9l078dds5z945qcv8j1hsnr00000gn/T/atdata_lens_graph__bva6ddu/sensors-000001.tar 250 0.0 GB 250
Wrote 500 RawSensor samples across 2 shards
```
```python
ds_cal = ds.as_type(Calibrated)
batch = next(iter(ds_cal.ordered(batch_size=8)))

print("Batch sample type: Calibrated")
print(f" celsius values: {[round(c, 2) for c in batch.celsius[:4]]} ...")
print(f" cal_factor values: {batch.cal_factor[:4]} ...")
print(f" sensor_ids: {batch.sensor_id[:4]} ...")
```
```
Batch sample type: Calibrated
 celsius values: [36.27, 33.65, 43.17, 14.17] ...
 cal_factor values: [0.489, 0.489, 0.489, 0.489] ...
 sensor_ids: ['sensor_00', 'sensor_01', 'sensor_02', 'sensor_03'] ...
```
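The batch above exposes one list per field (`batch.celsius`, `batch.cal_factor`, `batch.sensor_id`). As a mental model only, reading through `as_type` and batching behaves like lazily mapping the lens getter over the stored samples and then collating each group of views column by column. A plain-Python sketch of that model; `as_view`, `collate`, `batches`, and the toy types are hypothetical, and this is not how atdata implements `as_type`:

```python
from dataclasses import dataclass, fields
from itertools import islice
from typing import Callable, Iterable, Iterator, List


def as_view(samples: Iterable, get: Callable) -> Iterator:
    """Lazily apply a lens getter to each stored sample as it is read."""
    return (get(s) for s in samples)


def collate(items: List) -> dict:
    """Turn a list of dataclass instances into one list per field."""
    return {f.name: [getattr(x, f.name) for x in items] for f in fields(items[0])}


def batches(views: Iterable, batch_size: int) -> Iterator[dict]:
    """Yield columnar batches of at most batch_size views."""
    it = iter(views)
    while chunk := list(islice(it, batch_size)):
        yield collate(chunk)


@dataclass
class ToyRaw:
    voltage: float
    sensor_id: str


@dataclass
class ToyCal:
    celsius: float
    sensor_id: str


def to_cal(r: ToyRaw) -> ToyCal:
    # Same constant as CAL_FACTOR in step 3.
    return ToyCal(celsius=r.voltage * 0.489, sensor_id=r.sensor_id)


stored = [ToyRaw(v, f"sensor_{i:02d}") for i, v in enumerate([50.0, 60.0, 70.0])]
for b in batches(as_view(stored, to_cal), batch_size=2):
    print(b["sensor_id"], [round(c, 2) for c in b["celsius"]])
```

Because `as_view` returns a generator, no sample is converted until a batch actually needs it.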
6 — Verify lens laws
Well-behaved lenses satisfy two laws:
```mermaid
flowchart TB
    subgraph gp ["GetPut: put(get(s), s) == s"]
        direction LR
        S1["source"] -->|get| V1["view"]
        V1 -->|"put(view, source)"| S2["source'"]
    end
    subgraph pg ["PutGet: get(put(v, s)) == v"]
        direction LR
        V2["view"] -->|"put(view, source)"| S3["source'"]
        S3 -->|get| V3["view'"]
    end
    style gp fill:#e8f5e9,stroke:#388e3c
    style pg fill:#e3f2fd,stroke:#1565c0
```
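Both laws translate directly into a reusable check. A sketch under these assumptions: samples are dataclasses, float fields are compared with `math.isclose` because dividing by a factor and multiplying back need not reproduce the original float exactly, and `check_lens_laws` / `approx_equal` are hypothetical helpers rather than atdata functions.

```python
import math
from dataclasses import dataclass, fields, replace
from typing import Any, Callable, Tuple


def approx_equal(a: Any, b: Any, rel_tol: float = 1e-9) -> bool:
    """Field-by-field dataclass equality that tolerates float rounding."""
    for f in fields(a):
        x, y = getattr(a, f.name), getattr(b, f.name)
        if isinstance(x, float) or isinstance(y, float):
            if not math.isclose(x, y, rel_tol=rel_tol):
                return False
        elif x != y:
            return False
    return True


def check_lens_laws(get: Callable, put: Callable, source: Any, view: Any) -> Tuple[bool, bool]:
    """Return (GetPut holds, PutGet holds) for one source/view pair."""
    get_put = approx_equal(put(get(source), source), source)
    put_get = approx_equal(get(put(view, source)), view)
    return get_put, put_get


@dataclass(frozen=True)
class Volts:
    voltage: float
    location: str


@dataclass(frozen=True)
class Degrees:
    celsius: float


def to_degrees(s: Volts) -> Degrees:
    return Degrees(celsius=s.voltage * 0.489)


def from_degrees(v: Degrees, s: Volts) -> Volts:
    return replace(s, voltage=v.celsius / 0.489)


print(check_lens_laws(to_degrees, from_degrees, Volts(72.5, "rooftop-north"), Degrees(25.0)))
# (True, True)
```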
```python
import math

# GetPut: put(get(source), source) == source
raw_roundtrip = calibrate.put(calibrate.get(raw), raw)
# voltage passes through * CAL_FACTOR and / cal_factor, so compare with a tolerance
assert math.isclose(raw_roundtrip.voltage, raw.voltage)
assert raw_roundtrip.timestamp == raw.timestamp
assert raw_roundtrip.sensor_id == raw.sensor_id
assert raw_roundtrip.location == raw.location
print("GetPut (calibrate): PASS")

cal_roundtrip = featurize.put(featurize.get(cal), cal)
assert cal_roundtrip.celsius == cal.celsius
assert cal_roundtrip.timestamp == cal.timestamp
assert cal_roundtrip.sensor_id == cal.sensor_id
assert cal_roundtrip.location == cal.location
assert cal_roundtrip.cal_factor == cal.cal_factor
print("GetPut (featurize): PASS")

feat_roundtrip = classify.put(classify.get(feat), feat)
assert feat_roundtrip.celsius == feat.celsius
assert feat_roundtrip.hour == feat.hour
assert feat_roundtrip.is_daytime == feat.is_daytime
assert feat_roundtrip.sensor_id == feat.sensor_id
print("GetPut (classify): PASS")
```
```
GetPut (calibrate): PASS
GetPut (featurize): PASS
GetPut (classify): PASS
```
```python
import math

# PutGet: get(put(view, source)) == view
modified_cal = Calibrated(
    celsius=25.0, timestamp=raw.timestamp,
    sensor_id=raw.sensor_id, location=raw.location, cal_factor=CAL_FACTOR,
)
# celsius passes through / cal_factor and * CAL_FACTOR, so compare with a tolerance
assert math.isclose(calibrate.get(calibrate.put(modified_cal, raw)).celsius, modified_cal.celsius)
print("PutGet (calibrate): PASS")

modified_feat = Features(celsius=25.0, hour=14, is_daytime=True, sensor_id="sensor_03")
result = featurize.get(featurize.put(modified_feat, cal))
assert result.celsius == modified_feat.celsius
assert result.sensor_id == modified_feat.sensor_id
# hour and is_daytime are recomputed from the source timestamp; they match
# only because modified_feat keeps the source's hour (14)
assert (result.hour, result.is_daytime) == (modified_feat.hour, modified_feat.is_daytime)
print("PutGet (featurize): PASS")

modified_pred = Prediction(classification="hot", sensor_id="sensor_03")
result = classify.get(classify.put(modified_pred, feat))
assert result.classification == modified_pred.classification
assert result.sensor_id == modified_pred.sensor_id
print("PutGet (classify): PASS")
```
```
PutGet (calibrate): PASS
PutGet (featurize): PASS
PutGet (classify): PASS
```
GetPut guarantees that reading a view and writing it back unchanged leaves the source unchanged. PutGet guarantees that an edit written through the view is exactly what you read back. Together they ensure that edits made through a lens are neither lost nor accompanied by spurious changes to the source.
Note that classify is a lossy lens: its putter keeps the source's celsius value, so PutGet holds only when the new classification matches the one the source temperature already implies. featurize has the same restriction for hour and is_daytime, which are recomputed from the source timestamp. This is a common pattern for summary and derived lenses.
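The restriction on PutGet for a lossy lens is easy to see with a stand-alone version of the classify getter and putter. A plain-Python sketch; `Temp`, `label`, and `keep_source` are hypothetical toy names that mirror, but are not, the atdata lens defined in step 3:

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Temp:
    celsius: float


def label(t: Temp) -> str:
    """Getter with the same thresholds as classify."""
    if t.celsius >= 35:
        return "hot"
    if t.celsius >= 15:
        return "warm"
    return "cold"


def keep_source(view: str, source: Temp) -> Temp:
    """Putter: a label cannot determine a temperature, so keep the source."""
    return source


source = Temp(35.45)
for view in ["hot", "cold"]:
    roundtrip = label(keep_source(view, source))
    outcome = "holds" if roundtrip == view else "fails"
    print(f"put {view!r} -> get {roundtrip!r}: PutGet {outcome}")
```

GetPut still holds for this putter, because writing back the label the source already has returns the source unchanged; only PutGet is limited to views consistent with the source.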
7 — Inspect the LensNetwork
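All lenses registered with @atdata.lens are tracked in a global registry. The listing below reads `_registry`, which maps (source type, target type) pairs to lens objects; it is a private attribute, so it may change between releases.
```python
network = atdata.LensNetwork()

print("Registered lenses:")
for (src, tgt), _lens in network._registry.items():
    print(f" {src.__name__:15s} -> {tgt.__name__}")
```
```
Registered lenses:
 DictSample      -> RawSensor
 DictSample      -> Calibrated
 DictSample      -> Features
 DictSample      -> Prediction
 RawSensor       -> Calibrated
 Calibrated      -> Features
 Features        -> Prediction
```
Besides the three lenses from step 3, the registry holds a DictSample -> T entry for each of the four sample types. These were not defined in this example, so atdata evidently registers them itself, presumably when a class is decorated with @atdata.packable.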
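Because the registry is keyed by (source, target) pairs, it forms a directed graph, and a multi-step conversion corresponds to a path through it. A sketch of shortest-path search over those pairs, using type names as strings copied from the listing above; `EDGES` and `find_lens_path` are hypothetical, and whether atdata itself chains lenses along such paths is not shown in this example. Against the live registry, the edge list could be built as `[(s.__name__, t.__name__) for s, t in network._registry]`.

```python
from collections import deque
from typing import Dict, List, Optional, Tuple

# Edges as printed by the registry listing, keyed by (source, target) names.
EDGES: List[Tuple[str, str]] = [
    ("DictSample", "RawSensor"),
    ("DictSample", "Calibrated"),
    ("DictSample", "Features"),
    ("DictSample", "Prediction"),
    ("RawSensor", "Calibrated"),
    ("Calibrated", "Features"),
    ("Features", "Prediction"),
]


def find_lens_path(edges: List[Tuple[str, str]], start: str, goal: str) -> Optional[List[str]]:
    """Breadth-first search for the shortest chain of lenses from start to goal."""
    adjacency: Dict[str, List[str]] = {}
    for src, tgt in edges:
        adjacency.setdefault(src, []).append(tgt)

    previous: Dict[str, Optional[str]] = {start: None}
    queue = deque([start])
    while queue:
        node = queue.popleft()
        if node == goal:
            # Walk the predecessor links back to start, then reverse.
            path = []
            while node is not None:
                path.append(node)
                node = previous[node]
            return path[::-1]
        for nxt in adjacency.get(node, []):
            if nxt not in previous:
                previous[nxt] = node
                queue.append(nxt)
    return None  # no chain of lenses connects the two types


print(" -> ".join(find_lens_path(EDGES, "RawSensor", "Prediction")))
# RawSensor -> Calibrated -> Features -> Prediction
```

Edges are one-directional here: there is no getter from Prediction back to RawSensor, so that search returns None, and going backwards requires the putters along with the original sources.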
8 — Clean up
```python
import shutil

shutil.rmtree(tmpdir, ignore_errors=True)
```
Key takeaways
| Concept | API |
|---|---|
| Define a forward transform | `@atdata.lens` decorator on the getter function |
| Define a reverse transform | `@my_lens.putter` decorator |
| Apply a lens to a single sample | `my_lens.get(source)` / `my_lens.put(view, source)` |
| Transform an entire dataset | `ds.as_type(TargetType)` |
| Inspect registered lenses | `LensNetwork()._registry` (private attribute) |
| Verify round-trip correctness | Assert the GetPut and PutGet laws |