PhysicsNeMo Datapipes#

PhysicsNeMo Datapipes provides a collection of data loading and processing utilities designed to handle various types of physics-based datasets. The datapipes are organized into several categories to support different types of physics simulations and machine learning tasks.

The datapipes in PhysicsNeMo are built on top of PhysicsNeMo's Datapipe base class and provide specialized implementations for handling physics simulation data, climate data, graph-based data, and mesh-based data. Each category of datapipes is designed to efficiently load and preprocess specific types of physics datasets.

The example below shows how to use a typical datapipe in PhysicsNeMo:

import torch
from physicsnemo.datapipes.benchmarks.darcy import Darcy2D

def main():
    # Create a datapipe for Darcy flow simulation data
    datapipe = Darcy2D(
        batch_size=32,
        device="cuda" if torch.cuda.is_available() else "cpu"
    )

    # Iterate through the datapipe
    for batch in datapipe:
        # batch contains input features and target values
        input_features = batch["permeability"]
        target_values = batch["darcy"]

        # Use the data for training or inference
        ...

if __name__ == "__main__":
    main()

Here’s another example showing how to use the ERA5HDF5Datapipe for weather data processing:

import torch
from physicsnemo.datapipes.climate.era5_hdf5 import ERA5HDF5Datapipe

def main():
    # Create a datapipe for ERA5 weather data in HDF5 format
    datapipe = ERA5HDF5Datapipe(
        data_dir="path/to/era5/data",
        stats_dir="path/to/era5/stats",
        channels=[0, 1],
        latlon_resolution=(721, 1440),
        shuffle=True,
    )

    # Iterate through the datapipe
    for batch in datapipe:
        invar = batch[0]["invar"]
        outvar = batch[0]["outvar"]

        # Use the data for weather prediction or analysis
        ...

if __name__ == "__main__":
    main()

Available Datapipes#

PhysicsNeMo provides several categories of datapipes:

  1. Benchmark Datapipes – Designed for standard physics benchmark problems. Includes implementations for Darcy flow and Kelvin-Helmholtz instability.

  2. Weather and Climate Datapipes – Specialized for handling climate and weather data. Supports the ERA5 HDF5 format and synthetic climate data, and includes utilities for HEALPix time series data.

  3. Graph Datapipes – Handle graph-based physics data. Supports vortex shedding, Ahmed body, DrivAerNet, and Stokes flow datasets, and includes utility functions for graph data processing.

  4. CAE (Computer-Aided Engineering) Datapipes – Specialized for mesh-based data. Supports various mesh formats and configurations.

Each category of datapipes is designed to handle specific data formats and preprocessing requirements. The datapipes automatically handle data loading, preprocessing, and device placement, making it easy to integrate them into training or inference pipelines.

Benchmark datapipes#

class physicsnemo.datapipes.benchmarks.darcy.Darcy2D(
resolution: int = 256,
batch_size: int = 64,
nr_permeability_freq: int = 5,
max_permeability: float = 2.0,
min_permeability: float = 0.5,
max_iterations: int = 30000,
convergence_threshold: float = 1e-06,
iterations_per_convergence_check: int = 1000,
nr_multigrids: int = 4,
normaliser: Dict[str, Tuple[float, float]] | None = None,
device: str | device = 'cuda',
)[source]#

Bases: Datapipe

2D Darcy flow benchmark problem datapipe.

This datapipe continuously generates solutions to the 2D Darcy equation with variable permeability. All samples are generated on the fly, and the datapipe is meant to serve as a benchmark problem for testing data-driven models. Permeability is drawn from a random Fourier series and thresholded to give a piecewise-constant field. The solution is obtained using a GPU-enabled multi-grid Jacobi iterative method.

Parameters:
  • resolution (int, optional) – Resolution to run simulation at, by default 256

  • batch_size (int, optional) – Batch size of simulations, by default 64

  • nr_permeability_freq (int, optional) – Number of frequencies to use for generating random permeability. Higher values will give higher freq permeability fields., by default 5

  • max_permeability (float, optional) – Max permeability, by default 2.0

  • min_permeability (float, optional) – Min permeability, by default 0.5

  • max_iterations (int, optional) – Maximum iterations to use for each multi-grid, by default 30000

  • convergence_threshold (float, optional) – Solver L-Infinity convergence threshold, by default 1e-6

  • iterations_per_convergence_check (int, optional) – Number of Jacobi iterations to run before checking convergence, by default 1000

  • nr_multigrids (int, optional) – Number of multi-grid levels, by default 4

  • normaliser (Union[Dict[str, Tuple[float, float]], None], optional) – Dictionary with keys permeability and darcy. The values for these keys are two floats corresponding to mean and std (mean, std).

  • device (Union[str, torch.device], optional) – Device for the datapipe to place data on, by default “cuda”

Raises:

ValueError – Incompatible multi-grid and resolution settings

generate_batch() None[source]#

Solve for new batch of simulations

initialize_batch() None[source]#

Initializes arrays for new batch of simulations
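
As a minimal sketch, normalization can be enabled by passing a normaliser dictionary of (mean, std) pairs; the statistics below are illustrative placeholders, not values shipped with PhysicsNeMo:

import torch
from physicsnemo.datapipes.benchmarks.darcy import Darcy2D

# (mean, std) pairs for each field; the numbers are placeholders for illustration.
datapipe = Darcy2D(
    resolution=128,
    batch_size=8,
    normaliser={"permeability": (1.25, 0.75), "darcy": (0.0, 1.0)},
    device="cuda" if torch.cuda.is_available() else "cpu",
)

batch = next(iter(datapipe))
permeability = batch["permeability"]  # normalized permeability field
darcy = batch["darcy"]                # normalized Darcy pressure solution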

class physicsnemo.datapipes.benchmarks.darcy.MetaData(
name: str = 'Darcy2D',
auto_device: bool = True,
cuda_graphs: bool = True,
ddp_sharding: bool = False,
)[source]#

Bases: DatapipeMetaData

The Darcy2D provides data loading and preprocessing utilities for 2D Darcy flow simulations. It handles permeability fields and pressure solutions, supporting various boundary conditions and mesh resolutions.

class physicsnemo.datapipes.benchmarks.kelvin_helmholtz.KelvinHelmholtz2D(
resolution: int = 512,
batch_size: int = 16,
seq_length: int = 8,
nr_perturbation_freq: int = 5,
perturbation_range: float = 0.1,
nr_snapshots: int = 256,
iteration_per_snapshot: int = 32,
gamma: float = 1.6666666666666667,
normaliser: Dict[str, Tuple[float, float]] | None = None,
device: str | device = 'cuda',
)[source]#

Bases: Datapipe

Kelvin-Helmholtz instability benchmark problem datapipe.

This datapipe continuously generates samples with random initial conditions. All samples are generated on the fly, and the datapipe is meant to serve as a benchmark problem for testing data-driven models. Initial conditions are given in the form of small perturbations. The solution is obtained using a GPU-enabled finite volume method.

Parameters:
  • resolution (int, optional) – Resolution to run simulation at, by default 512

  • batch_size (int, optional) – Batch size of simulations, by default 16

  • seq_length (int, optional) – Sequence length of output samples, by default 8

  • nr_perturbation_freq (int, optional) – Number of frequencies to use for generating random initial perturbations, by default 5

  • perturbation_range (float, optional) – Range to use for random perturbations. This value will be the max amplitude of the initial perturbation, by default 0.1

  • nr_snapshots (int, optional) – Number of snapshots of simulation to generate for data generation. This will control how long the simulation is run for, by default 256

  • iteration_per_snapshot (int, optional) – Number of finite volume steps to take between each snapshot. Each step size is fixed as the smallest possible value that satisfies the Courant-Friedrichs-Lewy condition, by default 32

  • gamma (float, optional) – Heat capacity ratio, by default 5.0/3.0

  • normaliser (Union[Dict[str, Tuple[float, float]], None], optional) – Dictionary with keys density, velocity, and pressure. The values for these keys are two floats corresponding to mean and std (mean, std).

  • device (Union[str, torch.device], optional) – Device for the datapipe to place data on, by default “cuda”

generate_batch() None[source]#

Solve for new batch of simulations

initialize_batch() None[source]#

Initializes arrays for new batch of simulations
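
As with Darcy2D, the datapipe can be iterated directly. A minimal sketch is shown below, assuming the batch dictionary uses the density, velocity, and pressure keys named in the normaliser documentation:

import torch
from physicsnemo.datapipes.benchmarks.kelvin_helmholtz import KelvinHelmholtz2D

datapipe = KelvinHelmholtz2D(
    resolution=256,
    batch_size=4,
    seq_length=8,
    device="cuda" if torch.cuda.is_available() else "cpu",
)

for batch in datapipe:
    # Assumed keys, based on the normaliser documentation above
    density = batch["density"]
    velocity = batch["velocity"]
    pressure = batch["pressure"]
    break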

class physicsnemo.datapipes.benchmarks.kelvin_helmholtz.MetaData(
name: str = 'KelvinHelmholtz2D',
auto_device: bool = True,
cuda_graphs: bool = True,
ddp_sharding: bool = False,
)[source]#

Bases: DatapipeMetaData

The KelvinHelmholtz2D datapipe manages data for Kelvin-Helmholtz instability simulations, including velocity fields and density distributions. It provides 2D simulation data with various random initial conditions.

Weather and climate datapipes#

class physicsnemo.datapipes.climate.era5_hdf5.ERA5DaliExternalSource(
data_paths: Iterable[str],
num_samples: int,
channels: Iterable[int],
num_steps: int,
num_history: int,
stride: int,
num_samples_per_year: int,
use_cos_zenith: bool,
cos_zenith_args: Dict,
use_time_of_year_index: bool,
batch_size: int = 1,
shuffle: bool = True,
process_rank: int = 0,
world_size: int = 1,
)[source]#

Bases: object

DALI Source for lazy-loading the HDF5 ERA5 files

Parameters:
  • data_paths (Iterable[str]) – Directory where ERA5 data is stored

  • num_samples (int) – Total number of training samples

  • channels (Iterable[int]) – List representing which ERA5 variables to load

  • start_year (int, optional) – Start year of dataset

  • stride (int) – Number of steps between input and output variables

  • num_steps (int) – Number of timesteps included in the output variables

  • num_history (int) – Number of previous timesteps included in the input variables

  • num_samples_per_year (int) – Number of samples randomly taken from each year

  • batch_size (int, optional) – Batch size, by default 1

  • use_cos_zenith (bool) – If True, the cosine zenith angles corresponding to the coordinates will be produced

  • cos_zenith_args (Dict) – Dictionary containing the following:

    dt (float): Time in hours between each timestep in the dataset

    start_year (int): Start year of dataset

  • shuffle (bool, optional) – Shuffle dataset, by default True

  • process_rank (int, optional) – Rank ID of local process, by default 0

  • world_size (int, optional) – Number of training processes, by default 1

class physicsnemo.datapipes.climate.era5_hdf5.ERA5HDF5Datapipe(
data_dir: str,
stats_dir: str | None = None,
channels: List[int] | None = None,
batch_size: int = 1,
num_steps: int = 1,
num_history: int = 0,
stride: int = 1,
latlon_resolution: Tuple[int, int] | None = None,
interpolation_type: str | None = None,
patch_size: Tuple[int, int] | int | None = None,
num_samples_per_year: int | None = None,
use_cos_zenith: bool = False,
cos_zenith_args: Dict = {},
use_time_of_year_index: bool = False,
shuffle: bool = True,
num_workers: int = 1,
device: str | device = 'cuda',
process_rank: int = 0,
world_size: int = 1,
)[source]#

Bases: Datapipe

ERA5 DALI data pipeline for HDF5 files

Parameters:
  • data_dir (str) – Directory where ERA5 data is stored

  • stats_dir (Union[str, None], optional) – Directory to data statistic numpy files for normalization, if None, no normalization will be used, by default None

  • channels (Union[List[int], None], optional) – Defines which ERA5 variables to load, if None will use all in HDF5 file, by default None

  • batch_size (int, optional) – Batch size, by default 1

  • stride (int, optional) – Number of steps between input and output variables. For example, if the dataset contains data at every 6 hours, a stride 1 = 6 hour delta t and stride 2 = 12 hours delta t, by default 1

  • num_steps (int, optional) – Number of timesteps included in the output variables, by default 1

  • num_history (int, optional) – Number of previous timesteps included in the input variables, by default 0

  • latlon_resolution (Tuple[int, int], optional) – The resolution for the latitude-longitude grid (H, W). Needs to be specified for cos zenith angle computation, or interpolation. By default None

  • interpolation_type (str, optional) – Interpolation type for resizing. Supports [“INTERP_NN”, “INTERP_LINEAR”, “INTERP_CUBIC”, “INTERP_LANCZOS3”, “INTERP_TRIANGULAR”, “INTERP_GAUSSIAN”]. By default None (no interpolation is done)

  • patch_size (Union[Tuple[int, int], int, None], optional) – If specified, crops input and output variables so image dimensions are divisible by patch_size, by default None

  • num_samples_per_year (int, optional) – Number of samples randomly taken from each year. If None, all will be used, by default None

  • use_cos_zenith (bool, optional) – If True, the cosine zenith angles corresponding to the coordinates will be produced, by default False

  • cos_zenith_args (Dict, optional) – Dictionary containing the following:

    dt (float, optional): Time in hours between each timestep in the dataset, by default 6 hr

    start_year (int, optional): Start year of dataset, by default 1980

    latlon_bounds (Tuple[Tuple[float, float], Tuple[float, float]], optional): Bounds of latitude and longitude in the data, in the format ((lat_start, lat_end), (lon_start, lon_end)). By default ((90, -90), (0, 360)).

    These defaults are only applicable if use_cos_zenith is True. Otherwise, defaults to {}.

  • use_time_of_year_index (bool) – If true, also returns the index that can be used to determine the time of the year corresponding to each sample. By default False.

  • shuffle (bool, optional) – Shuffle dataset, by default True

  • num_workers (int, optional) – Number of workers, by default 1

  • device (Union[str, torch.device], optional) – Device for DALI pipeline to run on, by default cuda

  • process_rank (int, optional) – Rank ID of local process, by default 0

  • world_size (int, optional) – Number of training processes, by default 1

load_statistics() None[source]#

Loads ERA5 statistics from pre-computed numpy files

The statistics files should be named global_means.npy and global_std.npy, have a shape of [1, C, 1, 1], and be located in stats_dir.

Raises:
  • IOError – If mean or std numpy files are not found

  • AssertionError – If loaded numpy arrays are not of correct size

parse_dataset_files() None[source]#

Parses the data directory for valid HDF5 files and determines training samples

Raises:

ValueError – If the channels specified or the number of samples per year is not valid
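
Building on the usage example at the top of this page, a hedged sketch of a multi-step configuration combining num_history, num_steps, and stride is shown below; the paths are placeholders:

from physicsnemo.datapipes.climate.era5_hdf5 import ERA5HDF5Datapipe

# One history step as input, four future steps as targets, each separated
# by two dataset timesteps (e.g. 12 h for 6-hourly data).
datapipe = ERA5HDF5Datapipe(
    data_dir="path/to/era5/train",
    stats_dir="path/to/era5/stats",
    channels=list(range(20)),
    num_history=1,
    num_steps=4,
    stride=2,
    batch_size=2,
)

for batch in datapipe:
    invar = batch[0]["invar"]    # input sequence (history + current step)
    outvar = batch[0]["outvar"]  # target sequence of num_steps future steps
    break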

class physicsnemo.datapipes.climate.era5_hdf5.MetaData(
name: str = 'ERA5HDF5',
auto_device: bool = True,
cuda_graphs: bool = True,
ddp_sharding: bool = True,
)[source]#

Bases: DatapipeMetaData

The ERA5HDF5Datapipe handles ERA5 reanalysis data stored in HDF5 format, providing access to atmospheric variables like temperature, pressure, and wind fields at various pressure levels.

class physicsnemo.datapipes.climate.climate.ClimateDaliExternalSource(
data_paths: Iterable[str],
num_samples: int,
channels: Iterable[int],
num_steps: int,
stride: int,
dt: float,
start_year: int,
num_samples_per_year: int,
latlon: ndarray,
variables: List[str] | None = None,
aux_variables: List[str | Callable] = (),
batch_size: int = 1,
shuffle: bool = True,
process_rank: int = 0,
world_size: int = 1,
backend_kwargs: dict | None = None,
)[source]#

Bases: ABC

DALI Source for lazy-loading the HDF5/NetCDF4 climate files

Parameters:
  • data_paths (Iterable[str]) – Directory where climate data is stored

  • num_samples (int) – Total number of training samples

  • channels (Iterable[int]) – List representing which climate variables to load

  • num_steps (int) – Number of timesteps to load

  • stride (int) – Number of steps between input and output variables

  • dt (float, optional) – Time in hours between each timestep in the dataset, by default 6 hr

  • start_year (int, optional) – Start year of dataset, by default 1980

  • num_samples_per_year (int) – Number of samples randomly taken from each year

  • variables (Union[List[str], None], optional for HDF5 files, mandatory for NetCDF4 files) – List of named variables to load. Variables will be read in the order specified by this parameter.

  • aux_variables (Union[Mapping[str, Callable], None], optional) – A dictionary mapping strings to callables that accept arguments (timestamps: numpy.ndarray, latlon: numpy.ndarray). These define any auxiliary variables returned from this source.

  • batch_size (int, optional) – Batch size, by default 1

  • shuffle (bool, optional) – Shuffle dataset, by default True

  • process_rank (int, optional) – Rank ID of local process, by default 0

  • world_size (int, optional) – Number of training processes, by default 1

class physicsnemo.datapipes.climate.climate.ClimateDataSourceSpec(
data_dir: str,
name: str | None = None,
file_type: str = 'hdf5',
stats_files: Mapping[str, str] | None = None,
metadata_path: str | None = None,
channels: List[int] | None = None,
variables: List[str] | None = None,
use_cos_zenith: bool = False,
aux_variables: Mapping[str, Callable] | None = None,
num_steps: int = 1,
stride: int = 1,
backend_kwargs: dict | None = None,
)[source]#

Bases: object

A data source specification for ClimateDatapipe.

HDF5 files should contain a variable named fields: a tensor of shape (num_timesteps, num_channels, height, width) containing the climate data. The order of the channels should match the order of the channels in the statistics files. The statistics files should be .npy files with shape (1, num_channels, 1, 1). The names of the variables are listed in the metadata file located at metadata_path.

NetCDF4 files should contain a variable of shape (num_timesteps, height, width) for each variable they provide. Only the variables listed in variables will be loaded.

Parameters:
  • data_dir (str) – Directory where climate data is stored

  • name (Union[str, None], optional) – The name that is used to label datapipe outputs from this source. If None, the datapipe uses the number of the source in sequential order.

  • file_type (str) – Type of files to read, supported values are “hdf5” (default) and “netcdf4”

  • stats_files (Union[Mapping[str, str], None], optional) – Numpy files to data statistics for normalization. Supports either a channels format, in which case the dict should contain the keys “mean” and “std”, or a named-variable format, in which case the dict should contain the key “norm” . If None, no normalization will be used, by default None

  • metadata_path (Union[Mapping[str, str], None], optional for NetCDF, required for HDF5) – Path to the metadata JSON file for the dataset (usually called data.json).

  • channels (Union[List[int], None], optional) – Defines which climate variables to load, if None will use all in HDF5 file, by default None

  • variables (Union[List[str], None], optional for HDF5 files, mandatory for NetCDF4 files) – List of named variables to load. Variables will be read in the order specified by this parameter. Must be used for NetCDF4 files. Supported for HDF5 files in which case it will override channels.

  • use_cos_zenith (bool, optional) – If True, the cosine zenith angles corresponding to the coordinates of this data source will be produced, default False

  • aux_variables (Union[Mapping[str, Callable], None], optional) – A dictionary mapping strings to callables that accept arguments (timestamps: numpy.ndarray, latlon: numpy.ndarray). These define any auxiliary variables returned from this source.

  • num_steps (int, optional) – Number of timesteps to return, by default 1

  • stride (int, optional) – Number of steps between input and output variables. For example, if the dataset contains data at every 6 hours, a stride 1 = 6 hour delta t and stride 2 = 12 hours delta t, by default 1

dimensions_compatible(other) bool[source]#

Basic sanity check to test if two ClimateDataSourceSpec are compatible.

parse_dataset_files(
num_samples_per_year: int | None = None,
patch_size: int | None = None,
) None[source]#

Parses the data directory for valid files and determines training samples

Parameters:
  • num_samples_per_year (int, optional) – Number of samples taken from each year. If None, all will be used, by default None

  • patch_size (Union[Tuple[int, int], int, None], optional) – If specified, crops input and output variables so image dimensions are divisible by patch_size, by default None

Raises:

ValueError – If the channels specified or the number of samples per year is not valid

class physicsnemo.datapipes.climate.climate.ClimateDatapipe(
sources: Iterable[ClimateDataSourceSpec],
batch_size: int = 1,
dt: float = 6.0,
start_year: int = 1980,
latlon_bounds: Tuple[Tuple[float, float], Tuple[float, float]] = ((90, -90), (0, 360)),
crop_window: Tuple[Tuple[float, float], Tuple[float, float]] | None = None,
invariants: Mapping[str, Callable] | None = None,
num_samples_per_year: int | None = None,
shuffle: bool = True,
num_workers: int = 1,
device: str | device = 'cuda',
process_rank: int = 0,
world_size: int = 1,
)[source]#

Bases: Datapipe

A Climate DALI data pipeline. This pipeline loads data from HDF5/NetCDF4 files. It can also return additional data such as the solar zenith angle for each time step. Additionally, it normalizes the data if a statistics file is provided. The pipeline returns a dictionary with the following structure, where {name} indicates the name of the data source provided:

  • state_seq-{name}: Tensors of shape (batch_size, num_steps, num_channels, height, width). This sequence is drawn from the data file and normalized if a statistics file is provided.

  • timestamps-{name}: Tensors of shape (batch_size, num_steps), containing timestamps for each timestep in the sequence.

  • {aux_variable}-{name}: Tensors of shape (batch_size, num_steps, aux_channels, height, width), containing the auxiliary variables returned by each data source.

  • cos_zenith-{name}: Tensors of shape (batch_size, num_steps, 1, height, width), containing the cosine of the solar zenith angle if specified.

  • {invariant_name}: Tensors of shape (batch_size, invariant_channels, height, width), containing the time-invariant data (depending only on spatial coordinates) returned by the datapipe. These can include e.g. land-sea mask and geopotential/surface elevation.

To use this data pipeline, your data directory must be structured as follows:

data_dir
├── 1980.h5
├── 1981.h5
├── 1982.h5
├── ...
└── 2020.h5

The files are assumed to have no metadata, such as timestamps. Because of this, it’s important to specify the dt parameter and the start_year parameter so that the pipeline can compute the correct timestamps for each timestep. These timestamps are then used to compute the cosine of the solar zenith angle, if specified.

Parameters:
  • sources (Iterable[ClimateDataSourceSpec]) – A list of data specifications defining the sources for the climate variables

  • batch_size (int, optional) – Batch size, by default 1

  • dt (float, optional) – Time in hours between each timestep in the dataset, by default 6 hr

  • start_year (int, optional) – Start year of dataset, by default 1980

  • latlon_bounds (Tuple[Tuple[float, float], Tuple[float, float]], optional) – Bounds of latitude and longitude in the data, in the format ((lat_start, lat_end,), (lon_start, lon_end)). By default ((90, -90), (0, 360)).

  • crop_window (Union[Tuple[Tuple[float, float], Tuple[float, float]], None], optional) – The window to crop the data to, in the format ((i0,i1), (j0,j1)) where the first spatial dimension will be cropped to i0:i1 and the second to j0:j1. If not given, all data will be used.

  • invariants (Mapping[str, Callable], optional) – Specifies the time-invariant data (for example latitude and longitude) included in the data samples. Should be a dict where the keys are the names of the invariants and the values are the corresponding functions. The functions need to accept an argument of the shape (2, data_shape[0], data_shape[1]), where the first dimension contains latitude and longitude in degrees and the other dimensions correspond to the shape of the data in the data files. For example, invariants={“trig_latlon”: invariants.LatLon()} will include the sin/cos of lat/lon in the output.

  • num_samples_per_year (int, optional) – Number of samples taken from each year. If None, all will be used, by default None

  • shuffle (bool, optional) – Shuffle dataset, by default True

  • num_workers (int, optional) – Number of workers, by default 1

  • device (Union[str, torch.device], optional) – Device for DALI pipeline to run on, by default cuda

  • process_rank (int, optional) – Rank ID of local process, by default 0

  • world_size (int, optional) – Number of training processes, by default 1
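
A hedged sketch of wiring a single HDF5 source into the pipeline follows; the paths are placeholders, the output keys assume the source is given name="era5", and the batch indexing follows the DALI-style pattern used in the ERA5 example above:

from physicsnemo.datapipes.climate.climate import (
    ClimateDataSourceSpec,
    ClimateDatapipe,
)

spec = ClimateDataSourceSpec(
    data_dir="path/to/climate/hdf5",    # placeholder
    name="era5",
    file_type="hdf5",
    metadata_path="path/to/data.json",  # placeholder
    stats_files={"mean": "path/to/global_means.npy", "std": "path/to/global_stds.npy"},
    channels=[0, 1, 2],
    use_cos_zenith=True,
    num_steps=2,
)

datapipe = ClimateDatapipe(
    sources=[spec],
    batch_size=1,
    dt=6.0,
    start_year=1980,
)

for batch in datapipe:
    state_seq = batch[0]["state_seq-era5"]    # (batch, num_steps, channels, H, W)
    cos_zenith = batch[0]["cos_zenith-era5"]  # (batch, num_steps, 1, H, W)
    break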

class physicsnemo.datapipes.climate.climate.ClimateHDF5DaliExternalSource(
data_paths: Iterable[str],
num_samples: int,
channels: Iterable[int],
num_steps: int,
stride: int,
dt: float,
start_year: int,
num_samples_per_year: int,
latlon: ndarray,
variables: List[str] | None = None,
aux_variables: List[str | Callable] = (),
batch_size: int = 1,
shuffle: bool = True,
process_rank: int = 0,
world_size: int = 1,
backend_kwargs: dict | None = None,
)[source]#

Bases: ClimateDaliExternalSource

DALI source for reading HDF5 formatted climate data files.

class physicsnemo.datapipes.climate.climate.ClimateNetCDF4DaliExternalSource(
data_paths: Iterable[str],
num_samples: int,
channels: Iterable[int],
num_steps: int,
stride: int,
dt: float,
start_year: int,
num_samples_per_year: int,
latlon: ndarray,
variables: List[str] | None = None,
aux_variables: List[str | Callable] = (),
batch_size: int = 1,
shuffle: bool = True,
process_rank: int = 0,
world_size: int = 1,
backend_kwargs: dict | None = None,
)[source]#

Bases: ClimateDaliExternalSource

DALI source for reading NetCDF4 formatted climate data files.

class physicsnemo.datapipes.climate.climate.MetaData(
name: str = 'Climate',
auto_device: bool = True,
cuda_graphs: bool = True,
ddp_sharding: bool = True,
)[source]#

Bases: DatapipeMetaData

The ClimateDataPipe provides a general interface for climate data processing, supporting various climate datasets and variables with standardized preprocessing and normalization.

class physicsnemo.datapipes.climate.synthetic.SyntheticWeatherDataLoader(*args, **kwargs)[source]#

Bases: DataLoader

This custom DataLoader initializes the SyntheticWeatherDataset with given arguments.

class physicsnemo.datapipes.climate.synthetic.SyntheticWeatherDataset(
channels: List[int],
num_samples_per_year: int,
num_steps: int,
device: str | device = 'cuda',
grid_size: Tuple[int, int] = (721, 1440),
base_temp: float = 15,
amplitude: float = 10,
noise_level: float = 2,
**kwargs: Any,
)[source]#

Bases: Dataset

A dataset for generating synthetic temperature data on a latitude-longitude grid for multiple atmospheric layers.

Parameters:
  • channels (list) – List of channels representing different atmospheric layers.

  • num_samples_per_year (int) – Total number of days to simulate per year.

  • num_steps (int) – Number of consecutive days in each training sample.

  • grid_size (tuple) – Latitude by longitude dimensions of the temperature grid.

  • base_temp (float) – Base temperature around which variations are simulated.

  • amplitude (float) – Amplitude of the sinusoidal temperature variation.

  • noise_level (float) – Standard deviation of the noise added to temperature data.

  • **kwargs – Additional keyword arguments for advanced configurations.

generate_data(
num_days: int,
num_channels: int,
grid_size: Tuple[int, int],
base_temp: float,
amplitude: float,
noise_level: float,
) ndarray[source]#

Generates synthetic temperature data over a specified number of days for multiple atmospheric layers.

Parameters:
  • num_days (int) – Number of days to generate data for.

  • num_channels (int) – Number of channels representing different layers.

  • grid_size (tuple) – Grid size (latitude, longitude).

  • base_temp (float) – Base mean temperature for the data.

  • amplitude (float) – Amplitude of temperature variations.

  • noise_level (float) – Noise level to add stochasticity to the temperature.

Returns:

A 4D array of temperature values across days, channels, latitudes, and longitudes.

Return type:

numpy.ndarray

The SyntheticWeatherDataset generates synthetic climate data for testing and development purposes, supporting various climate patterns and noise models.
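
A minimal sketch of generating synthetic samples directly from the dataset; the grid size here is illustrative:

from physicsnemo.datapipes.climate.synthetic import SyntheticWeatherDataset

dataset = SyntheticWeatherDataset(
    channels=[0, 1, 2],
    num_samples_per_year=365,
    num_steps=2,
    grid_size=(181, 360),
    base_temp=15.0,
    amplitude=10.0,
    noise_level=2.0,
    device="cpu",
)

sample = dataset[0]  # one synthetic temperature sample; structure depends on configuration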

class physicsnemo.datapipes.healpix.timeseries_dataset.MetaData(
name: str = 'TimeSeries',
auto_device: bool = False,
cuda_graphs: bool = False,
ddp_sharding: bool = False,
)[source]#

Bases: DatapipeMetaData

Metadata for this datapipe

class physicsnemo.datapipes.healpix.timeseries_dataset.TimeSeriesDataset(
dataset: xarray.Dataset,
scaling: DictConfig = None,
input_time_dim: int = 1,
output_time_dim: int = 1,
data_time_step: int | str = '3h',
time_step: int | str = '6h',
gap: int | str | None = None,
batch_size: int = 32,
drop_last: bool = False,
add_insolation: bool = False,
forecast_init_times: Sequence | None = None,
meta: DatapipeMetaData = MetaData(name='TimeSeries', auto_device=False, cuda_graphs=False, ddp_sharding=False),
)[source]#

Bases: Dataset, Datapipe

Dataset for sampling from continuous time-series data, compatible with pytorch data loading.

get_constants()[source]#

Returns the constants used in this dataset

Returns:

The constants used in this dataset, or None if there are no constants

Return type:

np.ndarray

The TimeSeriesDataset handles spherical harmonic data in HEALPix format, supporting time series analysis of global climate variables.
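
A sketch of constructing the dataset from an xarray dataset is shown below; the Zarr path is a placeholder and the dataset is assumed to follow the dimension and variable layout expected by the HEALPix time-series tooling:

import xarray as xr
from physicsnemo.datapipes.healpix.timeseries_dataset import TimeSeriesDataset

ds = xr.open_zarr("path/to/healpix_timeseries.zarr")  # placeholder path

dataset = TimeSeriesDataset(
    dataset=ds,
    input_time_dim=2,
    output_time_dim=4,
    data_time_step="3h",
    time_step="6h",
    batch_size=16,
    add_insolation=True,
)

sample = dataset[0]  # one batched sample; structure depends on configuration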

Graph datapipes#

class physicsnemo.datapipes.gnn.vortex_shedding_dataset.VortexSheddingDataset(
name='dataset',
data_dir=None,
split='train',
num_samples=1000,
num_steps=600,
noise_std=0.02,
)[source]#

Bases: Dataset

In-memory MeshGraphNet Dataset for stationary mesh

Notes

  • This dataset prepares and processes the data available in MeshGraphNet’s repo: deepmind/deepmind-research

  • A single adjacency matrix is used for each transient simulation. Do not use with adaptive mesh or remeshing.

Parameters:
  • name (str, optional) – Name of the dataset, by default “dataset”

  • data_dir (str, optional) – Directory that stores the raw data in .TFRecord format, by default None

  • split (str, optional) – Dataset split [“train”, “eval”, “test”], by default “train”

  • num_samples (int, optional) – Number of samples, by default 1000

  • num_steps (int, optional) – Number of time steps in each sample, by default 600

  • noise_std (float, optional) – The standard deviation of the noise added to the “train” split, by default 0.02

static add_edge_features(graph, pos)[source]#

adds relative displacement & displacement norm as edge features

static cell_to_adj(cells)[source]#

creates adjacency matrix in COO format from mesh cells

static create_graph(src, dst, dtype=torch.int32)[source]#

creates a PyG graph from an adj matrix in COO format. torch.int32 can handle graphs with up to 2**31-1 nodes or edges.

static denormalize(invar, mu, std)[source]#

denormalizes a tensor

static normalize_edge(graph, mu, std)[source]#

normalizes a tensor

static normalize_node(invar, mu, std)[source]#

normalizes a tensor

The VortexSheddingDataset processes flow field data around bluff bodies, capturing vortex shedding patterns and flow structures for graph-based learning.
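
A minimal sketch of loading the dataset; the data directory is a placeholder pointing at the .TFRecord files from the MeshGraphNet repository:

from physicsnemo.datapipes.gnn.vortex_shedding_dataset import VortexSheddingDataset

dataset = VortexSheddingDataset(
    name="vortex_shedding_train",
    data_dir="path/to/meshgraphnet/tfrecords",  # placeholder
    split="train",
    num_samples=100,
    num_steps=600,
    noise_std=0.02,
)

graph = dataset[0]  # one graph sample with node and edge features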

class physicsnemo.datapipes.gnn.ahmed_body_dataset.AhmedBodyDataset(*args: Any, **kwargs: Any)[source]#

Bases: DGLDataset, Datapipe

In-memory Ahmed body Dataset

Parameters:
  • data_dir (str) – The directory where the data is stored.

  • split (str, optional) – The dataset split. Can be ‘train’, ‘validation’, or ‘test’, by default ‘train’.

  • num_samples (int, optional) – The number of samples to use, by default 10.

  • invar_keys (Iterable[str], optional) – The input node features to consider. Default includes ‘pos’, ‘velocity’, ‘reynolds_number’, ‘length’, ‘width’, ‘height’, ‘ground_clearance’, ‘slant_angle’, and ‘fillet_radius’.

  • outvar_keys (Iterable[str], optional) – The output features to consider. Default includes ‘p’ and ‘wallShearStress’.

  • normalize_keys (Iterable[str], optional) – The features to normalize. Default includes ‘p’, ‘wallShearStress’, ‘velocity’, ‘length’, ‘width’, ‘height’, ‘ground_clearance’, ‘slant_angle’, and ‘fillet_radius’.

  • normalization_bound (Tuple[float, float], optional) – The lower and upper bounds for normalization. Default is (-1, 1).

  • force_reload (bool, optional) – If True, forces a reload of the data, by default False.

  • name (str, optional) – The name of the dataset, by default ‘dataset’.

  • verbose (bool, optional) – If True, enables verbose mode, by default False.

  • compute_drag (bool, optional) – If True, also returns the coefficient and mesh area and normals that are required for computing the drag coefficient.

  • num_workers (int, optional) – Number of dataset pre-loading workers. If None, will be chosen automatically.

add_edge_features() List[dgl.DGLGraph][source]#

Add relative displacement and displacement norm as edge features for each graph in the list of graphs. The calculations are done using the ‘pos’ attribute in the node data of each graph. The resulting edge features are stored in the ‘x’ attribute in the edge data of each graph.

This method will modify the list of graphs in-place.

Returns:

The list of graphs with updated edge features.

Return type:

List[dgl.DGLGraph]

create_graph(
index: int,
file_path: str,
info_path: str,
) None[source]#

Creates a graph from VTP file.

This method is used in parallel loading of graphs.

Return type:

Tuple that contains graph index, graph, and optionally coeff, normal and area values.

denormalize(
pred,
gt,
device,
) Tuple[Tensor, Tensor][source]#

Denormalize the graph node data.

Parameters:
  • pred (Tensor) – Normalized prediction

  • gt (Tensor) – Normalized ground truth

  • device (Any) – The device

Returns:

Denormalized prediction and ground truth

Return type:

Tuple(Tensor, Tensor)

normalize_edge() List[dgl.DGLGraph][source]#

Normalize edge data ‘x’ in each graph in the list of graphs.

Returns:

The list of graphs with normalized edge data ‘x’.

Return type:

List[dgl.DGLGraph]

normalize_node() List[dgl.DGLGraph][source]#

Normalize node data in each graph in the list of graphs.

Returns:

The list of graphs with normalized and concatenated node data.

Return type:

List[dgl.DGLGraph]

class physicsnemo.datapipes.gnn.ahmed_body_dataset.FileInfo(
velocity: float,
reynolds_number: float,
length: float,
width: float,
height: float,
ground_clearance: float,
slant_angle: float,
fillet_radius: float,
)[source]#

Bases: object

VTP file info storage.

class physicsnemo.datapipes.gnn.ahmed_body_dataset.MetaData(
name: str = 'AhmedBody',
auto_device: bool = True,
cuda_graphs: bool = False,
ddp_sharding: bool = True,
)[source]#

Bases: DatapipeMetaData

The AhmedBodyDataset manages flow field data around Ahmed bodies, supporting aerodynamic analysis and drag prediction tasks.
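
Because the dataset is a DGLDataset, it can be wrapped in DGL's GraphDataLoader; a hedged sketch with placeholder paths and assumed feature keys follows:

from dgl.dataloading import GraphDataLoader
from physicsnemo.datapipes.gnn.ahmed_body_dataset import AhmedBodyDataset

dataset = AhmedBodyDataset(
    data_dir="path/to/ahmed_body/vtp",  # placeholder
    split="train",
    num_samples=10,
    compute_drag=False,
)

loader = GraphDataLoader(dataset, batch_size=1, shuffle=True)
for graph in loader:
    node_inputs = graph.ndata["x"]   # assumed input feature key
    node_targets = graph.ndata["y"]  # assumed target feature key
    break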

class physicsnemo.datapipes.gnn.drivaernet_dataset.DrivAerNetDataset(*args: Any, **kwargs: Any)[source]#

Bases: DGLDataset, Datapipe

DrivAerNet dataset.

Note: DrivAerNetDataset does not use the default DGLDataset caching functionality such as has_cache, download, etc., since that is invoked during the __init__ call and takes a long time. Instead, DrivAerNetDataset caches graphs in the __getitem__ call, thus avoiding a long initialization delay.

Parameters:
  • data_dir (str) – The directory where the data is stored.

  • split (str, optional) – The dataset split. Can be ‘train’, ‘validation’, or ‘test’, by default ‘train’.

  • num_samples (int, optional) – The number of samples to use, by default 10.

  • coeff_filename (str, optional) – DrivAerNet coefficients file name, default is from the dataset location.

  • invar_keys (Iterable[str], optional) – The input node features to consider. Default includes ‘pos’.

  • outvar_keys (Iterable[str], optional) – The output features to consider. Default includes ‘p’ and ‘wallShearStress’.

  • normalize_keys (Iterable[str], optional) – The features to normalize. Default includes ‘p’ and ‘wallShearStress’.

  • cache_dir (str, optional) – Path to the cache directory to store graphs in DGL format for fast loading. Default is ./cache/.

  • force_reload (bool, optional) – If True, forces a reload of the data, by default False.

  • name (str, optional) – The name of the dataset, by default ‘dataset’.

  • verbose (bool, optional) – If True, enables verbose mode, by default False.

denormalize(
pred: Tensor,
gt: Tensor,
device: device,
) tuple[Tensor, Tensor][source]#

Denormalizes the inputs using previously collected statistics.

class physicsnemo.datapipes.gnn.drivaernet_dataset.MetaData(
name: str = 'DrivAerNet',
auto_device: bool = True,
cuda_graphs: bool = False,
ddp_sharding: bool = True,
)[source]#

Bases: DatapipeMetaData

The DrivAerNetDataset handles automotive aerodynamics data, providing access to flow field measurements and surface pressure distributions.
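
A similar hedged sketch for DrivAerNet, with placeholder paths; as noted above, graphs are built and cached lazily on first access:

from physicsnemo.datapipes.gnn.drivaernet_dataset import DrivAerNetDataset

dataset = DrivAerNetDataset(
    data_dir="path/to/drivaernet",  # placeholder
    split="train",
    num_samples=10,
    cache_dir="./cache/",
)

sample = dataset[0]  # graph is built (and cached) on first access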

class physicsnemo.datapipes.gnn.stokes_dataset.StokesDataset(*args: Any, **kwargs: Any)[source]#

Bases: DGLDataset

In-memory Stokes flow Dataset

Parameters:
  • data_dir (str) – The directory where the data is stored.

  • split (str, optional) – The dataset split. Can be ‘train’, ‘validation’, or ‘test’, by default ‘train’.

  • num_samples (int, optional) – The number of samples to use, by default 10.

  • invar_keys (List[str], optional) – The input node features to consider. Default includes ‘pos’ and ‘marker’

  • outvar_keys (List[str], optional) – The output features to consider. Default includes ‘u’, ‘v’, and ‘p’.

  • normalize_keys (List[str], optional) – The features to normalize. Default includes ‘u’, ‘v’, and ‘p’.

  • force_reload (bool, optional) – If True, forces a reload of the data, by default False.

  • name (str, optional) – The name of the dataset, by default ‘dataset’.

  • verbose (bool, optional) – If True, enables verbose mode, by default False.

add_edge_features()[source]#

adds relative displacement & displacement norm as edge features

static denormalize(invar, mu, std)[source]#

denormalizes a tensor

normalize_edge()[source]#

normalizes a tensor

normalize_node()[source]#

normalizes node features

The StokesDataset processes Stokes flow simulations, supporting various boundary conditions and geometry configurations for microfluidic applications.
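
A minimal hedged sketch with a placeholder data directory:

from physicsnemo.datapipes.gnn.stokes_dataset import StokesDataset

dataset = StokesDataset(
    data_dir="path/to/stokes_flow",  # placeholder
    split="train",
    num_samples=10,
)

graph = dataset[0]  # one graph with 'pos'/'marker' inputs and 'u', 'v', 'p' targets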

physicsnemo.datapipes.gnn.utils.load_json(file: str) Dict[str, Tensor][source]#

Loads a JSON file into a dictionary of PyTorch tensors.

Parameters:

file (str) – Path to the JSON file.

Returns:

Dictionary where each value is a PyTorch tensor.

Return type:

Dict[str, torch.Tensor]

physicsnemo.datapipes.gnn.utils.read_vtp_file(file_path: str) Any[source]#

Read a VTP file and return the polydata.

Parameters:

file_path (str) – Path to the VTP file.

Returns:

The polydata read from the VTP file.

Return type:

vtkPolyData

physicsnemo.datapipes.gnn.utils.save_json(var: Dict[str, Tensor], file: str) None[source]#

Saves a dictionary of tensors to a JSON file.

Parameters:
  • var (Dict[str, torch.Tensor]) – Dictionary where each value is a PyTorch tensor.

  • file (str) – Path to the output JSON file.

The GNN utilities provide helper functions for graph construction, feature extraction, and data preprocessing in graph-based physics learning tasks.
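
For example, the JSON helpers can round-trip normalization statistics between tensors and disk (the file name here is arbitrary):

import torch
from physicsnemo.datapipes.gnn.utils import load_json, save_json

stats = {
    "mean": torch.tensor([0.0, 1.5]),
    "std": torch.tensor([1.0, 0.5]),
}
save_json(stats, "edge_stats.json")  # arbitrary file name

loaded = load_json("edge_stats.json")
assert torch.allclose(loaded["mean"], stats["mean"])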

CAE datapipes#

class physicsnemo.datapipes.cae.mesh_datapipe.MeshDaliExternalSource(
data_paths: Iterable[str],
file_format: str,
variables: List[str],
num_samples: int,
batch_size: int = 1,
shuffle: bool = True,
process_rank: int = 0,
world_size: int = 1,
cache_data: bool = False,
)[source]#

Bases: object

DALI Source for lazy-loading with caching of mesh data

Parameters:
  • data_paths (Iterable[str]) – Directory where data is stored

  • num_samples (int) – Total number of training samples

  • batch_size (int, optional) – Batch size, by default 1

  • shuffle (bool, optional) – Shuffle dataset, by default True

  • process_rank (int, optional) – Rank ID of local process, by default 0

  • world_size (int, optional) – Number of training processes, by default 1

  • cache_data (bool, optional) – Whether to cache the data in memory for faster access in subsequent epochs, by default False

class physicsnemo.datapipes.cae.mesh_datapipe.MeshDatapipe(
data_dir: str,
variables: List[str],
num_variables: int,
file_format: str = 'vtp',
stats_dir: str | None = None,
batch_size: int = 1,
num_samples: int = 1,
shuffle: bool = True,
num_workers: int = 1,
device: str | device = 'cuda',
process_rank: int = 0,
world_size: int = 1,
cache_data: bool = False,
)[source]#

Bases: Datapipe

DALI data pipeline for mesh data

Parameters:
  • data_dir (str) – Directory where the mesh data is stored

  • variables (List[str, None]) – Ordered list of variables to be loaded from the files

  • num_variables (int) – Number of variables to be loaded from the files

  • file_format (str, optional) – File format of the data, by default “vtp”. Supported formats: “vtp”, “vtu”, “cgns”.

  • stats_dir (Union[str, None], optional) – Directory where statistics are stored, by default None. If provided, the statistics are used to normalize the attributes.

  • batch_size (int, optional) – Batch size, by default 1

  • num_steps (int, optional) – Number of timesteps included in the output variables, by default 1

  • shuffle (bool, optional) – Shuffle dataset, by default True

  • num_workers (int, optional) – Number of workers, by default 1

  • device (Union[str, torch.device], optional) – Device for DALI pipeline to run on, by default cuda

  • process_rank (int, optional) – Rank ID of local process, by default 0

  • world_size (int, optional) – Number of training processes, by default 1

  • cache_data (bool, optional) – Whether to cache the data in memory for faster access in subsequent epochs, by default False

load_statistics() None[source]#

Loads statistics from pre-computed numpy files

The statistics files should be named global_means.npy and global_std.npy, have a shape of [1, C], and be located in stats_dir.

Raises:
  • IOError – If mean or std numpy files are not found

  • AssertionError – If loaded numpy arrays are not of correct size

parse_dataset_files() None[source]#

Parses the data directory for valid files and determines training samples

Raises:

ValueError – If the channels specified or the number of samples per year is not valid

class physicsnemo.datapipes.cae.mesh_datapipe.MetaData(
name: str = 'MeshDatapipe',
auto_device: bool = True,
cuda_graphs: bool = True,
ddp_sharding: bool = True,
)[source]#

Bases: DatapipeMetaData

The MeshDataPipe handles mesh data for physics simulations, supporting various mesh formats and providing utilities for mesh preprocessing and feature extraction.
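
A hedged sketch of constructing the mesh datapipe for VTP files; the paths and variable list are placeholders, and the batch structure follows the DALI-style pattern used by the other pipelines:

from physicsnemo.datapipes.cae.mesh_datapipe import MeshDatapipe

datapipe = MeshDatapipe(
    data_dir="path/to/vtp/files",  # placeholder
    variables=["p"],
    num_variables=1,
    file_format="vtp",
    stats_dir="path/to/stats",     # optional normalization statistics
    batch_size=1,
    num_samples=100,
)

for batch in datapipe:
    sample = batch[0]  # dictionary of loaded mesh variables (assumed structure)
    break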

This module provides the datapipe for reading the processed .npy files, generating multi-resolution grids, calculating signed distance fields and positional encodings, sampling random points in the volume and on the surface, normalizing fields, and returning the output tensors as a dictionary.

This datapipe also non-dimensionalizes the fields, so the order of the variables is fixed: velocity, pressure, and turbulent viscosity for the volume variables, and pressure and wall shear stress for the surface variables. Parameters such as variable names, domain resolution, and sampling size are configurable in config.yaml.

class physicsnemo.datapipes.cae.domino_datapipe.BoundingBox(*args, **kwargs)[source]#

Bases: Protocol

Type definition for the required format of bounding box dimensions.

class physicsnemo.datapipes.cae.domino_datapipe.CachedDoMINODataset(
data_path: str | Path,
phase: Literal['train', 'val', 'test'] = 'train',
sampling: bool = False,
volume_points_sample: int | None = None,
surface_points_sample: int | None = None,
geom_points_sample: int | None = None,
model_type=None,
deterministic_seed=False,
surface_sampling_algorithm='area_weighted',
)[source]#

Bases: Dataset

Dataset for reading cached DoMINO data files, with optional resampling. Acts as a drop-in replacement for DoMINODataPipe.

class physicsnemo.datapipes.cae.domino_datapipe.DoMINODataConfig(
data_path: Path,
phase: Literal['train', 'val', 'test'],
surface_variables: Sequence | None = ('pMean', 'wallShearStress'),
surface_points_sample: int = 1024,
num_surface_neighbors: int = 11,
resample_surfaces: bool = False,
resampling_points: int = 1000000,
surface_sampling_algorithm: str = typing.Literal['area_weighted', 'random'],
surface_factors: Sequence | None = None,
bounding_box_dims_surf: BoundingBox | Sequence | None = None,
volume_variables: Sequence | None = ('UMean', 'pMean'),
volume_points_sample: int = 1024,
volume_factors: Sequence | None = None,
bounding_box_dims: BoundingBox | Sequence | None = None,
grid_resolution: Sequence | ndarray | ndarray = (256, 96, 64),
normalize_coordinates: bool = False,
sample_in_bbox: bool = False,
sampling: bool = False,
geom_points_sample: int = 300000,
positional_encoding: bool = False,
scaling_type: Literal['min_max_scaling', 'mean_std_scaling'] | None = None,
compute_scaling_factors: bool = False,
caching: bool = False,
deterministic: bool = False,
gpu_preprocessing: bool = True,
gpu_output: bool = True,
)[source]#

Bases: object

Configuration for DoMINO dataset processing pipeline.

data_path#

Path to the dataset to load.

Type:

pathlib.Path

phase#

Which phase of data to load (“train”, “val”, or “test”).

Type:

Literal[‘train’, ‘val’, ‘test’]

surface_variables#

(Surface specific) Names of surface variables.

Type:

Sequence | None

surface_points_sample#

(Surface specific) Number of surface points to sample per batch.

Type:

int

num_surface_neighbors#

(Surface specific) Number of surface neighbors to consider for nearest neighbors approach.

Type:

int

resample_surfaces#

(Surface specific) Whether to resample the surface before kdtree/knn. Not available if caching.

Type:

bool

resampling_points#

(Surface specific) Number of points to resample the surface to.

Type:

int

surface_sampling_algorithm#

(Surface specific) Algorithm to use for surface sampling (“area_weighted” or “random”).

Type:

str

surface_factors#

(Surface specific) Non-dimensionalization factors for surface variables. If set, and scaling_type is: - min_max_scaling -> rescale surface_fields to the min/max set here - mean_std_scaling -> rescale surface_fields to the mean and std set here.

Type:

Sequence | None

bounding_box_dims_surf#

(Surface specific) Dimensions of bounding box. Must be an object with min/max attributes that are arraylike.

Type:

physicsnemo.datapipes.cae.domino_datapipe.BoundingBox | Sequence | None

volume_variables#

(Volume specific) Names of volume variables.

Type:

Sequence | None

volume_points_sample#

(Volume specific) Number of volume points to sample per batch.

Type:

int

volume_factors#

(Volume specific) Non-dimensionalization factors for volume variables scaling. If set, and scaling_type is: - min_max_scaling -> rescale volume_fields to the min/max set here - mean_std_scaling -> rescale volume_fields to the mean and std set here.

Type:

Sequence | None

bounding_box_dims#

(Volume specific) Dimensions of bounding box. Must be an object with min/max attributes that are arraylike.

Type:

physicsnemo.datapipes.cae.domino_datapipe.BoundingBox | Sequence | None

grid_resolution#

Resolution of the latent grid.

Type:

Sequence | numpy.ndarray | cupy.ndarray

normalize_coordinates#

Whether to normalize coordinates based on min/max values. For surfaces: uses s_min/s_max, defined from the surface bounding box if defined, otherwise the min/max of the stl_vertices. For volumes: uses c_min/c_max, defined from the volume bounding_box if defined, otherwise 1.5x s_min/s_max, except c_min[2] = s_min[2] in that case.

Type:

bool

sample_in_bbox#

Whether to sample points in a specified bounding box. Uses the same min/max points as coordinate normalization. Only performed if compute_scaling_factors is false.

Type:

bool

sampling#

Whether to downsample the full resolution mesh to fit in GPU memory. Surface and volume sampling points are configured separately as: - surface.points_sample - volume.points_sample

Type:

bool

geom_points_sample#

Number of STL points sampled per batch. Independent of volume.points_sample and surface.points_sample.

Type:

int

positional_encoding#

Whether to use positional encoding. Affects the calculation of: - pos_volume_closest - pos_volume_center_of_mass - pos_surface_centter_of_mass

Type:

bool

scaling_type#

Scaling type for volume variables. If used, will rescale the volume_fields and surface fields outputs. Requires volume.factor and surface.factor to be set.

Type:

Literal[‘min_max_scaling’, ‘mean_std_scaling’] | None

compute_scaling_factors#

Whether to compute scaling factors. Not available if caching. Many preprocessing pieces are disabled if computing scaling factors.

Type:

bool

caching#

Whether this is for caching or serving.

Type:

bool

deterministic#

Whether to use a deterministic seed for sampling and random numbers.

Type:

bool

gpu_preprocessing#

Whether to do preprocessing on the GPU (False for CPU).

Type:

bool

gpu_output#

Whether to return output on the GPU as cupy arrays. If False, returns numpy arrays. You might choose gpu_preprocessing=True and gpu_output=False if caching.

Type:

bool

class physicsnemo.datapipes.cae.domino_datapipe.DoMINODataPipe(
input_path,
model_type: Literal['surface', 'volume', 'combined'],
**data_config_overrides,
)[source]#

Bases: Dataset

Datapipe for DoMINO

physicsnemo.datapipes.cae.domino_datapipe.domino_collate_fn(batch)[source]#

This function is a custom collation function to move cupy data to torch tensors on the device.

For inputs that are not cupy arrays, it falls back to torch.utils.data.default_convert. The data here is a dictionary of numpy or cupy arrays.
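
As a hedged sketch, the cached dataset can be combined with the custom collate function in a standard PyTorch DataLoader; the cache path is a placeholder:

from torch.utils.data import DataLoader
from physicsnemo.datapipes.cae.domino_datapipe import (
    CachedDoMINODataset,
    domino_collate_fn,
)

dataset = CachedDoMINODataset(
    data_path="path/to/cached/domino/data",  # placeholder
    phase="train",
    sampling=True,
    volume_points_sample=8192,
    surface_points_sample=8192,
    geom_points_sample=300000,
    model_type="combined",
)

loader = DataLoader(dataset, batch_size=1, collate_fn=domino_collate_fn)

for batch in loader:
    # batch is a dictionary of tensors holding surface/volume quantities
    break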