PhysicsNeMo Datapipes#
datapipe - High-performance GPU-centric data loading for Scientific ML
A modular, composable data pipeline for physics and scientific machine learning. Designed for clean separation of concerns:
Readers: Load data from sources → TensorDict tuples with CPU tensors
Transforms: Process TensorDict data
Dataset: Reader + transforms pipeline with optional auto device transfer
DataLoader: Batched iteration with optional prefetching
The PhysicsNeMo datapipes consist of two separate generations of components, both described here. Prior to version 2.0 of PhysicsNeMo, each datapipe was largely independent of the others, targeted at very specific datasets and applications, and broadly not extensible. Those datapipes, preserved in v2.0 for compatibility, are described in the climate, cae, gnn, and benchmark subsections.
In PhysicsNeMo v2.0, the datapipes API has been redesigned from scratch around the key factors that enable scientific machine learning training and inference. These documentation pages describe the architecture and design philosophy, while the PhysicsNeMo examples include runnable datapipe tutorials for getting started.
Datapipes philosophy#
The PhysicsNeMo datapipe structure is built on several key design decisions that are specifically made to enable diverse scientific machine learning datasets:
GPU First: data preprocessing is done on the GPU, not the CPU.
Isolation of roles: reading data is separate from transforming data, which is separate from pipelining data for training, which is separate from threading and stream management, etc. Changing data sources, or preprocessing pipelines, should require no intervention in other areas.
Composability and Extensibility: We aim to provide a toolkit and examples that let you easily build what you need yourself if it’s not here.
Datapipes as configuration: Changing a pipeline shouldn’t require source code modification; the registry system in PhysicsNeMo datapipes enables hydra instantiation of datapipes for version-controlled, runtime-configured pipelines. You can register and instantiate custom components, of course.
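The “datapipes as configuration” idea can be sketched with a hydra config. The `_target_` paths and parameter names below are illustrative assumptions drawn from the Quick Start example, not a guaranteed module layout:

```yaml
# Hypothetical hydra config -- class paths and argument names are assumptions.
dataset:
  _target_: physicsnemo.datapipes.Dataset
  reader:
    _target_: physicsnemo.datapipes.HDF5Reader
    fields: [pressure, velocity]
  transforms:
    - _target_: physicsnemo.datapipes.Normalize
      input_keys: [pressure]
      method: mean_std
      means: {pressure: 101325.0}
      stds: {pressure: 5000.0}
  device: cuda
```

Swapping readers or transforms then becomes a config change rather than a code change.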
Data flows through a PhysicsNeMo datapipe in a consistent path:
A reader will bring the data from storage to CPU memory.
An optional series of one or more transformations will apply on-the-fly manipulations of that data, per instance of data.
Several instances of data will be collated into a batch (customizable, just like in PyTorch).
The batched data is ready for use in a model.
At the highest level, physicsnemo.datapipes.DataLoader has a similar API and
usage model to torch.utils.data.DataLoader, enabling a drop-in replacement in many
cases. Under the hood, physicsnemo follows a very different computation orchestration.
Quick Start#
from physicsnemo.datapipes import (
    Dataset,
    DataLoader,
    HDF5Reader,
    Normalize,
    SubsamplePoints,
)

# 1. Choose a Reader for your data format
reader = HDF5Reader(
    "simulation_data.h5",
    fields=["pressure", "velocity", "coordinates"],
)

# 2. Define a transform pipeline
transforms = [
    Normalize(
        input_keys=["pressure"],
        method="mean_std",
        means={"pressure": 101325.0},
        stds={"pressure": 5000.0},
    ),
    SubsamplePoints(
        input_keys=["coordinates", "pressure", "velocity"],
        n_points=2048,
    ),
]

# 3. Create a Dataset (Reader + Transforms + device transfer)
dataset = Dataset(reader, transforms=transforms, device="cuda")

# 4. Wrap in a DataLoader for batched iteration
loader = DataLoader(dataset, batch_size=16, shuffle=True)

for batch in loader:
    predictions = model(batch["pressure"], batch["coordinates"])
The best place to see the PhysicsNeMo datapipes in action, and to get a sense of how they work and how to use them, is the examples directory.
Architecture#
The diagram below gives a very high level overview of how the physicsnemo
datapipe tools interplay.
┌──────────────┐ ┌──────────────────┐ ┌──────────────────────┐
│ Reader │──────▶│ Dataset │──────▶│ DataLoader │
│ │ │ │ │ │
│ _load_sample│ │ device transfer │ │ batches indices │
│ __len__ │ │ + transforms │ │ + collation │
│ │ │ (via Compose) │ │ + stream prefetch │
│ Returns: │ │ │ │ │
│ (TensorDict,│ │ Returns: │ │ Yields: │
│ metadata) │ │ (TensorDict, │ │ batched TensorDict │
│ on CPU │ │ metadata) │ │ │
└──────────────┘ └──────────────────┘ └──────────────────────┘
│
┌────────┴────────┐
│ Transform(s) │
│ │
│ Compose chains │
│ multiple into │
│ a pipeline │
└─────────────────┘
Core API#
DataLoader#
The DataLoader is meant, in most ways, to be a nearly drop-in
replacement for the PyTorch DataLoader. A notable difference is the movement
of pin_memory from the DataLoader class to the Reader classes,
because of the much earlier GPU data transfer in the PhysicsNeMo
datapipe compared to PyTorch.
- class physicsnemo.datapipes.dataloader.DataLoader(
- dataset: Dataset,
- *,
- batch_size: int = 1,
- shuffle: bool = False,
- sampler: Sampler | None = None,
- drop_last: bool = False,
- collate_fn: Collator | Callable[[Sequence[tuple[TensorDict, dict[str, Any]]]], tuple[TensorDict, list[dict[str, Any]]]] | None = None,
- collate_metadata: bool = False,
- prefetch_factor: int = 2,
- num_streams: int = 4,
- use_streams: bool = True,
- )
Bases: object
Batched iteration over a Dataset with stream-based prefetching.
Unlike PyTorch’s DataLoader, which uses CPU multiprocessing, this DataLoader uses CUDA streams to overlap data loading, preprocessing, and collation. This is more efficient for SciML workloads where:
Datasets are huge
Batches are small
Preprocessing benefits from GPU acceleration
Features:
Stream-based parallelism (one stream per sample in flight)
Toggleable prefetching for debugging
Compatible with PyTorch samplers (DistributedSampler, etc.)
Familiar torch DataLoader interface
Examples
>>> from physicsnemo.datapipes import DataLoader, Dataset, HDF5Reader, Normalize
>>>
>>> dataset = Dataset(
...     HDF5Reader("data.h5", fields=["input", "target"]),
...     transforms=Normalize(["input"], method="mean_std", means={"input": 0.0}, stds={"input": 1.0}),
...     device="cuda",  # Automatic GPU transfer
... )
>>> loader = DataLoader(dataset, batch_size=16, shuffle=True)
>>>
>>> for batch in loader:
...     output = model(batch["input"])
With DistributedSampler:
>>> from torch.utils.data.distributed import DistributedSampler
>>> sampler = DistributedSampler(dataset)
>>> loader = DataLoader(dataset, batch_size=16, sampler=sampler)
Dataset#
The Dataset is the core IO + transformation coordinator of the datapipe
infrastructure. Whereas the DataLoader orchestrates the pipeline,
the Dataset is responsible for the threaded execution of the Reader and
Transform pipelines.
- class physicsnemo.datapipes.dataset.Dataset(
- reader: Reader,
- *,
- transforms: Transform | Sequence[Transform] | None = None,
- device: str | device | None = None,
- num_workers: int = 2,
- )
Bases: object
A dataset combining a Reader with a transform pipeline.
The Dataset provides a torch-like interface for accessing data:
Indexing: dataset[i] returns transformed sample i
Iteration: for sample in dataset
Length: len(dataset)
Prefetching: dataset.prefetch(i, stream) for async loading
The pipeline is: Reader → Transforms → Sample
Prefetching Model#
The dataset supports prefetching samples using a thread pool. When a CUDA stream is provided, GPU operations (device transfer, GPU transforms) happen on that stream, allowing overlap with other computation.
>>> # Start prefetching
>>> dataset.prefetch(0, stream=stream0)
>>> dataset.prefetch(1, stream=stream1)
>>>
>>> # Retrieve results (waits if not ready)
>>> sample_0 = dataset[0]  # Uses prefetched result
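The thread-pool prefetch pattern itself can be illustrated with a small stdlib-only sketch (the real Dataset additionally runs GPU work on a caller-supplied CUDA stream; the class and names here are hypothetical, for illustration only):

```python
from concurrent.futures import ThreadPoolExecutor

# Minimal sketch of prefetch-then-index: prefetch() schedules a load in a
# background thread; __getitem__ uses the in-flight result if present,
# waiting as needed, and falls back to a synchronous load otherwise.
class PrefetchingStore:
    def __init__(self, load_fn, num_workers=2):
        self._load = load_fn
        self._pool = ThreadPoolExecutor(max_workers=num_workers)
        self._inflight = {}

    def prefetch(self, index):
        # Schedule the load in a background thread (idempotent per index).
        if index not in self._inflight:
            self._inflight[index] = self._pool.submit(self._load, index)

    def __getitem__(self, index):
        future = self._inflight.pop(index, None)
        return future.result() if future is not None else self._load(index)

store = PrefetchingStore(lambda i: i * i)
store.prefetch(3)
print(store[3])  # -> 9 (prefetched in the background)
print(store[4])  # -> 16 (loaded synchronously)
```

The point of the pattern is that the expensive load overlaps with other work between `prefetch(i)` and `dataset[i]`.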
Examples
>>> from physicsnemo.datapipes import Dataset, HDF5Reader, Normalize >>> >>> reader = HDF5Reader("data.h5", fields=["pressure", "velocity"]) >>> transforms = Normalize( ... ["pressure"], ... method="mean_std", ... means={"pressure": 0.0}, ... stds={"pressure": 1.0}, ... ) >>> >>> dataset = Dataset(reader, transforms=transforms, device="cuda") >>> sample, metadata = dataset[0]
- cancel_prefetch(index: int | None = None) → None[source]#
Cancel prefetch requests.
Note: Already-running tasks will complete, but results are discarded.
- Parameters:
index (int, optional) – Specific index to cancel. If None, cancels all.
- close() → None[source]#
Close the dataset and stop prefetching.
Waits for any in-flight prefetch tasks to complete before shutdown. This prevents “cannot schedule new futures after shutdown” errors from libraries like zarr that use async I/O internally.
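The error that close() guards against is easy to reproduce with a plain executor (stdlib-only sketch, not physicsnemo code): once an executor is shut down, scheduling new work raises, which is exactly what happens if shutdown races with a library’s pending async I/O callbacks.

```python
from concurrent.futures import ThreadPoolExecutor

pool = ThreadPoolExecutor(max_workers=1)
pool.submit(lambda: None).result()  # fine before shutdown
pool.shutdown(wait=True)            # wait=True mirrors close() draining in-flight work
try:
    pool.submit(lambda: None)
except RuntimeError as err:
    print(err)  # cannot schedule new futures after shutdown
```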
- property field_names: list[str]#
List of field names in samples (from reader).
- Returns:
Field names available in samples.
- Return type:
list[str]
- prefetch(
- index: int,
- stream: Stream | None = None,
- )
Start prefetching a sample asynchronously.
The sample will be loaded in a background thread. If a CUDA stream is provided, GPU operations happen on that stream.
Call __getitem__ to retrieve the result (it will wait if needed).
- Parameters:
index (int) – Sample index to prefetch.
stream (torch.cuda.Stream, optional) – Optional CUDA stream for GPU operations.
- prefetch_batch(
- indices: Sequence[int],
- streams: Sequence[Stream] | None = None,
- )
Start prefetching multiple samples.
- Parameters:
indices (Sequence[int]) – Sample indices to prefetch.
streams (Sequence[torch.cuda.Stream], optional) – Optional CUDA streams, one per index. If shorter than indices, streams are cycled. If None, no streams used.
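The stream-assignment rule documented above (cycle when there are fewer streams than indices, None when no streams are given) can be sketched in pure Python; `assign_streams` is a hypothetical helper, not part of the API:

```python
from itertools import cycle

# Pair each sample index with a stream, reusing streams round-robin
# when there are fewer streams than indices.
def assign_streams(indices, streams=None):
    if not streams:
        return [(i, None) for i in indices]
    return list(zip(indices, cycle(streams)))

print(assign_streams([0, 1, 2, 3], ["s0", "s1"]))
# -> [(0, 's0'), (1, 's1'), (2, 's0'), (3, 's1')]
```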
- property prefetch_count: int#
Number of items currently being prefetched.
- Returns:
Count of in-flight prefetch operations.
- Return type:
int
Readers#
Readers are the data-ingestion layer: each one loads individual samples from a specific storage format (HDF5, Zarr, NumPy, VTK, etc.) and returns CPU tensors in a uniform dict interface. See Built-in Readers for the base class API and all built-in readers.
Transforms#
Transforms are composable, device-agnostic operations applied to each sample
after it is loaded and transferred to the target device. The Compose
container chains multiple transforms into a single callable. See
Built-in Transforms for the base class API, Compose,
and all built-in transforms.
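The chaining behavior of a Compose-style container is simple to sketch in pure Python (the real Compose operates on TensorDict samples; `ComposeSketch` is a hypothetical stand-in):

```python
# Call each transform in order, feeding each output to the next.
class ComposeSketch:
    def __init__(self, transforms):
        self.transforms = list(transforms)

    def __call__(self, sample):
        for transform in self.transforms:
            sample = transform(sample)
        return sample

pipeline = ComposeSketch([lambda x: x + 1, lambda x: x * 2])
print(pipeline(3))  # -> 8, i.e. (3 + 1) * 2
```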
Collation#
Combining a set of TensorDict objects into a batch of data can, at times, require special care; graph datasets are a common example. For this reason, PhysicsNeMo datapipes offers custom collation functions as well as an interface to write your own collator. If the dataset you are trying to collate cannot be accommodated here, please open an issue on GitHub.
For an example of a custom collation function that produces a batch of PyG graph data, see the datapipes examples on GitHub.
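To see why graphs need special care, consider the index bookkeeping involved: node features can simply be concatenated, but edge indices must be offset by each graph’s running node count. A pure-Python sketch (a real collator would use torch tensors or PyG’s Batch utilities; `collate_graphs` is a hypothetical helper):

```python
# Concatenate node lists and shift edge endpoints so they index into
# the combined node list.
def collate_graphs(samples):
    nodes, edges, offset = [], [], 0
    for graph in samples:
        nodes.extend(graph["nodes"])
        edges.extend((src + offset, dst + offset) for src, dst in graph["edges"])
        offset += len(graph["nodes"])
    return {"nodes": nodes, "edges": edges}

g1 = {"nodes": [0.1, 0.2], "edges": [(0, 1)]}
g2 = {"nodes": [0.3, 0.4, 0.5], "edges": [(0, 2)]}
batched = collate_graphs([g1, g2])
print(batched["edges"])  # -> [(0, 1), (2, 4)]
```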
- class physicsnemo.datapipes.collate.Collator[source]#
Bases: ABC
Abstract base class for collators.
Collators take a sequence of (TensorDict, dict) tuples and combine them into a batched output. By default, returns just the batched TensorDict for PyTorch DataLoader compatibility. When collate_metadata=True, returns a tuple of (TensorDict, list[dict]).
Examples
>>> class MyCollator(Collator):
...     def __call__(
...         self,
...         samples: Sequence[tuple[TensorDict, dict]]
...     ) -> TensorDict:
...         # Custom batching logic
...         ...
- class physicsnemo.datapipes.collate.DefaultCollator(
- *,
- stack_dim: int = 0,
- keys: list[str] | None = None,
- collate_metadata: bool = False,
- )
Bases: Collator
Default collator that stacks TensorDicts along a new batch dimension.
Uses TensorDict.stack() to efficiently batch all tensors, creating shape [batch_size, …original_shape] for each field.
All samples must have:
The same tensor keys
Tensors with matching shapes (per key)
Tensors on the same device
By default, returns just the batched TensorDict for PyTorch DataLoader compatibility. Set collate_metadata=True to also return metadata.
Examples
>>> data1 = TensorDict({"x": torch.randn(10, 3)}, device="cpu")
>>> data2 = TensorDict({"x": torch.randn(10, 3)}, device="cpu")
>>> samples = [
...     (data1, {"file": "a.h5"}),
...     (data2, {"file": "b.h5"}),
... ]
>>> collator = DefaultCollator()
>>> batched_data = collator(samples)
>>> batched_data["x"].shape
torch.Size([2, 10, 3])
With metadata collation enabled:
>>> collator = DefaultCollator(collate_metadata=True)
>>> batched_data, metadata_list = collator(samples)
>>> metadata_list
[{'file': 'a.h5'}, {'file': 'b.h5'}]
- class physicsnemo.datapipes.collate.ConcatCollator(
- *,
- dim: int = 0,
- add_batch_idx: bool = True,
- batch_idx_key: str = 'batch_idx',
- keys: list[str] | None = None,
- collate_metadata: bool = False,
- )
Bases: Collator
Collator that concatenates tensors along an existing dimension.
Unlike DefaultCollator which creates a new batch dimension, this concatenates along an existing dimension. Useful for point clouds or other variable-length data where you want to combine all points.
Optionally adds batch indices to track which points came from which sample. By default, returns just the batched TensorDict for PyTorch DataLoader compatibility. Set collate_metadata=True to also return metadata.
Examples
>>> data1 = TensorDict({"points": torch.randn(100, 3)})
>>> data2 = TensorDict({"points": torch.randn(150, 3)})
>>> samples = [
...     (data1, {"file": "a.h5"}),
...     (data2, {"file": "b.h5"}),
... ]
>>> collator = ConcatCollator(dim=0, add_batch_idx=True)
>>> batched_data = collator(samples)
>>> batched_data["points"].shape
torch.Size([250, 3])
>>> batched_data["batch_idx"].shape
torch.Size([250])
With metadata collation enabled:
>>> collator = ConcatCollator(dim=0, add_batch_idx=True, collate_metadata=True)
>>> batched_data, metadata_list = collator(samples)
>>> metadata_list
[{'file': 'a.h5'}, {'file': 'b.h5'}]
- class physicsnemo.datapipes.collate.FunctionCollator(
- fn: Callable[[Sequence[tuple[TensorDict, dict[str, Any]]]], tuple[TensorDict, list[dict[str, Any]]]],
- )
Bases: Collator
Collator that wraps a user-provided function.
Allows using any function as a collator without subclassing.
Examples
>>> def my_collate(samples):
...     # Custom logic
...     data_list = [d for d, _ in samples]
...     metadata_list = [m for _, m in samples]
...     return torch.stack(data_list), metadata_list
>>> collator = FunctionCollator(my_collate)
Extending the Pipeline#
Custom Reader example:
import torch

from physicsnemo.datapipes import Reader

class CSVReader(Reader):
    def __init__(self, path, **kwargs):
        super().__init__(**kwargs)
        import pandas as pd

        self.df = pd.read_csv(path)

    def _load_sample(self, index: int) -> dict[str, torch.Tensor]:
        row = self.df.iloc[index]
        return {
            "features": torch.tensor(row.iloc[:-1].values, dtype=torch.float32),
            "target": torch.tensor([row.iloc[-1]], dtype=torch.float32),
        }

    def __len__(self) -> int:
        return len(self.df)
Custom Transform example:
import torch
from tensordict import TensorDict

from physicsnemo.datapipes import Transform

class LogScale(Transform):
    def __init__(self, keys: list[str], epsilon: float = 1e-8):
        super().__init__()
        self.keys = keys
        self.epsilon = epsilon

    def __call__(self, data: TensorDict) -> TensorDict:
        for key in self.keys:
            data[key] = torch.log(data[key] + self.epsilon)
        return data
Built-in Readers & Transforms
Legacy Datapipes#
The following datapipe modules predate the v2.0 redesign and are preserved for backward compatibility. They are domain-specific, self-contained pipelines originally written for particular datasets and workflows.