Weather and Climate Datapipes
The ERA5HDF5Datapipe handles ERA5 reanalysis data stored in HDF5 format, providing access to atmospheric variables like temperature, pressure, and wind fields at various pressure levels. The ERA5HDF5Datapipe is used in the DLWP Example.
```python
from physicsnemo.datapipes.climate.era5_hdf5 import ERA5HDF5Datapipe


def main():
    # Create a datapipe for ERA5 weather data in HDF5 format
    datapipe = ERA5HDF5Datapipe(
        data_dir="path/to/era5/data",
        stats_dir="path/to/era5/stats",
        channels=[0, 1],
        latlon_resolution=(721, 1440),
        shuffle=True,
    )

    # Iterate through the datapipe
    for batch in datapipe:
        invar = batch[0]["invar"]
        outvar = batch[0]["outvar"]
        # Use the data for weather prediction or analysis
        ...


if __name__ == "__main__":
    main()
```
- class physicsnemo.datapipes.climate.era5_hdf5.ERA5DaliExternalSource(
- data_paths: Iterable[str],
- num_samples: int,
- channels: Iterable[int],
- num_steps: int,
- num_history: int,
- stride: int,
- num_samples_per_year: int,
- use_cos_zenith: bool,
- cos_zenith_args: Dict,
- use_time_of_year_index: bool,
- batch_size: int = 1,
- shuffle: bool = True,
- process_rank: int = 0,
- world_size: int = 1,
Bases:
object
DALI Source for lazy-loading the HDF5 ERA5 files
- Parameters:
data_paths (Iterable[str]) – Directory where ERA5 data is stored
num_samples (int) – Total number of training samples
channels (Iterable[int]) – List representing which ERA5 variables to load
use_time_of_year_index (bool) – If True, also returns the index that can be used to determine the time of the year corresponding to each sample
stride (int) – Number of steps between input and output variables
num_steps (int) – Number of timesteps included in the output variables
num_history (int) – Number of previous timesteps included in the input variables
num_samples_per_year (int) – Number of samples randomly taken from each year
batch_size (int, optional) – Batch size, by default 1
use_cos_zenith (bool) – If True, the cosine zenith angles corresponding to the coordinates will be produced
cos_zenith_args (Dict) –
Dictionary containing the following:
- dt: float
Time in hours between each timestep in the dataset
- start_year: int
Start year of dataset
shuffle (bool, optional) – Shuffle dataset, by default True
process_rank (int, optional) – Rank ID of local process, by default 0
world_size (int, optional) – Number of training processes, by default 1
Note
For more information about DALI external source operator: https://docs.nvidia.com/deeplearning/dali/archives/dali_1_13_0/user-guide/docs/examples/general/data_loading/parallel_external_source.html
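The interaction between stride, num_history, and num_steps is easiest to see as index arithmetic. The following is a minimal sketch, assuming that input frames are spaced stride apart and end at the sampled index, and that output step k lies (k + 1) * stride steps after it. The helper name sample_indices is hypothetical and is not part of PhysicsNeMo.

```python
from typing import List, Tuple


def sample_indices(
    in_idx: int, num_history: int, num_steps: int, stride: int
) -> Tuple[List[int], List[int]]:
    """Return (input_indices, output_indices) for one sample.

    Hypothetical helper: assumes history frames are spaced `stride` apart and
    end at `in_idx`, and that output step k lies (k + 1) * stride after `in_idx`.
    """
    if in_idx - num_history * stride < 0:
        raise ValueError("not enough history before in_idx")
    # Oldest history frame first, current frame (in_idx) last.
    inputs = [in_idx - h * stride for h in range(num_history, -1, -1)]
    # Each output step advances by another `stride` timesteps.
    outputs = [in_idx + (k + 1) * stride for k in range(num_steps)]
    return inputs, outputs


# With 6-hourly data, stride=2 places each output step 12 hours further ahead.
print(sample_indices(in_idx=10, num_history=1, num_steps=2, stride=2))
# ([8, 10], [12, 14])
```

Under these assumptions, the last valid in_idx in a year of T timesteps is T - 1 - num_steps * stride, which is why larger strides and more output steps reduce the number of samples available per year.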
- class physicsnemo.datapipes.climate.era5_hdf5.ERA5HDF5Datapipe(
- data_dir: str,
- stats_dir: str | None = None,
- channels: List[int] | None = None,
- batch_size: int = 1,
- num_steps: int = 1,
- num_history: int = 0,
- stride: int = 1,
- latlon_resolution: Tuple[int, int] | None = None,
- interpolation_type: str | None = None,
- patch_size: Tuple[int, int] | int | None = None,
- num_samples_per_year: int | None = None,
- use_cos_zenith: bool = False,
- cos_zenith_args: Dict = {},
- use_time_of_year_index: bool = False,
- shuffle: bool = True,
- num_workers: int = 1,
- device: str | device = 'cuda',
- process_rank: int = 0,
- world_size: int = 1,
Bases:
Datapipe
ERA5 DALI data pipeline for HDF5 files
- Parameters:
data_dir (str) – Directory where ERA5 data is stored
stats_dir (Union[str, None], optional) – Directory containing the data statistics numpy files used for normalization. If None, no normalization is applied, by default None
channels (Union[List[int], None], optional) – Defines which ERA5 variables to load. If None, all variables in the HDF5 file are used, by default None
batch_size (int, optional) – Batch size, by default 1
stride (int, optional) – Number of steps between input and output variables. For example, if the dataset contains data every 6 hours, a stride of 1 corresponds to a 6-hour delta t and a stride of 2 to a 12-hour delta t, by default 1
num_steps (int, optional) – Number of timesteps included in the output variables, by default 1
num_history (int, optional) – Number of previous timesteps included in the input variables, by default 0
latlon_resolution (Tuple[int, int], optional) – The resolution for the latitude-longitude grid (H, W). Needs to be specified for cos zenith angle computation, or interpolation. By default None
interpolation_type (str, optional) – Interpolation type for resizing. Supports [“INTERP_NN”, “INTERP_LINEAR”, “INTERP_CUBIC”, “INTERP_LANCZOS3”, “INTERP_TRIANGULAR”, “INTERP_GAUSSIAN”]. By default None (no interpolation is done)
patch_size (Union[Tuple[int, int], int, None], optional) – If specified, crops input and output variables so image dimensions are divisible by patch_size, by default None
num_samples_per_year (int, optional) – Number of samples randomly taken from each year. If None, all will be used, by default None
use_cos_zenith (bool, optional) – If True, the cosine zenith angles corresponding to the coordinates will be produced, by default False
cos_zenith_args (Dict, optional) –
Dictionary containing the following:
- dt: float, optional
Time in hours between each timestep in the dataset, by default 6 hr
- start_year: int, optional
Start year of dataset, by default 1980
- latlon_bounds: Tuple[Tuple[float, float], Tuple[float, float]], optional
Bounds of latitude and longitude in the data, in the format ((lat_start, lat_end), (lon_start, lon_end)). By default ((90, -90), (0, 360)).
Defaults are only applicable if use_cos_zenith is True. Otherwise, defaults to {}.
use_time_of_year_index (bool) – If True, also returns the index that can be used to determine the time of the year corresponding to each sample. By default False.
shuffle (bool, optional) – Shuffle dataset, by default True
num_workers (int, optional) – Number of workers, by default 1
device (Union[str, torch.device], optional) – Device for DALI pipeline to run on, by default cuda
process_rank (int, optional) – Rank ID of local process, by default 0
world_size (int, optional) – Number of training processes, by default 1
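The cos_zenith_args defaults listed above only take effect when use_cos_zenith is True. A minimal sketch of how such defaults could be merged, assuming user-supplied keys override the documented defaults; resolve_cos_zenith_args and grid_spacing are hypothetical helpers, and treating longitude as periodic (end bound excluded) is an assumption.

```python
from typing import Any, Dict, Tuple

# Documented defaults for cos_zenith_args (only used when use_cos_zenith=True).
COS_ZENITH_DEFAULTS: Dict[str, Any] = {
    "dt": 6.0,
    "start_year": 1980,
    "latlon_bounds": ((90, -90), (0, 360)),
}


def resolve_cos_zenith_args(
    use_cos_zenith: bool, cos_zenith_args: Dict[str, Any]
) -> Dict[str, Any]:
    """Hypothetical helper: fill in documented defaults for missing keys."""
    if not use_cos_zenith:
        # Defaults do not apply; return a copy of whatever was passed.
        return dict(cos_zenith_args)
    # Keys given by the caller override the defaults.
    return {**COS_ZENITH_DEFAULTS, **cos_zenith_args}


def grid_spacing(latlon_bounds, latlon_resolution: Tuple[int, int]) -> Tuple[float, float]:
    """Degrees between grid points for an (H, W) latitude-longitude grid."""
    (lat0, lat1), (lon0, lon1) = latlon_bounds
    h, w = latlon_resolution
    # Latitude includes both poles (h points span h - 1 intervals);
    # longitude is assumed periodic, so w points span w intervals.
    return abs(lat1 - lat0) / (h - 1), abs(lon1 - lon0) / w


args = resolve_cos_zenith_args(True, {"dt": 1.0})
print(args["dt"], args["start_year"])  # 1.0 1980
print(grid_spacing(args["latlon_bounds"], (721, 1440)))  # (0.25, 0.25)
```

Returning a new dict rather than updating the argument in place also avoids mutating the shared {} default that appears in the signature.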
- load_statistics() → None
Loads ERA5 statistics from pre-computed numpy files
The statistics files should be named global_means.npy and global_std.npy, have a shape of [1, C, 1, 1], and be located in stats_dir.
- Raises:
IOError – If mean or std numpy files are not found
AssertionError – If loaded numpy arrays are not of correct size
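Because the statistics have shape [1, C, 1, 1], each channel gets a single mean and standard deviation that broadcast over the whole latitude-longitude field. The pure-Python sketch below mirrors that per-channel normalization with nested lists in place of NumPy arrays. Subsetting the statistics by the channels argument is an assumption about how the datapipe matches statistics to loaded variables, and select_channel_stats and normalize are hypothetical names.

```python
from typing import List

Grid = List[List[float]]


def select_channel_stats(stats, channels: List[int]) -> List[float]:
    """Flatten a [1, C, 1, 1] nested-list statistic and keep only `channels`."""
    per_channel = [c[0][0] for c in stats[0]]
    return [per_channel[i] for i in channels]


def normalize(fields: List[Grid], mean: List[float], std: List[float]) -> List[Grid]:
    """Apply (x - mean[c]) / std[c] to every grid point of channel c."""
    if not (len(fields) == len(mean) == len(std)):
        raise ValueError("channel count mismatch between fields and statistics")
    return [
        [[(x - m) / s for x in row] for row in grid]
        for grid, m, s in zip(fields, mean, std)
    ]


# Three channels in the statistics files; load channels 0 and 2.
means = [[[[280.0]], [[1000.0]], [[5.0]]]]
stds = [[[[10.0]], [[50.0]], [[2.0]]]]
mu = select_channel_stats(means, [0, 2])
sd = select_channel_stats(stds, [0, 2])
print(normalize([[[290.0, 270.0]], [[9.0, 1.0]]], mu, sd))
# [[[1.0, -1.0]], [[2.0, -2.0]]]
```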
- class physicsnemo.datapipes.climate.era5_hdf5.MetaData(
- name: str = 'ERA5HDF5',
- auto_device: bool = True,
- cuda_graphs: bool = True,
- ddp_sharding: bool = True,
Bases:
DatapipeMetaData
The ClimateDatapipe provides a general interface for climate data processing, reading one or more HDF5 or NetCDF4 data sources with standardized preprocessing and normalization.
The ClimateDatapipe is used in the Weather Diagnostic example and the Temporal Interpolation example.
- class physicsnemo.datapipes.climate.climate.ClimateDaliExternalSource(
- data_paths: Iterable[str],
- num_samples: int,
- channels: Iterable[int],
- num_steps: int,
- stride: int,
- dt: float,
- start_year: int,
- num_samples_per_year: int,
- latlon: ndarray,
- variables: List[str] | None = None,
- aux_variables: List[str | Callable] = (),
- batch_size: int = 1,
- shuffle: bool = True,
- process_rank: int = 0,
- world_size: int = 1,
- backend_kwargs: dict | None = None,
Bases:
ABC
DALI Source for lazy-loading the HDF5/NetCDF4 climate files
- Parameters:
data_paths (Iterable[str]) – Directory where climate data is stored
num_samples (int) – Total number of training samples
channels (Iterable[int]) – List representing which climate variables to load
num_steps (int) – Number of timesteps to load
stride (int) – Number of steps between input and output variables
dt (float, optional) – Time in hours between each timestep in the dataset, by default 6 hr
start_year (int, optional) – Start year of dataset, by default 1980
num_samples_per_year (int) – Number of samples randomly taken from each year
variables (Union[List[str], None], optional for HDF5 files, mandatory for NetCDF4 files) – List of named variables to load. Variables will be read in the order specified by this parameter.
aux_variables (Union[Mapping[str, Callable], None], optional) – A dictionary mapping strings to callables that accept arguments (timestamps: numpy.ndarray, latlon: numpy.ndarray). These define any auxiliary variables returned from this source.
batch_size (int, optional) – Batch size, by default 1
shuffle (bool, optional) – Shuffle dataset, by default True
process_rank (int, optional) – Rank ID of local process, by default 0
world_size (int, optional) – Number of training processes, by default 1
Note
For more information about DALI external source operator: https://docs.nvidia.com/deeplearning/dali/archives/dali_1_13_0/user-guide/docs/examples/general/data_loading/parallel_external_source.html
- class physicsnemo.datapipes.climate.climate.ClimateDataSourceSpec(
- data_dir: str,
- name: str | None = None,
- file_type: str = 'hdf5',
- stats_files: Mapping[str, str] | None = None,
- metadata_path: str | None = None,
- channels: List[int] | None = None,
- variables: List[str] | None = None,
- use_cos_zenith: bool = False,
- aux_variables: Mapping[str, Callable] | None = None,
- num_steps: int = 1,
- stride: int = 1,
- backend_kwargs: dict | None = None,
Bases:
object
A data source specification for ClimateDatapipe.
HDF5 files should contain a variable named fields: a tensor of shape (num_timesteps, num_channels, height, width) containing the climate data. The order of the channels should match the order of the channels in the statistics files. The statistics files should be .npy files with the shape (1, num_channels, 1, 1). The variable names are listed in the metadata file at metadata_path.
NetCDF4 files should contain a variable of shape (num_timesteps, height, width) for each variable they provide. Only the variables listed in variables will be loaded.
- Parameters:
data_dir (str) – Directory where climate data is stored
name (Union[str, None], optional) – The name that is used to label datapipe outputs from this source. If None, the datapipe uses the number of the source in sequential order.
file_type (str) – Type of files to read, supported values are “hdf5” (default) and “netcdf4”
stats_files (Union[Mapping[str, str], None], optional) – Numpy files containing data statistics for normalization. Supports either a channels format, in which case the dict should contain the keys “mean” and “std”, or a named-variable format, in which case the dict should contain the key “norm”. If None, no normalization will be used, by default None
metadata_path (Union[str, None], optional for NetCDF4, required for HDF5) – Path to the metadata JSON file for the dataset (usually called data.json).
channels (Union[List[int], None], optional) – Defines which climate variables to load. If None, all variables in the HDF5 file are used, by default None
variables (Union[List[str], None], optional for HDF5 files, mandatory for NetCDF4 files) – List of named variables to load. Variables will be read in the order specified by this parameter. Must be used for NetCDF4 files. Supported for HDF5 files in which case it will override channels.
use_cos_zenith (bool, optional) – If True, the cosine zenith angles corresponding to the coordinates of this data source will be produced, by default False
aux_variables (Union[Mapping[str, Callable], None], optional) – A dictionary mapping strings to callables that accept arguments (timestamps: numpy.ndarray, latlon: numpy.ndarray). These define any auxiliary variables returned from this source.
num_steps (int, optional) – Number of timesteps to return, by default 1
stride (int, optional) – Number of steps between input and output variables. For example, if the dataset contains data every 6 hours, a stride of 1 corresponds to a 6-hour delta t and a stride of 2 to a 12-hour delta t, by default 1
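The channels and variables parameters interact: for HDF5 files, variables overrides channels, and if neither is given, all channels are used. The sketch below encodes that precedence. It takes the ordered list of channel names as an input rather than assuming any particular layout of the metadata JSON file, and resolve_channels is a hypothetical helper, not a PhysicsNeMo function. For NetCDF4 files, variables is mandatory, so this fallback logic would not apply.

```python
from typing import List, Optional, Sequence


def resolve_channels(
    channel_names: Sequence[str],
    channels: Optional[List[int]] = None,
    variables: Optional[List[str]] = None,
) -> List[int]:
    """Hypothetical helper mirroring the documented precedence rules.

    `channel_names` is the ordered list of names for the channels of the HDF5
    "fields" variable (documented as coming from the metadata file).
    """
    if variables is not None:
        # `variables` overrides `channels`; output order follows `variables`.
        missing = [v for v in variables if v not in channel_names]
        if missing:
            raise ValueError(f"unknown variables: {missing}")
        return [list(channel_names).index(v) for v in variables]
    if channels is None:
        # No selection given: use every channel in the file.
        return list(range(len(channel_names)))
    return list(channels)


names = ["u10m", "v10m", "t2m", "msl"]  # illustrative channel names
print(resolve_channels(names, variables=["t2m", "u10m"]))  # [2, 0]
print(resolve_channels(names, channels=[1, 3], variables=["msl"]))  # [3]
```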
- dimensions_compatible(other) → bool
Basic sanity check to test if two ClimateDataSourceSpec are compatible.
- parse_dataset_files(
- num_samples_per_year: int | None = None,
- patch_size: int | None = None,
Parses the data directory for valid files and determines training samples
- Parameters:
num_samples_per_year (int, optional) – Number of samples taken from each year. If None, all will be used, by default None
patch_size (Union[Tuple[int, int], int, None], optional) – If specified, crops input and output variables so image dimensions are divisible by patch_size, by default None
- Raises:
ValueError – If the specified channels or the number of samples per year are not valid
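Two checks described for parse_dataset_files can be sketched independently of the file parsing. The first crops image dimensions to the largest size divisible by patch_size. For example, a 721 x 1440 ERA5 grid with patch_size=8 becomes 720 x 1440. The second rejects an invalid num_samples_per_year. Cropping at the trailing edge and the exact validity range are assumptions, and the helper names are hypothetical.

```python
from typing import Optional, Tuple, Union


def cropped_shape(
    shape: Tuple[int, int], patch_size: Union[int, Tuple[int, int], None]
) -> Tuple[int, int]:
    """Largest (H, W) no bigger than `shape` that is divisible by patch_size."""
    if patch_size is None:
        return shape
    if isinstance(patch_size, int):
        patch_size = (patch_size, patch_size)
    # Drop the remainder rows/columns in each dimension.
    return tuple(n - n % p for n, p in zip(shape, patch_size))


def check_samples_per_year(available: int, requested: Optional[int]) -> int:
    """Return how many samples to draw per year, rejecting invalid requests."""
    if requested is None:
        # None means "use all available samples".
        return available
    if not 0 < requested <= available:
        raise ValueError(f"num_samples_per_year={requested} must be in [1, {available}]")
    return requested


print(cropped_shape((721, 1440), 8))  # (720, 1440)
```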
- class physicsnemo.datapipes.climate.climate.ClimateDatapipe(
- sources: Iterable[ClimateDataSourceSpec],
- batch_size: int = 1,
- dt: float = 6.0,
- start_year: int = 1980,
- latlon_bounds: Tuple[Tuple[float, float], Tuple[float, float]] = ((90, -90), (0, 360)),
- crop_window: Tuple[Tuple[float, float], Tuple[float, float]] | None = None,
- invariants: Mapping[str, Callable] | None = None,
- num_samples_per_year: int | None = None,
- shuffle: bool = True,
- num_workers: int = 1,
- device: str | device = 'cuda',
- process_rank: int = 0,
- world_size: int = 1,
Bases:
Datapipe
A climate DALI data pipeline. This pipeline loads data from HDF5/NetCDF4 files. It can also return additional data such as the solar zenith angle for each time step. Additionally, it normalizes the data if a statistics file is provided. The pipeline returns a dictionary with the following structure, where {name} indicates the name of the data source provided:
- state_seq-{name}: Tensors of shape (batch_size, num_steps, num_channels, height, width). This sequence is drawn from the data file and normalized if a statistics file is provided.
- timestamps-{name}: Tensors of shape (batch_size, num_steps), containing timestamps for each timestep in the sequence.
- {aux_variable}-{name}: Tensors of shape (batch_size, num_steps, aux_channels, height, width), containing the auxiliary variables returned by each data source.
- cos_zenith-{name}: Tensors of shape (batch_size, num_steps, 1, height, width), containing the cosine of the solar zenith angle if specified.
- {invariant_name}: Tensors of shape (batch_size, invariant_channels, height, width), containing the time-invariant data (depending only on spatial coordinates) returned by the datapipe. These can include, e.g., land-sea mask and geopotential/surface elevation.
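The output dictionary keys follow directly from the source names, auxiliary variables, and invariants. A small sketch that lists the keys to expect, assuming an unnamed source is labelled by its 0-based position in sources (the description of name says only "the number of the source in sequential order"). expected_output_keys is a hypothetical helper, and the names "era5", "sst", and "trig_latlon" are illustrative.

```python
from typing import List, Optional, Sequence


def expected_output_keys(
    source_names: Sequence[Optional[str]],
    aux_variables: Sequence[Sequence[str]],
    use_cos_zenith: Sequence[bool],
    invariant_names: Sequence[str] = (),
) -> List[str]:
    """Hypothetical helper: keys a ClimateDatapipe batch would contain."""
    keys: List[str] = []
    for i, (name, aux, cos_zenith) in enumerate(
        zip(source_names, aux_variables, use_cos_zenith)
    ):
        # Assumption: unnamed sources are labelled by their 0-based position.
        label = str(i) if name is None else name
        keys += [f"state_seq-{label}", f"timestamps-{label}"]
        keys += [f"{a}-{label}" for a in aux]
        if cos_zenith:
            keys.append(f"cos_zenith-{label}")
    # Invariants are shared across sources, so they carry no source suffix.
    keys += list(invariant_names)
    return keys


print(expected_output_keys(["era5", None], [[], ["sst"]], [True, False], ["trig_latlon"]))
```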
To use this data pipeline, your data directory must be structured as follows:
    data_dir
    ├── 1980.h5
    ├── 1981.h5
    ├── 1982.h5
    ├── ...
    └── 2020.h5
The files are assumed to have no metadata, such as timestamps. Because of this, it is important to specify the dt and start_year parameters so that the pipeline can compute the correct timestamps for each timestep. These timestamps are then used to compute the cosine of the solar zenith angle, if specified.
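Since the files carry no timestamps, each timestep's time has to be reconstructed from start_year, the file's position in the sorted yearly sequence, and dt. A minimal sketch, assuming every yearly file starts at 00:00 UTC on 1 January and that the n-th sorted file holds year start_year + n. timestamp_for is a hypothetical helper, and the pipeline's own timestamp representation may differ.

```python
from datetime import datetime, timedelta


def timestamp_for(start_year: int, year_idx: int, step_in_year: int, dt: float) -> datetime:
    """Hypothetical helper: time of a timestep within the yearly files.

    Assumes sorted yearly files starting at `start_year`, each beginning at
    00:00 (UTC) on 1 January, with `dt` hours between consecutive timesteps.
    """
    year = start_year + year_idx
    return datetime(year, 1, 1) + timedelta(hours=step_in_year * dt)


# 1981.h5 is the second file (year_idx=1); with dt=6, step 4 is 24 hours in.
print(timestamp_for(1980, 1, 4, 6.0))  # 1981-01-02 00:00:00
```

A wrong dt or start_year silently shifts every timestamp, and with it the cosine zenith angle, which is why both parameters must match the data.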
- Parameters:
sources (Iterable[ClimateDataSourceSpec]) – A list of data specifications defining the sources for the climate variables
batch_size (int, optional) – Batch size, by default 1
dt (float, optional) – Time in hours between each timestep in the dataset, by default 6 hr
start_year (int, optional) – Start year of dataset, by default 1980
latlon_bounds (Tuple[Tuple[float, float], Tuple[float, float]], optional) – Bounds of latitude and longitude in the data, in the format ((lat_start, lat_end), (lon_start, lon_end)). By default ((90, -90), (0, 360)).
crop_window (Union[Tuple[Tuple[float, float], Tuple[float, float]], None], optional) – The window to crop the data to, in the format ((i0,i1), (j0,j1)) where the first spatial dimension will be cropped to i0:i1 and the second to j0:j1. If not given, all data will be used.
invariants (Mapping[str, Callable], optional) – Specifies the time-invariant data (for example latitude and longitude) included in the data samples. Should be a dict where the keys are the names of the invariants and the values are the corresponding functions. The functions need to accept an argument of shape (2, data_shape[0], data_shape[1]), where the first dimension contains latitude and longitude in degrees and the other dimensions correspond to the shape of the data in the data files. For example, invariants={"trig_latlon": invariants.LatLon()} will include the sin/cos of lat/lon in the output.
num_samples_per_year (int, optional) – Number of samples taken from each year. If None, all will be used, by default None
shuffle (bool, optional) – Shuffle dataset, by default True
num_workers (int, optional) – Number of workers, by default 1
device (Union[str, torch.device], optional) – Device for DALI pipeline to run on, by default cuda
process_rank (int, optional) – Rank ID of local process, by default 0
world_size (int, optional) – Number of training processes, by default 1
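latlon_bounds and crop_window work in different units. latlon_bounds gives coordinates in degrees, while crop_window gives array indices ((i0, i1), (j0, j1)). The pure-Python sketch below builds 1D coordinates from the bounds and applies a crop window to a 2D field. It assumes latitudes include both bounds and longitudes are periodic (end bound excluded). It is an illustration, not the library code, and the helper names are hypothetical.

```python
from typing import List, Optional, Tuple

Window = Tuple[Tuple[int, int], Tuple[int, int]]


def latlon_grid(bounds, shape: Tuple[int, int]) -> Tuple[List[float], List[float]]:
    """1D latitude and longitude coordinates (degrees) for an (H, W) grid."""
    (lat0, lat1), (lon0, lon1) = bounds
    h, w = shape
    # Latitude: h points including both bounds.
    lats = [lat0 + (lat1 - lat0) * i / (h - 1) for i in range(h)]
    # Longitude: w points, end bound excluded (periodic assumption).
    lons = [lon0 + (lon1 - lon0) * j / w for j in range(w)]
    return lats, lons


def crop(grid: List[List[float]], window: Optional[Window]) -> List[List[float]]:
    """Keep rows i0:i1 and columns j0:j1 of a 2D field; None keeps everything."""
    if window is None:
        return grid
    (i0, i1), (j0, j1) = window
    return [row[j0:j1] for row in grid[i0:i1]]


lats, lons = latlon_grid(((90, -90), (0, 360)), (5, 4))
print(lats, lons)  # [90.0, 45.0, 0.0, -45.0, -90.0] [0.0, 90.0, 180.0, 270.0]
print(crop([[1, 2, 3], [4, 5, 6], [7, 8, 9]], ((1, 3), (0, 2))))  # [[4, 5], [7, 8]]
```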
- class physicsnemo.datapipes.climate.climate.ClimateHDF5DaliExternalSource(
- data_paths: Iterable[str],
- num_samples: int,
- channels: Iterable[int],
- num_steps: int,
- stride: int,
- dt: float,
- start_year: int,
- num_samples_per_year: int,
- latlon: ndarray,
- variables: List[str] | None = None,
- aux_variables: List[str | Callable] = (),
- batch_size: int = 1,
- shuffle: bool = True,
- process_rank: int = 0,
- world_size: int = 1,
- backend_kwargs: dict | None = None,
Bases:
ClimateDaliExternalSource
DALI source for reading HDF5-formatted climate data files.
- class physicsnemo.datapipes.climate.climate.ClimateNetCDF4DaliExternalSource(
- data_paths: Iterable[str],
- num_samples: int,
- channels: Iterable[int],
- num_steps: int,
- stride: int,
- dt: float,
- start_year: int,
- num_samples_per_year: int,
- latlon: ndarray,
- variables: List[str] | None = None,
- aux_variables: List[str | Callable] = (),
- batch_size: int = 1,
- shuffle: bool = True,
- process_rank: int = 0,
- world_size: int = 1,
- backend_kwargs: dict | None = None,
Bases:
ClimateDaliExternalSource
DALI source for reading NetCDF4-formatted climate data files.
- class physicsnemo.datapipes.climate.climate.MetaData(
- name: str = 'Climate',
- auto_device: bool = True,
- cuda_graphs: bool = True,
- ddp_sharding: bool = True,
Bases:
DatapipeMetaData
The SyntheticWeatherDataset generates synthetic temperature data (a sinusoidal variation around a base temperature plus random noise) for testing and development purposes.
- class physicsnemo.datapipes.climate.synthetic.SyntheticWeatherDataLoader(*args, **kwargs)
Bases:
DataLoader
This custom DataLoader initializes the SyntheticWeatherDataset with the given arguments.
- class physicsnemo.datapipes.climate.synthetic.SyntheticWeatherDataset(
- channels: List[int],
- num_samples_per_year: int,
- num_steps: int,
- device: str | device = 'cuda',
- grid_size: Tuple[int, int] = (721, 1440),
- base_temp: float = 15,
- amplitude: float = 10,
- noise_level: float = 2,
- **kwargs: Any,
Bases:
Dataset
A dataset for generating synthetic temperature data on a latitude-longitude grid for multiple atmospheric layers.
- Parameters:
channels (list) – List of channels representing different atmospheric layers.
num_samples_per_year (int) – Total number of days to simulate per year.
num_steps (int) – Number of consecutive days in each training sample.
grid_size (tuple) – Latitude by longitude dimensions of the temperature grid.
base_temp (float) – Base temperature around which variations are simulated.
amplitude (float) – Amplitude of the sinusoidal temperature variation.
noise_level (float) – Standard deviation of the noise added to temperature data.
**kwargs – Additional keyword arguments for advanced configurations.
- generate_data(
- num_days: int,
- num_channels: int,
- grid_size: Tuple[int, int],
- base_temp: float,
- amplitude: float,
- noise_level: float,
Generates synthetic temperature data over a specified number of days for multiple atmospheric layers.
- Parameters:
num_days (int) – Number of days to generate data for.
num_channels (int) – Number of channels representing different layers.
grid_size (tuple) – Grid size (latitude, longitude).
base_temp (float) – Base mean temperature for the data.
amplitude (float) – Amplitude of temperature variations.
noise_level (float) – Noise level to add stochasticity to the temperature.
- Returns:
A 4D array of temperature values across days, channels, latitudes, and longitudes.
- Return type:
numpy.ndarray
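The parameters describe a sinusoidal variation of amplitude amplitude around base_temp, plus noise with standard deviation noise_level. A pure-Python sketch of that idea returns a nested list indexed [day][channel][lat][lon], matching the documented 4D layout. The annual period of 365 days, the absence of any latitude dependence, and the seed argument are assumptions. This is not the formula used by SyntheticWeatherDataset.generate_data.

```python
import math
import random
from typing import List, Tuple


def generate_synthetic_temps(
    num_days: int,
    num_channels: int,
    grid_size: Tuple[int, int],
    base_temp: float = 15.0,
    amplitude: float = 10.0,
    noise_level: float = 2.0,
    seed: int = 0,
) -> List[List[List[List[float]]]]:
    """Nested list indexed [day][channel][lat][lon].

    Illustrative formula (not the library's): an annual sinusoid around
    base_temp plus Gaussian noise with standard deviation noise_level.
    """
    rng = random.Random(seed)  # seeded for reproducible test data
    n_lat, n_lon = grid_size
    data = []
    for day in range(num_days):
        seasonal = base_temp + amplitude * math.sin(2 * math.pi * day / 365)
        day_data = []
        for _ in range(num_channels):
            day_data.append(
                [[seasonal + rng.gauss(0.0, noise_level) for _ in range(n_lon)]
                 for _ in range(n_lat)]
            )
        data.append(day_data)
    return data


sample = generate_synthetic_temps(num_days=2, num_channels=3, grid_size=(4, 5))
print(len(sample), len(sample[0]), len(sample[0][0]), len(sample[0][0][0]))  # 2 3 4 5
```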
The TimeSeriesDataset handles global climate data on the HEALPix (Hierarchical Equal Area isoLatitude Pixelization) grid, supporting time series analysis of global climate variables.
- class physicsnemo.datapipes.healpix.timeseries_dataset.MetaData(
- name: str = 'TimeSeries',
- auto_device: bool = False,
- cuda_graphs: bool = False,
- ddp_sharding: bool = False,
Bases:
DatapipeMetaData
Metadata for this datapipe
- class physicsnemo.datapipes.healpix.timeseries_dataset.TimeSeriesDataset(
- dataset: xr.Dataset,
- scaling: DictConfig = None,
- input_time_dim: int = 1,
- output_time_dim: int = 1,
- data_time_step: int | str = '3h',
- time_step: int | str = '6h',
- gap: int | str | None = None,
- batch_size: int = 32,
- drop_last: bool = False,
- add_insolation: bool = False,
- forecast_init_times: Sequence | None = None,
- meta: DatapipeMetaData = MetaData(name='TimeSeries', auto_device=False, cuda_graphs=False, ddp_sharding=False),
Bases:
Dataset, Datapipe
Dataset for sampling from continuous time-series data, compatible with PyTorch data loading.
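With the defaults data_time_step='3h' and time_step='6h', consecutive model steps are two raw records apart. The sketch below converts the step parameters to hours and lists the raw-record indices of the input and output frames for one sample. Several details are assumptions, not documented behaviour: integer steps mean hours, gap defaults to time_step, and gap is measured from the last input frame. to_hours and window_indices are hypothetical helpers.

```python
import re
from typing import List, Optional, Tuple, Union

Step = Union[int, str]


def to_hours(step: Step) -> int:
    """Convert an int (assumed to be hours) or a string like '3h' or '1d' to hours."""
    if isinstance(step, int):
        return step
    match = re.fullmatch(r"\s*(\d+)\s*([hd])\s*", step, flags=re.IGNORECASE)
    if match is None:
        raise ValueError(f"unsupported time step: {step!r}")
    value, unit = int(match.group(1)), match.group(2).lower()
    return value * 24 if unit == "d" else value


def window_indices(
    input_time_dim: int,
    output_time_dim: int,
    data_time_step: Step = "3h",
    time_step: Step = "6h",
    gap: Optional[Step] = None,
    start: int = 0,
) -> Tuple[List[int], List[int]]:
    """Hypothetical helper: raw-record indices of the input and output frames."""
    dts = to_hours(data_time_step)
    step = to_hours(time_step)
    # Assumption: gap defaults to time_step when not given.
    gap_hours = to_hours(gap) if gap is not None else step
    if step % dts or gap_hours % dts:
        raise ValueError("time_step and gap must be multiples of data_time_step")
    interval = step // dts  # raw records between consecutive model steps
    inputs = [start + k * interval for k in range(input_time_dim)]
    # Assumption: gap runs from the last input frame to the first output frame.
    first_out = inputs[-1] + gap_hours // dts
    outputs = [first_out + k * interval for k in range(output_time_dim)]
    return inputs, outputs


print(window_indices(input_time_dim=2, output_time_dim=2))  # ([0, 2], [4, 6])
```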