Data Curation#
PhysicsNeMo Curator is a sub-module of the PhysicsNeMo framework: a Pythonic library designed to streamline the crucial process of curating engineering and scientific datasets at scale for training and inference. It accelerates data curation by leveraging GPUs and parallel processing.
It provides customizable interfaces and pipelines for extracting, transforming, and loading data in supported formats and schemas. This package is intended to be used as part of the PhysicsNeMo framework.
For more tutorials, refer to the PhysicsNeMo Curator Documentation.
Why PhysicsNeMo Curator?#
Scientific data curation is a critical step in the development of AI models: the data is often large and complex, requiring distributed processing, high numerical precision, and substantial storage.
PhysicsNeMo Curator is designed to handle these challenges, providing a flexible and scalable framework for data curation. It contains several key features:
Parallel Processing: Built-in support for multi-process and distributed workflows
Modular Architecture: Composable data sources, transformations, and sinks
Format Flexibility: Support for various scientific data formats (HDF5, VTK, STL, and Zarr)
Schema Validation: Built-in dataset validation capabilities
Extensible Framework: Easy to extend for custom data formats and transformations
Integration: Seamless integration with the broader PhysicsNeMo ecosystem
Core Concepts#
The PhysicsNeMo-Curator ETL pipeline consists of the following main components:
DataSource - Handles reading input data and writing output data
DataTransformation - Transforms data from one format to another
DatasetValidator - Validates input data structure and content (optional)
ParallelProcessor - Orchestrates processing files in parallel
The framework is designed for composability. You can mix and match different data sources, transformations, and sinks to create custom pipelines for your specific use case.
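To make the interfaces concrete, here is a minimal sketch of a custom transformation. It follows the constructor and transform() signature used by the transformations later in this tutorial; the class name and the Kelvin-to-Celsius conversion are illustrative only, not part of the library.

from typing import Any, Dict

import numpy as np

from physicsnemo_curator.etl.data_transformations import DataTransformation
from physicsnemo_curator.etl.processing_config import ProcessingConfig


class UnitConversionTransformation(DataTransformation):
    """Illustrative transformation: convert a temperature field from Kelvin to Celsius."""

    def __init__(self, cfg: ProcessingConfig):
        super().__init__(cfg)

    def transform(self, data: Dict[str, Any]) -> Dict[str, Any]:
        # Each transformation receives a dictionary from the previous stage
        # and returns a (possibly modified) dictionary for the next stage.
        if "temperature" in data:
            data["temperature"] = np.asarray(data["temperature"], dtype=np.float32) - 273.15
        return data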
Quick Start Tutorial#
This tutorial demonstrates a basic HDF5 to Zarr conversion pipeline. For the complete tutorial with dataset validation, see the PhysicsNeMo Curator Tutorial.
Installation#
The recommended way to use PhysicsNeMo-Curator is through the PhysicsNeMo Docker container.
To install PhysicsNeMo-Curator from source, run:
# Pull the PhysicsNeMo Docker container
docker pull nvcr.io/nvidia/physicsnemo/physicsnemo:25.08
# Install from source
git clone https://github.com/NVIDIA/physicsnemo-curator.git
cd physicsnemo-curator
pip install -e "."
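Optionally, you can verify the installation from Python. The module paths below match the imports used in the tutorial scripts that follow:

# Quick check that the editable install is importable.
from physicsnemo_curator.etl import data_sources, data_transformations, processing_config

print("PhysicsNeMo-Curator ETL modules imported successfully")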
Basic HDF5 to Zarr Pipeline#
Let’s create a basic pipeline that converts HDF5 files to Zarr format. This example demonstrates the core concepts without the complexity of full dataset validation.
Create a directory named tutorial and use it for all the test scripts.
Define the Data Schema
For this tutorial, we’ll work with a basic physics simulation dataset:
# Each HDF5 file contains:
#   /fields/temperature      # (N,) float32 - scalar temperature field
#   /fields/velocity         # (N, 3) float32 - 3D velocity vectors
#   /geometry/coordinates    # (N, 3) float32 - spatial coordinates
#   /metadata/               # Simulation parameters and units
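If you want to confirm this layout later, a quick inspection with h5py looks like the sketch below. It assumes the dummy file tutorial_data/run_001.h5 generated by the data source in the next step.

# Optional: inspect a generated HDF5 file to confirm the schema.
import h5py

with h5py.File("tutorial_data/run_001.h5", "r") as f:
    # Print every group/dataset path and, for datasets, the shape.
    f.visititems(lambda name, obj: print(name, getattr(obj, "shape", "")))
    # Simulation metadata is stored as attributes on the metadata group.
    print(dict(f["metadata"].attrs))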
Implement the Data Source
This is a basic data source that reads HDF5 files and extracts the data.
Place this in a file called h5_data_source.py in the tutorial directory.
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, List

import h5py
import numpy as np

from physicsnemo_curator.etl.data_sources import DataSource
from physicsnemo_curator.etl.processing_config import ProcessingConfig


class H5DataSource(DataSource):
    """DataSource for reading HDF5 physics simulation files."""

    def __init__(self, cfg: ProcessingConfig, input_dir: str = "tutorial_data"):
        super().__init__(cfg)
        self.input_dir = Path(input_dir)

        # Generate dummy data for tutorial purposes
        self._generate_dummy_data()

    def _generate_dummy_data(self):
        """Generate dummy HDF5 files for the tutorial."""
        # Create input directory if it doesn't exist
        self.input_dir.mkdir(parents=True, exist_ok=True)

        # Generate 3 simulation runs with different data
        for run_num in range(1, 4):
            filename = f"run_{run_num:03d}.h5"
            filepath = self.input_dir / filename

            # Skip if file already exists
            if filepath.exists():
                continue

            # Generate random data
            num_points = 100 + run_num * 50  # Varying number of points per run

            # Random 3D coordinates in a unit cube
            coordinates = np.random.uniform(-1.0, 1.0, size=(num_points, 3)).astype(np.float32)

            # Temperature field (scalar, range 250-350 K)
            temperature = np.random.uniform(250.0, 350.0, size=num_points).astype(np.float32)

            # Velocity field (3D vectors, range -5 to 5 m/s)
            velocity = np.random.uniform(-5.0, 5.0, size=(num_points, 3)).astype(np.float32)

            # Create HDF5 file with our schema
            with h5py.File(filepath, 'w') as f:
                # Create groups
                fields_group = f.create_group('fields')
                geometry_group = f.create_group('geometry')
                metadata_group = f.create_group('metadata')
                sim_params_group = metadata_group.create_group('simulation_params')

                # Store field data
                fields_group.create_dataset('temperature', data=temperature)
                fields_group.create_dataset('velocity', data=velocity)

                # Store geometry data
                geometry_group.create_dataset('coordinates', data=coordinates)

                # Store metadata attributes
                metadata_group.attrs['timestamp'] = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
                metadata_group.attrs['num_points'] = num_points
                metadata_group.attrs['temperature_units'] = 'Kelvin'
                metadata_group.attrs['velocity_units'] = 'm/s'

                # Store simulation parameters
                sim_params_group.attrs['total_time'] = np.random.uniform(1.0, 10.0)
                sim_params_group.attrs['run_number'] = run_num

        print(f"Generated dummy data in {self.input_dir}")

    def get_file_list(self) -> List[str]:
        """Get list of HDF5 files to process."""
        h5_files = list(self.input_dir.glob("*.h5"))
        filenames = [f.stem for f in h5_files]  # Remove .h5 extension
        print(f"Found {len(filenames)} HDF5 files: {filenames}")
        return sorted(filenames)

    def read_file(self, filename: str) -> Dict[str, Any]:
        """Read one HDF5 file and extract data."""
        filepath = self.input_dir / f"{filename}.h5"

        if not filepath.exists():
            raise FileNotFoundError(f"File not found: {filepath}")

        print(f"Reading {filepath}")

        with h5py.File(filepath, 'r') as f:
            data = {
                'temperature': np.array(f['fields/temperature']),
                'velocity': np.array(f['fields/velocity']),
                'coordinates': np.array(f['geometry/coordinates']),
                'metadata': dict(f['metadata'].attrs),
                'filename': filename,
            }

        print(f"Loaded data with {len(data['temperature'])} points")
        return data

    # The following methods aren't used in this tutorial, but are required by
    # the DataSource interface. They are implemented as no-ops.
    def write(self, data: Dict[str, Any], filename: str) -> None:
        """Write transformed data to storage."""
        pass

    def should_skip(self, filename: str) -> bool:
        """Checks whether the file should be skipped."""
        pass
Implement Transformations
We’ll create two transformations to demonstrate composability. The first transformation converts the data types and adds compression metadata. The second transformation computes derived fields. These two can be chained together in the pipeline.
Place this in a file called data_transformations.py in the tutorial directory.
from typing import Any, Dict

import numpy as np
from numcodecs import Blosc

from physicsnemo_curator.etl.data_transformations import DataTransformation
from physicsnemo_curator.etl.processing_config import ProcessingConfig


class DataTypeTransformation(DataTransformation):
    """First transformation: Convert data types and add compression."""

    def __init__(self, cfg: ProcessingConfig):
        super().__init__(cfg)
        self.compressor = Blosc(cname='zstd', clevel=3, shuffle=Blosc.SHUFFLE)

    def transform(self, data: Dict[str, Any]) -> Dict[str, Any]:
        """Convert data types and add compression metadata."""
        transformed = {}

        for key in ['temperature', 'velocity', 'coordinates']:
            if key in data:
                transformed[key] = {
                    'data': data[key].astype(np.float32),
                    'compressor': self.compressor,
                    'dtype': np.float32,
                }

        transformed['metadata'] = data['metadata']
        transformed['filename'] = data['filename']
        return transformed


class DerivedFieldsTransformation(DataTransformation):
    """Second transformation: Compute derived fields."""

    def __init__(self, cfg: ProcessingConfig):
        super().__init__(cfg)

    def transform(self, data: Dict[str, Any]) -> Dict[str, Any]:
        """Add velocity magnitude and temperature statistics."""
        if 'velocity' in data:
            velocity_data = data['velocity']['data']
            velocity_magnitude = np.linalg.norm(velocity_data, axis=1)
            data['velocity_magnitude'] = {
                'data': velocity_magnitude.astype(np.float32),
                'compressor': data['temperature']['compressor'],
                'dtype': np.float32,
            }

        # Add statistics to metadata
        if 'temperature' in data:
            temp_data = data['temperature']['data']
            data['metadata']['temp_min'] = float(np.min(temp_data))
            data['metadata']['temp_max'] = float(np.max(temp_data))
            data['metadata']['temp_mean'] = float(np.mean(temp_data))

        return data
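As an optional sanity check outside the pipeline, you can chain the two transformations on a single record read by the data source. This is only a sketch: the ProcessingConfig constructor arguments shown here are an assumption, so check the class in your installed version of PhysicsNeMo-Curator.

# Optional sanity check (not part of the pipeline run).
from physicsnemo_curator.etl.processing_config import ProcessingConfig

from h5_data_source import H5DataSource
from data_transformations import DataTypeTransformation, DerivedFieldsTransformation

cfg = ProcessingConfig(num_processes=1)  # assumed constructor signature
source = H5DataSource(cfg, input_dir="tutorial_data")

# Read one file, then apply the two transformations in sequence.
record = source.read_file(source.get_file_list()[0])
record = DataTypeTransformation(cfg).transform(record)
record = DerivedFieldsTransformation(cfg).transform(record)

print(record["metadata"]["temp_mean"], record["velocity_magnitude"]["data"].shape)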
Implement the Data Sink
This is a basic data sink that writes the transformed data to a Zarr store.
Place this in a file called zarr_data_source.py in the tutorial directory.
from pathlib import Path
from typing import Any, Dict

import zarr

from physicsnemo_curator.etl.data_sources import DataSource
from physicsnemo_curator.etl.processing_config import ProcessingConfig


class ZarrDataSource(DataSource):
    """DataSource for writing to Zarr stores."""

    def __init__(self, cfg: ProcessingConfig, output_dir: str):
        super().__init__(cfg)
        self.output_dir = Path(output_dir)
        self.output_dir.mkdir(parents=True, exist_ok=True)

    def write(self, data: Dict[str, Any], filename: str) -> None:
        """Write transformed data to a Zarr store."""
        store_path = self.output_dir / f"{filename}.zarr"

        # Create Zarr store
        store = zarr.DirectoryStore(store_path)
        root = zarr.group(store=store)

        # Store metadata as attributes
        if 'metadata' in data:
            for key, value in data['metadata'].items():
                root.attrs[key] = value
            data.pop('metadata')

        # Write all arrays
        for array_name, array_info in data.items():
            if isinstance(array_info, dict) and 'data' in array_info:
                root.create_dataset(
                    array_name,
                    data=array_info['data'],
                    compressor=array_info['compressor'],
                    dtype=array_info['dtype'],
                )

    # The following methods aren't used in this tutorial, but are required by
    # the DataSource interface. They are implemented as no-ops.
    def should_skip(self, filename: str) -> bool:
        """Checks whether the file should be skipped."""
        pass

    def get_file_list(self) -> list[str]:
        """Get list of files to process."""
        pass

    def read_file(self, filename: str) -> dict[str, Any]:
        """Read a single file and return its data."""
        pass
Run the Pipeline
Create a configuration file and run the pipeline:
Place this in a file called tutorial_config.yaml in the tutorial directory.
etl:
  processing:
    num_processes: 2  # Use 2 processes for this small tutorial dataset
    args: {}

  # If you've placed the test scripts in different files, please update the below lines accordingly.
  source:
    _target_: h5_data_source.H5DataSource

  transformations:
    data_type:
      _target_: data_transformations.DataTypeTransformation
    derived_fields:
      _target_: data_transformations.DerivedFieldsTransformation

  sink:
    _target_: zarr_data_source.ZarrDataSource
    output_dir: output_zarr
Run the pipeline:
Note: It is important to add the directory containing the test scripts to the PYTHONPATH. If you've placed the test scripts in a different directory, update the following command accordingly.
export PYTHONPATH=tutorial:$PYTHONPATH
physicsnemo-curator-etl --config-dir tutorial --config-name tutorial_config
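After the run completes, you can spot-check one of the converted stores from Python. The store names mirror the input filenames (for example, run_001.h5 becomes run_001.zarr) and the path assumes the output_dir set in the config above.

# Optional: verify one of the converted Zarr stores.
import zarr

root = zarr.open("output_zarr/run_001.zarr", mode="r")
print(dict(root.attrs))               # metadata plus the derived temperature statistics
for name, arr in root.arrays():       # temperature, velocity, coordinates, velocity_magnitude
    print(name, arr.shape, arr.dtype)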
Key Features Demonstrated#
Composable Transformations: Multiple transformations can be chained together
Parallel Processing: Built-in support for multi-process workflows
Extensible Architecture: Easy to add custom data sources, transformations, and sinks
Advanced Features#
PhysicsNeMo-Curator includes many advanced features not covered in this tutorial:
Dataset Validation: Comprehensive validation of data structure and content
Custom Data Schemas: Extensible framework for new schemas
Custom Filters: Extensible framework for custom filters
For more information on these advanced features, refer to the complete tutorial and the DoMINO data processing reference.