Data Curation#

PhysicsNeMo Curator, a sub-module of the PhysicsNeMo framework, is a Pythonic library designed to streamline the crucial process of curating engineering and scientific datasets at scale for training and inference. It accelerates this work by leveraging GPUs and parallel processing.

It provides customizable interfaces and pipelines for extracting, transforming, and loading data in supported formats and schemas. The package is intended to be used as part of the PhysicsNeMo framework.

For more tutorials, refer to the PhysicsNeMo Curator Documentation.

Why PhysicsNeMo Curator?#

Scientific data curation is a critical step in developing AI models: the data is often large and complex, requiring distributed processing, high precision, and substantial storage.

PhysicsNeMo Curator is designed to handle these challenges, providing a flexible and scalable framework for data curation. Key features include:

  • Parallel Processing: Built-in support for multi-process and distributed workflows

  • Modular Architecture: Composable data sources, transformations, and sinks

  • Format Flexibility: Support for various scientific data formats (HDF5, VTK, STL, and Zarr)

  • Schema Validation: Built-in dataset validation capabilities

  • Extensible Framework: Easy to extend for custom data formats and transformations

  • Integration: Seamless integration with the broader PhysicsNeMo ecosystem

Core Concepts#

The PhysicsNeMo-Curator ETL pipeline consists of the following main components:

  • DataSource - Handles reading input data and writing output data

  • DataTransformation - Transforms data from one format to another

  • DatasetValidator - Validates input data structure and content (optional)

  • ParallelProcessor - Orchestrates processing files in parallel

The framework is designed for composability. You can mix and match different data sources, transformations, and sinks to create custom pipelines for your specific use case.
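
For example, a curation pipeline conceptually wires these pieces together as sketched below. This is a simplified illustration using the method names introduced in the tutorial that follows, not the framework's actual orchestration code:

# Simplified sketch of how the ETL components compose.
# In practice, the ParallelProcessor distributes this loop across processes.
for filename in source.get_file_list():        # DataSource (input side)
    if source.should_skip(filename):
        continue
    data = source.read_file(filename)
    for transformation in transformations:     # DataTransformation chain
        data = transformation.transform(data)
    sink.write(data, filename)                 # DataSource (output side)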

Quick Start Tutorial#

This tutorial demonstrates a basic HDF5 to Zarr conversion pipeline. For the complete tutorial with dataset validation, see the PhysicsNeMo Curator Tutorial.

Installation#

The recommended way to use PhysicsNeMo-Curator is through the PhysicsNeMo Docker container.

Pull the PhysicsNeMo Docker container, then install PhysicsNeMo-Curator from source:

# Pull the PhysicsNeMo Docker container
docker pull nvcr.io/nvidia/physicsnemo/physicsnemo:25.08

# Install from source
git clone https://github.com/NVIDIA/physicsnemo-curator.git
cd physicsnemo-curator
pip install -e "."
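
To verify that the package is importable:

python -c "import physicsnemo_curator; print(physicsnemo_curator.__file__)"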

Basic HDF5 to Zarr Pipeline#

Let’s create a basic pipeline that converts HDF5 files to Zarr format. This example demonstrates the core concepts without the complexity of full dataset validation.

  1. Create a directory named tutorial and use it for all the test scripts.

  2. Define the Data Schema

    For this tutorial, we’ll work with a basic physics simulation dataset:

    # Each HDF5 file contains:
    # /fields/temperature     # (N,) float32 - scalar temperature field
    # /fields/velocity        # (N, 3) float32 - 3D velocity vectors
    # /geometry/coordinates   # (N, 3) float32 - spatial coordinates
    # /metadata/              # Simulation parameters and units
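
    Once the dummy data exists (the data source in the next step creates it under tutorial_data/), you can check a file against this layout with a quick, optional h5py walk:

    import h5py

    # Print every group/dataset path (with shape, where applicable) plus the metadata attributes.
    # tutorial_data/run_001.h5 is generated by H5DataSource in the next step.
    with h5py.File("tutorial_data/run_001.h5", "r") as f:
        f.visititems(lambda name, obj: print(name, getattr(obj, "shape", "")))
        print(dict(f["metadata"].attrs))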
    
  3. Implement the Data Source

    This is a basic data source that reads HDF5 files and extracts the data.

    Place this in a file called h5_data_source.py in the tutorial directory.

    from datetime import datetime
    from pathlib import Path
    from typing import Any, Dict, List
    
    import h5py
    import numpy as np
    
    from physicsnemo_curator.etl.data_sources import DataSource
    from physicsnemo_curator.etl.processing_config import ProcessingConfig
    
    class H5DataSource(DataSource):
        """DataSource for reading HDF5 physics simulation files."""
        
        def __init__(self, cfg: ProcessingConfig, input_dir: str = "tutorial_data"):
            super().__init__(cfg)
            self.input_dir = Path(input_dir)
            
            # Generate dummy data for tutorial purposes
            self._generate_dummy_data()
            
        def _generate_dummy_data(self):
            """Generate dummy HDF5 files for the tutorial."""
            
            # Create input directory if it doesn't exist
            self.input_dir.mkdir(parents=True, exist_ok=True)
            
            # Generate 3 simulation runs with different data
            for run_num in range(1, 4):
                filename = f"run_{run_num:03d}.h5"
                filepath = self.input_dir / filename
                
                # Skip if file already exists
                if filepath.exists():
                    continue
                    
                # Generate random data
                num_points = 100 + run_num * 50  # Varying number of points per run
                
                # Random 3D coordinates in the cube [-1, 1]^3
                coordinates = np.random.uniform(-1.0, 1.0, size=(num_points, 3)).astype(np.float32)
                
                # Temperature field (scalar, range 250-350 K)
                temperature = np.random.uniform(250.0, 350.0, size=num_points).astype(np.float32)
                
                # Velocity field (3D vectors, range -5 to 5 m/s)
                velocity = np.random.uniform(-5.0, 5.0, size=(num_points, 3)).astype(np.float32)
                
                # Create HDF5 file with our schema
                with h5py.File(filepath, 'w') as f:
                    # Create groups
                    fields_group = f.create_group('fields')
                    geometry_group = f.create_group('geometry')
                    metadata_group = f.create_group('metadata')
                    sim_params_group = metadata_group.create_group('simulation_params')
                    
                    # Store field data
                    fields_group.create_dataset('temperature', data=temperature)
                    fields_group.create_dataset('velocity', data=velocity)
                    
                    # Store geometry data
                    geometry_group.create_dataset('coordinates', data=coordinates)
                    
                    # Store metadata attributes
                    metadata_group.attrs['timestamp'] = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
                    metadata_group.attrs['num_points'] = num_points
                    metadata_group.attrs['temperature_units'] = 'Kelvin'
                    metadata_group.attrs['velocity_units'] = 'm/s'
                    
                    # Store simulation parameters
                    sim_params_group.attrs['total_time'] = np.random.uniform(1.0, 10.0)
                    sim_params_group.attrs['run_number'] = run_num
            
            print(f"Generated dummy data in {self.input_dir}")
            
        def get_file_list(self) -> List[str]:
            """Get list of HDF5 files to process."""
            h5_files = list(self.input_dir.glob("*.h5"))
            filenames = [f.stem for f in h5_files]  # Remove .h5 extension
            print(f"Found {len(filenames)} HDF5 files: {filenames}")
            return sorted(filenames)
            
        def read_file(self, filename: str) -> Dict[str, Any]:
            """Read one HDF5 file and extract data."""
            filepath = self.input_dir / f"{filename}.h5"
            
            if not filepath.exists():
                raise FileNotFoundError(f"File not found: {filepath}")
            
            print(f"Reading {filepath}")
            
            with h5py.File(filepath, 'r') as f:
                data = {
                    'temperature': np.array(f['fields/temperature']),
                    'velocity': np.array(f['fields/velocity']),
                    'coordinates': np.array(f['geometry/coordinates']),
                    'metadata': dict(f['metadata'].attrs),
                    'filename': filename
                }
            
            print(f"Loaded data with {len(data['temperature'])} points")
            return data
    
        # The following methods aren't used in this tutorial, but are required by the DataSource interface.
        # They are implemented as no-ops.
        def write(self, data: dict[str, Any], filename: str) -> None:
            """Write transformed data to storage."""
            pass
    
        def should_skip(self, filename: str) -> bool:
            """Checks whether the file should be skipped."""
            return False
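
    Before wiring up the full pipeline, you can smoke-test the data source on its own. The snippet below assumes that ProcessingConfig accepts the number of worker processes, mirroring the processing block of the tutorial configuration in step 6; adjust the construction if your version differs:

    from h5_data_source import H5DataSource
    from physicsnemo_curator.etl.processing_config import ProcessingConfig

    # Assumption: ProcessingConfig takes the number of worker processes,
    # mirroring the `processing` section of tutorial_config.yaml.
    cfg = ProcessingConfig(num_processes=1)

    source = H5DataSource(cfg, input_dir="tutorial_data")
    files = source.get_file_list()       # ["run_001", "run_002", "run_003"]
    sample = source.read_file(files[0])
    print(sample["temperature"].shape, sample["velocity"].shape)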
    
  4. Implement Transformations

    We’ll create two transformations to demonstrate composability. The first transformation converts the data types and adds compression metadata. The second transformation computes derived fields. These two can be chained together in the pipeline.

    Place this in a file called data_transformations.py in the tutorial directory.

    from numcodecs import Blosc
    from typing import Any, Dict
    
    import numpy as np
    
    from physicsnemo_curator.etl.data_transformations import DataTransformation
    from physicsnemo_curator.etl.processing_config import ProcessingConfig
    
    class DataTypeTransformation(DataTransformation):
        """First transformation: Convert data types and add compression."""
        
        def __init__(self, cfg: ProcessingConfig):
            super().__init__(cfg)
            self.compressor = Blosc(cname='zstd', clevel=3, shuffle=Blosc.SHUFFLE)
            
        def transform(self, data: Dict[str, Any]) -> Dict[str, Any]:
            """Convert data types and add compression metadata."""
            transformed = {}
            
            for key in ['temperature', 'velocity', 'coordinates']:
                if key in data:
                    transformed[key] = {
                        'data': data[key].astype(np.float32),
                        'compressor': self.compressor,
                        'dtype': np.float32
                    }
            
            transformed['metadata'] = data['metadata']
            transformed['filename'] = data['filename']
            return transformed
    
    class DerivedFieldsTransformation(DataTransformation):
        """Second transformation: Compute derived fields."""
        
        def __init__(self, cfg: ProcessingConfig):
            super().__init__(cfg)
    
        def transform(self, data: Dict[str, Any]) -> Dict[str, Any]:
            """Add velocity magnitude and temperature statistics."""
            if 'velocity' in data:
                velocity_data = data['velocity']['data']
                velocity_magnitude = np.linalg.norm(velocity_data, axis=1)
                
                data['velocity_magnitude'] = {
                    'data': velocity_magnitude.astype(np.float32),
                    'compressor': data['temperature']['compressor'],
                    'dtype': np.float32
                }
            
            # Add statistics to metadata
            if 'temperature' in data:
                temp_data = data['temperature']['data']
                data['metadata']['temp_min'] = float(np.min(temp_data))
                data['metadata']['temp_max'] = float(np.max(temp_data))
                data['metadata']['temp_mean'] = float(np.mean(temp_data))
    
            return data
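
    Continuing the smoke test from the previous step, the two transformations are chained by passing the output of one transform() call into the next (reusing the cfg and sample objects from that snippet):

    from data_transformations import DataTypeTransformation, DerivedFieldsTransformation

    # Chain the transformations: the output dict of one is the input of the next.
    for transformation in [DataTypeTransformation(cfg), DerivedFieldsTransformation(cfg)]:
        sample = transformation.transform(sample)

    print(sorted(sample.keys()))             # now includes 'velocity_magnitude'
    print(sample["metadata"]["temp_mean"])   # statistic added by DerivedFieldsTransformation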
    
  5. Implement the Data Sink

    This is a basic data sink that writes the transformed data to a Zarr store.

    Place this in a file called zarr_data_source.py in the tutorial directory.

    from pathlib import Path
    from typing import Any, Dict
    
    import zarr
    
    from physicsnemo_curator.etl.data_sources import DataSource
    from physicsnemo_curator.etl.processing_config import ProcessingConfig
    
    class ZarrDataSource(DataSource):
        """DataSource for writing to Zarr stores."""
        
        def __init__(self, cfg: ProcessingConfig, output_dir: str):
            super().__init__(cfg)
            self.output_dir = Path(output_dir)
            self.output_dir.mkdir(parents=True, exist_ok=True)
            
        def write(self, data: Dict[str, Any], filename: str) -> None:
            """Write transformed data to a Zarr store."""
            store_path = self.output_dir / f"{filename}.zarr"
            
            # Create Zarr store
            store = zarr.DirectoryStore(store_path)
            root = zarr.group(store=store)
            
            # Store metadata as attributes
            if 'metadata' in data:
                for key, value in data['metadata'].items():
                    root.attrs[key] = value
                data.pop('metadata')
            
            # Write all arrays
            for array_name, array_info in data.items():
                if isinstance(array_info, dict) and 'data' in array_info:
                    root.create_dataset(
                        array_name,
                        data=array_info['data'],
                        compressor=array_info['compressor'],
                        dtype=array_info['dtype']
                    )
    
        # The following methods aren't used in this tutorial, but are required by the DataSource interface.
        # They are implemented as no-ops.
        def should_skip(self, filename: str) -> bool:
            """Checks whether the file should be skipped."""
            return False
    
        def get_file_list(self) -> list[str]:
            """Get list of files to process."""
            pass
    
        def read_file(self, filename: str) -> dict[str, Any]:
            """Read a single file and return its data."""
            pass
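
    To finish the manual walk-through, write the transformed sample with the sink and re-open the resulting store to check its contents (continuing from the previous snippets):

    import zarr
    from zarr_data_source import ZarrDataSource

    # Write one transformed sample, then re-open the store for inspection.
    sink = ZarrDataSource(cfg, output_dir="output_zarr")
    sink.write(sample, files[0])

    root = zarr.open(f"output_zarr/{files[0]}.zarr", mode="r")
    print(list(root.array_keys()))   # temperature, velocity, coordinates, velocity_magnitude
    print(dict(root.attrs))          # metadata, including temp_min/temp_max/temp_mean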
    
  6. Run the Pipeline

    Create a configuration file and run the pipeline:

    Place this in a file called tutorial_config.yaml in the tutorial directory.

    etl:
        processing:
            num_processes: 2  # Use 2 processes for this small tutorial dataset
            args: {}
        
        # If you've placed the test scripts in different files, update the _target_ paths below accordingly.
        source:
            _target_: h5_data_source.H5DataSource
        
        transformations:
            data_type:
                _target_: data_transformations.DataTypeTransformation
            derived_fields:
                _target_: data_transformations.DerivedFieldsTransformation
            
        sink:
            _target_: zarr_data_source.ZarrDataSource
            output_dir: output_zarr
    

    Run the pipeline:

    You must add the directory containing the test scripts to PYTHONPATH. If you've placed them in a different directory, update the following commands accordingly.

    export PYTHONPATH=tutorial:$PYTHONPATH
    physicsnemo-curator-etl --config-dir tutorial --config-name tutorial_config
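
    After the run completes, there should be one Zarr store per input file in the output directory. A quick check from a Python shell in the same working directory:

    from pathlib import Path

    # Each input run_XXX.h5 should now have a matching run_XXX.zarr store.
    print(sorted(p.name for p in Path("output_zarr").glob("*.zarr")))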
    

Key Features Demonstrated#

  • Composable Transformations: Multiple transformations can be chained together

  • Parallel Processing: Built-in support for multi-process workflows

  • Extensible Architecture: Easy to add custom data sources, transformations, and sinks

Advanced Features#

PhysicsNeMo-Curator includes many advanced features not covered in this tutorial:

  • Dataset Validation: Comprehensive validation of data structure and content

  • Custom Data Schemas: Extensible framework for new schemas

  • Custom Filters: Extensible framework for custom filters

For more information on these advanced features, refer to the complete tutorial and the DoMINO data processing reference.