PhysicsNeMo Datapipes#

datapipe - High-performance GPU-centric data loading for Scientific ML

A modular, composable data pipeline for physics and scientific machine learning. Designed for clean separation of concerns:

  • Readers: Load data from sources → TensorDict tuples with CPU tensors

  • Transforms: Process TensorDict data

  • Dataset: Reader + transforms pipeline with optional auto device transfer

  • DataLoader: Batched iteration with optional prefetching

PhysicsNeMo Datapipes consists of two largely independent components: the legacy, dataset-specific datapipes preserved from earlier releases, and the redesigned datapipe API introduced in v2.0.

Prior to version 2.0 of PhysicsNeMo, each datapipe was largely independent of the others, tailored to very specific datasets and applications, and generally not extensible. Those datapipes, preserved in v2.0 for compatibility, are described in the climate, cae, gnn, and benchmark subsections.

In PhysicsNeMo v2.0, the datapipes API has been redesigned from scratch around the needs of scientific machine learning training and inference. This document describes its architecture and design philosophy.

To get started, refer to the PhysicsNeMo examples for runnable datapipe tutorials.

Datapipes Philosophy#

The PhysicsNeMo datapipe structure is built on several key design decisions, made specifically to support diverse scientific machine learning datasets:

  • GPU First: data preprocessing is done on the GPU, not the CPU.

  • Isolation of roles: reading data is separate from transforming data, which is separate from pipelining data for training, which is separate from threading and stream management. Changing data sources or preprocessing pipelines should require no changes in other areas.

  • Composability and Extensibility: We aim to provide a toolkit and examples that let you easily build what you need yourself when it is not already provided.

  • Datapipes as configuration: Changing a pipeline shouldn’t require source code modification; the registry system in PhysicsNeMo datapipes enables Hydra instantiation at runtime, so pipelines can be version-controlled and configured without touching code. You can also register and instantiate custom components (see the sketch below).
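
As a sketch of what configuration-driven instantiation can look like, the snippet below builds the Quick Start pipeline from a plain dictionary with Hydra’s instantiate utility. The _target_ paths mirror the imports used later in this document; the exact registry names, parameter names, and any custom components you register are project-specific and shown here only as assumptions.

from hydra.utils import instantiate
from omegaconf import OmegaConf

# Illustrative config; in practice this would live in a version-controlled YAML file.
cfg = OmegaConf.create(
    {
        "_target_": "physicsnemo.datapipes.Dataset",
        "reader": {
            "_target_": "physicsnemo.datapipes.HDF5Reader",
            "_args_": ["simulation_data.h5"],  # positional path argument
            "fields": ["pressure", "velocity"],
        },
        "transforms": [
            {
                "_target_": "physicsnemo.datapipes.Normalize",
                "input_keys": ["pressure"],
                "method": "mean_std",
                "means": {"pressure": 101325.0},
                "stds": {"pressure": 5000.0},
            }
        ],
        "device": "cuda",
    }
)

dataset = instantiate(cfg)  # swapping readers or transforms is now a config-only change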

Data flows through a PhysicsNeMo datapipe in a consistent path:

  1. A reader loads data from storage into CPU memory.

  2. An optional series of one or more transforms applies on-the-fly manipulations to that data, one sample at a time.

  3. Several samples are collated into a batch (customizable, just like in PyTorch).

  4. The batched data is ready for use in a model.

At the highest level, physicsnemo.datapipes.DataLoader has a similar API and execution model to torch.utils.data.DataLoader, making it a near drop-in replacement in many cases. Under the hood, however, PhysicsNeMo orchestrates the computation very differently.

Quick Start#

from physicsnemo.datapipes import (
    Dataset,
    DataLoader,
    HDF5Reader,
    Normalize,
    SubsamplePoints,
)

# 1. Choose a Reader for your data format
reader = HDF5Reader(
    "simulation_data.h5",
    fields=["pressure", "velocity", "coordinates"],
)

# 2. Define a transform pipeline
transforms = [
    Normalize(
        input_keys=["pressure"],
        method="mean_std",
        means={"pressure": 101325.0},
        stds={"pressure": 5000.0},
    ),
    SubsamplePoints(
        input_keys=["coordinates", "pressure", "velocity"],
        n_points=2048,
    ),
]

# 3. Create a Dataset (Reader + Transforms + device transfer)
dataset = Dataset(reader, transforms=transforms, device="cuda")

# 4. Wrap in a DataLoader for batched iteration
loader = DataLoader(dataset, batch_size=16, shuffle=True)

for batch in loader:
    predictions = model(batch["pressure"], batch["coordinates"])

The best place to see the PhysicsNeMo datapipes in action and get a sense of how to use them is the examples directory.

Architecture#

The diagram below gives a high-level overview of how the PhysicsNeMo datapipe components fit together.

┌──────────────┐       ┌──────────────────┐       ┌──────────────────────┐
│    Reader    │──────▶│     Dataset      │──────▶│     DataLoader       │
│              │       │                  │       │                      │
│  _load_sample│       │  device transfer │       │  batches indices     │
│  __len__     │       │  + transforms    │       │  + collation         │
│              │       │  (via Compose)   │       │  + stream prefetch   │
│  Returns:    │       │                  │       │                      │
│  (TensorDict,│       │  Returns:        │       │  Yields:             │
│   metadata)  │       │  (TensorDict,    │       │  batched TensorDict  │
│  on CPU      │       │   metadata)      │       │                      │
└──────────────┘       └──────────────────┘       └──────────────────────┘
                                │
                       ┌────────┴────────┐
                       │   Transform(s)  │
                       │                 │
                       │  Compose chains │
                       │  multiple into  │
                       │  a pipeline     │
                       └─────────────────┘

Core API#

DataLoader#

The DataLoader is meant, in most respects, to be a near drop-in replacement for the PyTorch DataLoader. A notable difference is that pin_memory has moved from the DataLoader class to the Reader classes, because data is transferred to the GPU much earlier in the PhysicsNeMo datapipe than in PyTorch.
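
For example (a sketch; this assumes the reader you use exposes the pin_memory flag described above):

reader = HDF5Reader("data.h5", fields=["pressure"], pin_memory=True)  # pinned host buffers in the reader
dataset = Dataset(reader, device="cuda")
loader = DataLoader(dataset, batch_size=8)  # note: no pin_memory argument here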

class physicsnemo.datapipes.dataloader.DataLoader(
dataset: DatasetBase,
*,
batch_size: int = 1,
shuffle: bool = False,
sampler: Sampler | None = None,
drop_last: bool = False,
collate_fn: Collator | Callable[[Sequence[tuple[TensorDict, dict[str, Any]]]], tuple[TensorDict, list[dict[str, Any]]]] | None = None,
collate_metadata: bool = False,
prefetch_factor: int = 2,
num_streams: int = 4,
use_streams: bool = True,
seed: int | None = None,
)[source]#

Bases: object

Batched iteration over a Dataset with stream-based prefetching.

Unlike PyTorch’s DataLoader, which uses CPU multiprocessing, this DataLoader uses CUDA streams to overlap data loading, preprocessing, and collation. This is more efficient for SciML workloads where:

  • Datasets are huge

  • Batches are small

  • Preprocessing benefits from GPU acceleration

Features:

  • Stream-based parallelism (one stream per sample in flight)

  • Toggleable prefetching for debugging

  • Compatible with PyTorch samplers (DistributedSampler, etc.)

  • Familiar torch DataLoader interface

Examples

>>> from physicsnemo.datapipes import DataLoader, Dataset, HDF5Reader, Normalize
>>>
>>> dataset = Dataset(
...     HDF5Reader("data.h5", fields=["input", "target"]),
...     transforms=Normalize(["input"], method="mean_std", means={"input": 0.0}, stds={"input": 1.0}),
...     device="cuda",  # Automatic GPU transfer
... )
>>> loader = DataLoader(dataset, batch_size=16, shuffle=True)
>>>
>>> for batch in loader:
...     output = model(batch["input"])

With DistributedSampler:

>>> from torch.utils.data.distributed import DistributedSampler
>>> sampler = DistributedSampler(dataset)
>>> loader = DataLoader(dataset, batch_size=16, sampler=sampler)
disable_prefetch() None[source]#

Disable prefetching (useful for debugging).

enable_prefetch() None[source]#

Enable stream-based prefetching.

Raises:

RuntimeError – If CUDA is not available.
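
A common debugging pattern (sketch) is to iterate synchronously so errors surface in the calling thread, then restore prefetching:

>>> loader.disable_prefetch()    # samples load synchronously; stack traces point at the failing step
>>> batch = next(iter(loader))   # step through a batch under a debugger
>>> loader.enable_prefetch()     # restore stream-based prefetching (requires CUDA)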

set_epoch(epoch: int) None[source]#

Set the epoch for the sampler and the full data pipeline.

Propagates the epoch to the sampler (for DistributedSampler), the dataset, reader, and every stochastic transform so all RNG streams are reseeded deterministically.

Parameters:

epoch (int) – Current epoch number.
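
In a typical training loop, call set_epoch once at the top of each epoch (sketch; num_epochs, model, and training_step are placeholders):

>>> for epoch in range(num_epochs):
...     loader.set_epoch(epoch)  # reseeds sampler, reader, and stochastic transforms
...     for batch in loader:
...         loss = training_step(model, batch)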

Dataset#

The Dataset is the core IO + transformation coordinator of the datapipe infrastructure. Whereas the DataLoader orchestrates the pipeline, the Dataset is responsible for the threaded execution of the Reader and Transform pipeline that produces each sample.

class physicsnemo.datapipes.dataset.Dataset(
reader: Reader,
*,
transforms: Transform | Sequence[Transform] | None = None,
device: str | device | None = None,
num_workers: int = 2,
)[source]#

Bases: DatasetBase

A dataset combining a Reader with a transform pipeline.

The Dataset provides a torch-like interface for accessing data:

  • Indexing: dataset[i] returns transformed sample i

  • Iteration: for sample in dataset

  • Length: len(dataset)

  • Prefetching: dataset.prefetch(i, stream) for async loading

The pipeline is: Reader → Transforms → Sample

Prefetching Model#

The dataset supports prefetching samples using a thread pool. When a CUDA stream is provided, GPU operations (device transfer, GPU transforms) happen on that stream, allowing overlap with other computation.

>>> # Start prefetching
>>> dataset.prefetch(0, stream=stream0)
>>> dataset.prefetch(1, stream=stream1)
>>>
>>> # Retrieve results (waits if not ready)
>>> sample_0 = dataset[0]  # Uses prefetched result

Examples

>>> from physicsnemo.datapipes import Dataset, HDF5Reader, Normalize
>>>
>>> reader = HDF5Reader("data.h5", fields=["pressure", "velocity"])
>>> transforms = Normalize(
...     ["pressure"],
...     method="mean_std",
...     means={"pressure": 0.0},
...     stds={"pressure": 1.0},
... )
>>>
>>> dataset = Dataset(reader, transforms=transforms, device="cuda")
>>> sample, metadata = dataset[0]
close() None[source]#

Close the dataset and stop prefetching.

Waits for any in-flight prefetch tasks to complete before shutdown. This prevents “cannot schedule new futures after shutdown” errors from libraries like zarr that use async I/O internally.

property field_names: list[str]#

List of field names in samples (from reader).

Returns:

Field names available in samples.

Return type:

list[str]

prefetch(
index: int,
stream: Stream | None = None,
) None[source]#

Start prefetching a sample asynchronously.

When a CUDA stream is provided, GPU operations (device transfer and transforms) run on that stream for overlap with computation.

Parameters:
  • index (int) – Sample index to prefetch.

  • stream (torch.cuda.Stream, optional) – Optional CUDA stream for GPU operations.

set_epoch(epoch: int) None[source]#

Propagate epoch to the reader and every transform.

Reseeds all generators assigned via set_generator() so each epoch produces a different but deterministic random sequence.

Parameters:

epoch (int) – Current epoch number.

set_generator(generator: Generator) None[source]#

Distribute forked generators to the reader and every stochastic transform.

Forks generator into 1 + len(flat_transforms) independent children: the first goes to the reader, the rest map 1-to-1 to the transform list (deterministic transforms silently ignore theirs).

Parameters:

generator (torch.Generator) – Parent generator (typically forked from the DataLoader’s master generator).
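
The DataLoader normally calls this for you from its master generator; invoking it directly looks like this (sketch):

>>> import torch
>>> gen = torch.Generator().manual_seed(1234)
>>> dataset.set_generator(gen)  # one forked child for the reader, one per stochastic transform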

MultiDataset#

The MultiDataset combines two or more Dataset instances behind a single index space (concatenation). Each sub-dataset can have its own Reader and transforms. Global indices are mapped to the owning sub-dataset and local index; metadata is enriched with dataset_index so that batches can identify the source. Use MultiDataset when you want to train on multiple datasets with the same DataLoader and, optionally, enforce that all outputs share the same TensorDict keys for collation. Refer to physicsnemo.datapipes.multi_dataset.DATASET_INDEX_METADATA_KEY for the metadata key added to each sample.

To properly collate and stack outputs from different datasets, you can set output_strict=True in the MultiDataset constructor. During construction, it will load the first sample from every dataset and check that the TensorDict produced by each Reader and Transform pipeline has consistent keys. Because the exact collation details differ by dataset, the MultiDataset checks nothing more aggressive than output-key consistency.

class physicsnemo.datapipes.multi_dataset.MultiDataset(
*datasets: DatasetBase,
output_strict: bool = True,
)[source]#

Bases: object

A dataset that composes multiple DatasetBase instances behind one index space.

Accepts both Dataset (TensorDict pipelines) and MeshDataset (Mesh pipelines) as sub-datasets. Global indices are mapped to (dataset_index, local_index) by concatenation: indices 0..len0-1 come from the first dataset, len0..len0+len1-1 from the second, and so on. Each constituent can have its own Reader and transforms. Metadata is enriched with dataset_index so batches can identify the source.

Parameters:
  • *datasets (DatasetBase) – One or more Dataset or MeshDataset instances passed as positional arguments (Reader + transforms each). Order defines index mapping: first dataset occupies 0..len(ds0)-1, etc.

  • output_strict (bool, default=True) – If True, require all datasets to produce the same TensorDict keys (output keys after transforms) so DefaultCollator can stack batches. If False, no check is done; use a custom collator when keys or shapes differ. Note that output_strict=True will load the first instance of all datasets upon construction. Think of it as a debugging parameter: if you are sure that your datasets are working properly, and want to defer loading, you can safely disable this.

Raises:

ValueError – If no datasets are provided or if output_strict=True and output keys differ.

Notes

MultiDataset implements the same interface as Dataset (__len__, __getitem__, prefetch, cancel_prefetch, close, field_names) and can be passed to DataLoader in place of a single dataset. Prefetch and close are delegated to the sub-dataset that owns the index. When output_strict=True, validation checks that each dataset’s output TensorDict (after transforms) has the same keys, not the reader’s field_names. When output_strict=False, field_names returns the first dataset’s field names; with heterogeneous datasets, prefer a custom collator and use metadata dataset_index to group or pad by source.

Shuffling and sampling#

The DataLoader sees one linear index space of size \(\sum_k \text{len}(\text{datasets}[k])\). With shuffle=True, the default RandomSampler shuffles these global indices, so each batch is a random mix of samples from all sub-datasets. There is no per-dataset balancing: if one dataset is much larger, its samples will appear more often. For balanced or stratified sampling, use a custom torch.utils.data.Sampler (e.g. weighted or one sample per dataset per batch) and pass it to the DataLoader.
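
For example, a sketch of equalizing the contribution of two differently sized sub-datasets with a weighted sampler (ds_a, ds_b, and multi are the objects from the Examples below; the 1/len weighting is illustrative):

>>> import torch
>>> from torch.utils.data import WeightedRandomSampler
>>> weights = torch.cat(
...     [torch.full((len(ds),), 1.0 / len(ds)) for ds in (ds_a, ds_b)]
... )
>>> sampler = WeightedRandomSampler(weights, num_samples=len(multi), replacement=True)
>>> loader = DataLoader(multi, batch_size=16, sampler=sampler)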

Metadata#

Every sample returned by __getitem__() has its metadata dict extended with the key DATASET_INDEX_METADATA_KEY ("dataset_index"), the integer index of the sub-dataset that produced the sample (0 for the first dataset, 1 for the second, etc.). Sub-dataset–specific metadata (e.g. file path, index within that dataset) is unchanged. When using the DataLoader with collate_metadata=True, each batch yields a list of metadata dicts aligned with the batch dimension; each dict includes dataset_index so you can filter, weight, or aggregate by source in the training loop.
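
A sketch of grouping by source in the training loop (with collate_metadata=True the loader yields (batch, metadata) pairs):

>>> from physicsnemo.datapipes import DataLoader
>>> loader = DataLoader(multi, batch_size=8, collate_metadata=True)
>>> for batch, metas in loader:
...     source_ids = [m["dataset_index"] for m in metas]  # one entry per sample
...     # e.g. mask, weight, or log per-dataset statistics here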

Examples

>>> from physicsnemo.datapipes import Dataset, MultiDataset, HDF5Reader, Normalize
>>> ds_a = Dataset(HDF5Reader("a.h5", fields=["x", "y"]), transforms=None)
>>> ds_b = Dataset(HDF5Reader("b.h5", fields=["x", "y"]), transforms=None)
>>> multi = MultiDataset(ds_a, ds_b, output_strict=True)
>>> len(multi) == len(ds_a) + len(ds_b)
True
>>> data, meta = multi[0]   # from ds_a
>>> meta["dataset_index"]   # 0
cancel_prefetch(index: int | None = None) None[source]#

Cancel prefetch for the given index or all sub-datasets.

When index is provided, only cancels if it is in range; out-of-range indices are ignored to match Dataset behavior.

Parameters:

index (int, optional) – Global index to cancel, or None to cancel all.

close() None[source]#

Close all sub-datasets and release resources.

property field_names: list[str]#

Field names in samples.

With output_strict=True, returns the common output keys (TensorDict keys after transforms). With output_strict=False, returns the first dataset’s field names.

prefetch(
index: int,
stream: Any | None = None,
) None[source]#

Start prefetching the sample at the given global index.

Delegates to the sub-dataset that owns that index.

Parameters:
  • index (int) – Global sample index to prefetch.

  • stream (object, optional) – Optional CUDA stream for the sub-dataset prefetch.

set_epoch(epoch: int) None[source]#

Propagate epoch to every sub-dataset.

Parameters:

epoch (int) – Current epoch number.

set_generator(generator: Generator) None[source]#

Fork generator and distribute one child per sub-dataset.

Parameters:

generator (torch.Generator) – Parent generator (typically forked from the DataLoader’s master generator).

Readers#

Readers are the data-ingestion layer. Each one loads individual samples from a specific storage format (HDF5, Zarr, NumPy, VTK) and returns CPU tensors in a uniform dict interface. Refer to Built-in Readers for the base class API and all built-in readers.

Transforms#

Transforms are composable, device-agnostic operations applied to each sample after it is loaded and transferred to the target device. The Compose container chains multiple transforms into a single callable. Refer to Built-in Transforms for the base class API, Compose, and all built-in transforms.
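
A minimal sketch of chaining transforms with Compose (the top-level import path and list-style constructor are assumptions here; refer to Built-in Transforms for the exact API):

from physicsnemo.datapipes import Compose, Normalize, SubsamplePoints  # import path assumed

pipeline = Compose(
    [
        Normalize(["pressure"], method="mean_std", means={"pressure": 0.0}, stds={"pressure": 1.0}),
        SubsamplePoints(["coordinates", "pressure"], n_points=2048),
    ]
)
sample = pipeline(sample)  # equivalent to applying each transform in order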

Collation#

Combining a set of TensorDict objects into a batch of data can, at times, require special care. For example, collating graph datasets for Graph Neural Networks requires different merging of batches than concatenation along a batch dimension. For this reason, PhysicsNeMo datapipes offers custom collation functions as well as an interface for writing your own collator. If the dataset you are trying to collate cannot be accommodated by the built-in collators, open an issue on GitHub.

For an example of a custom collation function that produces a batch of PyG graph data, refer to the datapipes examples on GitHub.
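
As an illustration of the shape such a collator can take (a sketch only, not the packaged example; the "x" and "edge_index" field names are assumptions):

from torch_geometric.data import Batch, Data

from physicsnemo.datapipes.collate import Collator

class GraphCollator(Collator):
    """Sketch: merge per-sample graphs into a single PyG Batch (disjoint-union batching)."""

    def __call__(self, samples):
        graphs = [
            Data(x=data["x"], edge_index=data["edge_index"])
            for data, _metadata in samples
        ]
        return Batch.from_data_list(graphs)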

class physicsnemo.datapipes.collate.Collator[source]#

Bases: ABC

Abstract base class for collators.

Collators take a sequence of (TensorDict, dict) tuples and combine them into a batched output. By default, returns just the batched TensorDict for PyTorch DataLoader compatibility. When collate_metadata=True, returns a tuple of (TensorDict, list[dict]).

Examples

>>> class MyCollator(Collator):
...     def __call__(
...         self,
...         samples: Sequence[tuple[TensorDict, dict]]
...     ) -> TensorDict:
...         # Custom batching logic
...         ...
class physicsnemo.datapipes.collate.DefaultCollator(
*,
stack_dim: int = 0,
keys: list[str] | None = None,
collate_metadata: bool = False,
)[source]#

Bases: Collator

Default collator that stacks TensorDicts along a new batch dimension.

Uses TensorDict.stack() to efficiently batch all tensors, creating shape [batch_size, …original_shape] for each field.

All samples must have:

  • The same tensor keys

  • Tensors with matching shapes (per key)

  • Tensors on the same device

By default, returns just the batched TensorDict for PyTorch DataLoader compatibility. Set collate_metadata=True to also return metadata.

Examples

>>> data1 = TensorDict({"x": torch.randn(10, 3)}, device="cpu")
>>> data2 = TensorDict({"x": torch.randn(10, 3)}, device="cpu")
>>> samples = [
...     (data1, {"file": "a.h5"}),
...     (data2, {"file": "b.h5"}),
... ]
>>> collator = DefaultCollator()
>>> batched_data = collator(samples)
>>> batched_data["x"].shape
torch.Size([2, 10, 3])

With metadata collation enabled:

>>> collator = DefaultCollator(collate_metadata=True)
>>> batched_data, metadata_list = collator(samples)
>>> metadata_list
[{'file': 'a.h5'}, {'file': 'b.h5'}]
class physicsnemo.datapipes.collate.ConcatCollator(
*,
dim: int = 0,
add_batch_idx: bool = True,
batch_idx_key: str = 'batch_idx',
keys: list[str] | None = None,
collate_metadata: bool = False,
)[source]#

Bases: Collator

Collator that concatenates tensors along an existing dimension.

Unlike DefaultCollator which creates a new batch dimension, this concatenates along an existing dimension. Useful for point clouds or other variable-length data where you want to combine all points.

Optionally adds batch indices to track which points came from which sample. By default, returns just the batched TensorDict for PyTorch DataLoader compatibility. Set collate_metadata=True to also return metadata.

Examples

>>> data1 = TensorDict({"points": torch.randn(100, 3)})
>>> data2 = TensorDict({"points": torch.randn(150, 3)})
>>> samples = [
...     (data1, {"file": "a.h5"}),
...     (data2, {"file": "b.h5"}),
... ]
>>> collator = ConcatCollator(dim=0, add_batch_idx=True)
>>> batched_data = collator(samples)
>>> batched_data["points"].shape
torch.Size([250, 3])
>>> batched_data["batch_idx"].shape
torch.Size([250])

With metadata collation enabled:

>>> collator = ConcatCollator(dim=0, add_batch_idx=True, collate_metadata=True)
>>> batched_data, metadata_list = collator(samples)
>>> metadata_list
[{'file': 'a.h5'}, {'file': 'b.h5'}]
class physicsnemo.datapipes.collate.FunctionCollator(
fn: Callable[[Sequence[tuple[TensorDict, dict[str, Any]]]], tuple[TensorDict, list[dict[str, Any]]]],
)[source]#

Bases: Collator

Collator that wraps a user-provided function.

Allows using any function as a collator without subclassing.

Examples

>>> def my_collate(samples):
...     # Custom logic
...     data_list = [d for d, _ in samples]
...     metadata_list = [m for _, m in samples]
...     return torch.stack(data_list), metadata_list
>>> collator = FunctionCollator(my_collate)

Extending the Pipeline#

Custom Reader example:

import torch
from physicsnemo.datapipes import Reader

class CSVReader(Reader):
    def __init__(self, path, **kwargs):
        super().__init__(**kwargs)
        import pandas as pd
        self.df = pd.read_csv(path)

    def _load_sample(self, index: int) -> dict[str, torch.Tensor]:
        row = self.df.iloc[index]
        return {
            # All columns except the last are features; the last column is the target.
            "features": torch.tensor(row.iloc[:-1].values, dtype=torch.float32),
            "target": torch.tensor([row.iloc[-1]], dtype=torch.float32),
        }

    def __len__(self) -> int:
        return len(self.df)

Custom Transform example:

import torch
from tensordict import TensorDict
from physicsnemo.datapipes import Transform

class LogScale(Transform):
    def __init__(self, keys: list[str], epsilon: float = 1e-8):
        super().__init__()
        self.keys = keys
        self.epsilon = epsilon

    def __call__(self, data: TensorDict) -> TensorDict:
        for key in self.keys:
            data[key] = torch.log(data[key] + self.epsilon)
        return data
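
Both custom components plug into the standard pipeline exactly like the built-ins (sketch; the file name is illustrative):

from physicsnemo.datapipes import DataLoader, Dataset

dataset = Dataset(
    CSVReader("measurements.csv"),
    transforms=[LogScale(["target"])],
    device="cuda",
)
loader = DataLoader(dataset, batch_size=32, shuffle=True)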

Built-in Readers & Transforms

Legacy Datapipes#

The following datapipe modules predate the v2.0 redesign and are preserved for backward compatibility. They are domain-specific, self-contained pipelines originally written for particular datasets and workflows.