Source code for physicsnemo.datapipes.cae.domino_datapipe

# SPDX-FileCopyrightText: Copyright (c) 2023 - 2024 NVIDIA CORPORATION & AFFILIATES.
# SPDX-FileCopyrightText: All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
This code provides the datapipe for reading the processed npy files,
generating multi-res grids, calculating signed distance fields,
positional encodings, sampling random points in the volume and on surface,
normalizing fields and returning the output tensors as a dictionary.

This datapipe also non-dimensionalizes the fields, so the order in which the variables should
be fixed: velocity, pressure, turbulent viscosity for volume variables and
pressure, wall-shear-stress for surface variables. The different parameters such as
variable names, domain resolution, sampling size etc. are configurable in config.yaml.
"""

import os
import time
from concurrent.futures import ThreadPoolExecutor
from contextlib import nullcontext
from dataclasses import dataclass
from pathlib import Path
from typing import Literal, Optional, Protocol, Sequence, Union

import cuml
import cupy as cp
import numpy as np
import torch
import torch.cuda.nvtx as nvtx
import zarr
from omegaconf import DictConfig
from scipy.spatial import KDTree
from torch import Tensor
from torch.utils.data import Dataset, default_collate

from physicsnemo.distributed import DistributedManager
from physicsnemo.utils.domino.utils import (
    ArrayType,
    area_weighted_shuffle_array,
    calculate_center_of_mass,
    calculate_normal_positional_encoding,
    create_grid,
    get_filenames,
    mean_std_sampling,
    normalize,
    pad,
    # sample_array,
    shuffle_array,
    solution_weighted_shuffle_array,
    standardize,
)
from physicsnemo.utils.profiling import profile
from physicsnemo.utils.sdf import signed_distance_field


[docs] def domino_collate_fn(batch): """ This function is a custom collation function to move cupy data to torch tensors on the device. For things that aren't cupy arrays, fall back to torch.data.default_convert. Data, here, is a dictionary of numpy arrays or cupy arrays. """ def convert(obj): if isinstance(obj, cp.ndarray): return torch.utils.dlpack.from_dlpack(obj.toDlpack()) elif isinstance(obj, list): return [convert(x) for x in obj] elif isinstance(obj, tuple): return tuple(convert(x) for x in obj) elif isinstance(obj, dict): return {k: convert(v) for k, v in obj.items()} else: return obj batch = [convert(sample) for sample in batch] return default_collate(batch)
[docs] class BoundingBox(Protocol): """ Type definition for the required format of bounding box dimensions. """ min: ArrayType max: ArrayType
[docs] @dataclass class DoMINODataConfig: """Configuration for DoMINO dataset processing pipeline. Attributes: data_path: Path to the dataset to load. phase: Which phase of data to load ("train", "val", or "test"). surface_variables: (Surface specific) Names of surface variables. surface_points_sample: (Surface specific) Number of surface points to sample per batch. num_surface_neighbors: (Surface specific) Number of surface neighbors to consider for nearest neighbors approach. resample_surfaces: (Surface specific) Whether to resample the surface before kdtree/knn. Not available if caching. resampling_points: (Surface specific) Number of points to resample the surface to. surface_sampling_algorithm: (Surface specific) Algorithm to use for surface sampling ("area_weighted" or "random"). surface_factors: (Surface specific) Non-dimensionalization factors for surface variables. If set, and scaling_type is: - min_max_scaling -> rescale surface_fields to the min/max set here - mean_std_scaling -> rescale surface_fields to the mean and std set here. bounding_box_dims_surf: (Surface specific) Dimensions of bounding box. Must be an object with min/max attributes that are arraylike. volume_variables: (Volume specific) Names of volume variables. volume_points_sample: (Volume specific) Number of volume points to sample per batch. volume_factors: (Volume specific) Non-dimensionalization factors for volume variables scaling. If set, and scaling_type is: - min_max_scaling -> rescale volume_fields to the min/max set here - mean_std_scaling -> rescale volume_fields to the mean and std set here. bounding_box_dims: (Volume specific) Dimensions of bounding box. Must be an object with min/max attributes that are arraylike. grid_resolution: Resolution of the latent grid. normalize_coordinates: Whether to normalize coordinates based on min/max values. For surfaces: uses s_min/s_max, defined from: - Surface bounding box, if defined. - Min/max of the stl_vertices For volumes: uses c_min/c_max, defined from: - Volume bounding_box if defined, - 1.5x s_min/max otherwise, except c_min[2] = s_min[2] in this case sample_in_bbox: Whether to sample points in a specified bounding box. Uses the same min/max points as coordinate normalization. Only performed if compute_scaling_factors is false. sampling: Whether to downsample the full resolution mesh to fit in GPU memory. Surface and volume sampling points are configured separately as: - surface.points_sample - volume.points_sample geom_points_sample: Number of STL points sampled per batch. Independent of volume.points_sample and surface.points_sample. positional_encoding: Whether to use positional encoding. Affects the calculation of: - pos_volume_closest - pos_volume_center_of_mass - pos_surface_centter_of_mass scaling_type: Scaling type for volume variables. If used, will rescale the volume_fields and surface fields outputs. Requires volume.factor and surface.factor to be set. compute_scaling_factors: Whether to compute scaling factors. Not available if caching. Many preprocessing pieces are disabled if computing scaling factors. caching: Whether this is for caching or serving. deterministic: Whether to use a deterministic seed for sampling and random numbers. gpu_preprocessing: Whether to do preprocessing on the GPU (False for CPU). gpu_output: Whether to return output on the GPU as cupy arrays. If False, returns numpy arrays. You might choose gpu_preprocessing=True and gpu_output=False if caching. """ data_path: Path phase: Literal["train", "val", "test"] # Surface-specific variables: surface_variables: Optional[Sequence] = ("pMean", "wallShearStress") surface_points_sample: int = 1024 num_surface_neighbors: int = 11 resample_surfaces: bool = False resampling_points: int = 1_000_000 surface_sampling_algorithm: str = Literal["area_weighted", "random"] surface_factors: Optional[Sequence] = None bounding_box_dims_surf: Optional[Union[BoundingBox, Sequence]] = None # Volume specific variables: volume_variables: Optional[Sequence] = ("UMean", "pMean") volume_points_sample: int = 1024 volume_factors: Optional[Sequence] = None bounding_box_dims: Optional[Union[BoundingBox, Sequence]] = None grid_resolution: Union[Sequence, ArrayType] = (256, 96, 64) normalize_coordinates: bool = False sample_in_bbox: bool = False sampling: bool = False geom_points_sample: int = 300000 positional_encoding: bool = False scaling_type: Optional[Literal["min_max_scaling", "mean_std_scaling"]] = None compute_scaling_factors: bool = False caching: bool = False deterministic: bool = False gpu_preprocessing: bool = True gpu_output: bool = True def __post_init__(self): # Ensure data_path is a Path object: if isinstance(self.data_path, str): self.data_path = Path(self.data_path) self.data_path = self.data_path.expanduser() if not self.data_path.exists(): raise ValueError(f"Path {self.data_path} does not exist") if not self.data_path.is_dir(): raise ValueError(f"Path {self.data_path} is not a directory") # Object if caching settings are impossible: if self.caching: if self.sampling: raise ValueError("Sampling should be False for caching") if self.compute_scaling_factors: raise ValueError("Compute scaling factors should be False for caching") if self.resample_surfaces: raise ValueError("Resample surface should be False for caching") if self.phase not in [ "train", "val", "test", ]: raise ValueError( f"phase should be one of ['train', 'val', 'test'], got {self.phase}" ) if self.scaling_type is not None: if self.scaling_type not in [ "min_max_scaling", "mean_std_scaling", ]: raise ValueError( f"scaling_type should be one of ['min_max_scaling', 'mean_std_scaling'], got {self.scaling_type}" )
##### TODO # - put model type in config or leave in __init__ # - check the bounding box protocol works
[docs] class DoMINODataPipe(Dataset): """ Datapipe for DoMINO """ def __init__( self, input_path, model_type: Literal["surface", "volume", "combined"], **data_config_overrides, ): # Perform config packaging and validation self.config = DoMINODataConfig(data_path=input_path, **data_config_overrides) if not DistributedManager.is_initialized(): DistributedManager.initialize() dist = DistributedManager() if self.config.gpu_preprocessing or self.config.gpu_output: # Make sure we move data to the right device: target_device = dist.device.index self.device_context = cp.cuda.Device(target_device) self.device_context.use() else: self.device_context = nullcontext() self.device = dist.device if self.config.deterministic: np.random.seed(42) cp.random.seed(42) else: np.random.seed(seed=int(time.time())) cp.random.seed(seed=int(time.time())) self.model_type = model_type self.filenames = get_filenames(self.config.data_path, exclude_dirs=True) total_files = len(self.filenames) self.indices = np.array(range(total_files)) # Why shuffle the indices here if only using random access below? np.random.shuffle(self.indices) # Determine the array provider based on what device # will do preprocessing: self.array_provider = cp if self.config.gpu_preprocessing else np # Update the arrays for bounding boxes: if hasattr(self.config.bounding_box_dims, "max") and hasattr( self.config.bounding_box_dims, "min" ): self.config.bounding_box_dims = [ self.array_provider.asarray(self.config.bounding_box_dims.max).astype( "float32" ), self.array_provider.asarray(self.config.bounding_box_dims.min).astype( "float32" ), ] if hasattr(self.config.bounding_box_dims_surf, "max") and hasattr( self.config.bounding_box_dims_surf, "min" ): self.config.bounding_box_dims_surf = [ self.array_provider.asarray( self.config.bounding_box_dims_surf.max ).astype("float32"), self.array_provider.asarray( self.config.bounding_box_dims_surf.min ).astype("float32"), ] # Used if threaded data is enabled: self.max_workers = 24 # Create a single thread pool for the class self.executor = ThreadPoolExecutor(max_workers=self.max_workers) # Define here the keys to read for each __getitem__ call # Always read these keys self.keys_to_read = ["stl_coordinates", "stl_centers", "stl_faces", "stl_areas"] with self.device_context: xp = self.array_provider self.keys_to_read_if_available = { "global_params_values": xp.asarray([30.0, 1.226]), "global_params_reference": xp.asarray([30.0, 1.226]), } self.volume_keys = ["volume_mesh_centers", "volume_fields"] self.surface_keys = [ "surface_mesh_centers", "surface_normals", "surface_areas", "surface_fields", ] if self.model_type == "volume" or self.model_type == "combined": self.keys_to_read.extend(self.volume_keys) if self.model_type == "surface" or self.model_type == "combined": self.keys_to_read.extend(self.surface_keys) def __del__(self): # Clean up the executor when the instance is being destroyed if hasattr(self, "executor"): self.executor.shutdown() @profile def read_data_zarr(self, filepath): # def create_pinned_streaming_space(shape, dtype): # # TODO - this function could boost performance a little, but # # the pinned memory pool seems too small. # if self.array_provider == cp: # nbytes = np.prod(shape) * dtype.itemsize # ptr = cp.cuda.alloc_pinned_memory(nbytes) # arr = np.frombuffer(ptr, dtype) # return arr.reshape(shape) # else: # return np.empty(shape, dtype=dtype) def read_chunk_into_array(ram_array, fs_zarr_array, slice): ram_array[slice] = fs_zarr_array[slice] @profile def chunked_aligned_read(zarr_group, key, futures): zarr_array = zarr_group[key] shape = zarr_array.shape chunk_size = zarr_array.chunks[0] # Pre-allocate the full result array result_shape = zarr_array.shape result_dtype = zarr_array.dtype result = np.empty(result_shape, dtype=result_dtype) for start in range(0, shape[0], chunk_size): end = min(start + chunk_size, shape[0]) read_slice = np.s_[start:end] futures.append( self.executor.submit( read_chunk_into_array, result, zarr_array, read_slice ) ) return result with zarr.open_group(filepath, mode="r") as z: data = {} futures = [] if "volume_fields" in z.keys(): data["volume_fields"] = chunked_aligned_read( z, "volume_fields", futures ) if "volume_mesh_centers" in z.keys(): data["volume_mesh_centers"] = chunked_aligned_read( z, "volume_mesh_centers", futures ) for key in self.keys_to_read: if z[key].shape == (): data[key] = z[key] elif key in ["volume_fields", "volume_mesh_centers"]: continue else: data[key] = np.empty(z[key].shape, dtype=z[key].dtype) slice = np.s_[:] futures.append( self.executor.submit( read_chunk_into_array, data[key], z[key], slice ) ) # Now wait for all the futures to complete for future in futures: result = future.result() if isinstance(result, tuple) and len(result) == 2: key, value = result data[key] = value # Move big data to GPU for key in data.keys(): data[key] = self.array_provider.asarray(data[key]) # Optional, maybe-present keys for key in self.keys_to_read_if_available: if key not in data.keys(): data[key] = self.keys_to_read_if_available[key] return data @profile def read_data_npy(self, filepath): with open(filepath, "rb") as f: data = np.load(f, allow_pickle=True).item() for key in self.keys_to_read_if_available: if key not in data.keys(): data[key] = self.keys_to_read_if_available[key] if "filename" in data.keys(): data.pop("filename", None) if not (isinstance(data["stl_coordinates"], np.ndarray)): data["stl_coordinates"] = np.asarray(data["stl_coordinates"]) # Maybe move to GPU: with self.device_context: for key in data.keys(): if data[key] is not None: data[key] = self.array_provider.asarray(data[key]) return data @profile def read_data_npz( self, filepath, max_workers=None, ): if max_workers is not None: self.max_workers = max_workers def load_one(key): with np.load(filepath) as data: return key, data[key] def check_optional_keys(): with np.load(filepath) as data: optional_results = {} for key in self.keys_to_read_if_available: if key in data.keys(): optional_results[key] = data[key] else: optional_results[key] = self.keys_to_read_if_available[key] with self.device_context: optional_results = { key: self.array_provider.asarray(value) for key, value in optional_results.items() } return optional_results # Use the class-level executor instead of creating a new one results = dict(self.executor.map(load_one, self.keys_to_read)) # Move the results to the GPU: with self.device_context: for key in results.keys(): results[key] = self.array_provider.asarray(results[key]) # Check the optional ones: optional_results = check_optional_keys() results.update(optional_results) return results def __len__(self): return len(self.indices) @profile def preprocess_combined(self, data_dict): # Pull these out and force to fp32: with self.device_context: global_params_values = data_dict["global_params_values"].astype( self.array_provider.float32 ) global_params_reference = data_dict["global_params_reference"].astype( self.array_provider.float32 ) # Pull these pieces out of the data_dict for manipulation stl_vertices = data_dict["stl_coordinates"] stl_centers = data_dict["stl_centers"] mesh_indices_flattened = data_dict["stl_faces"] stl_sizes = data_dict["stl_areas"] idx = np.where(stl_sizes > 0.0) stl_sizes = stl_sizes[idx] stl_centers = stl_centers[idx] xp = self.array_provider # Make sure the mesh_indices_flattened is an integer array: if mesh_indices_flattened.dtype != xp.int32: mesh_indices_flattened = mesh_indices_flattened.astype(xp.int32) length_scale = xp.amax(xp.amax(stl_vertices, 0) - xp.amin(stl_vertices, 0)) center_of_mass = calculate_center_of_mass(stl_centers, stl_sizes) if self.config.bounding_box_dims_surf is None: s_max = xp.amax(stl_vertices, 0) s_min = xp.amin(stl_vertices, 0) else: s_max = xp.asarray(self.config.bounding_box_dims_surf[0]) s_min = xp.asarray(self.config.bounding_box_dims_surf[1]) # SDF calculation on the grid using WARP if not self.config.compute_scaling_factors: nx, ny, nz = self.config.grid_resolution surf_grid = create_grid(s_max, s_min, [nx, ny, nz]) surf_grid_reshaped = surf_grid.reshape(nx * ny * nz, 3) sdf_surf_grid = signed_distance_field( stl_vertices, mesh_indices_flattened, surf_grid_reshaped, use_sign_winding_number=True, ).reshape(nx, ny, nz) else: surf_grid = None sdf_surf_grid = None if self.config.sampling: # nvtx.range_push("Geometry Sampling") geometry_points = self.config.geom_points_sample geometry_coordinates_sampled, idx_geometry = shuffle_array( stl_vertices, geometry_points ) if geometry_coordinates_sampled.shape[0] < geometry_points: geometry_coordinates_sampled = pad( geometry_coordinates_sampled, geometry_points, pad_value=-100.0 ) geom_centers = geometry_coordinates_sampled # nvtx.range_pop() else: geom_centers = stl_vertices # geom_centers = self.array_provider.float32(geom_centers) surf_grid_max_min = xp.stack([s_min, s_max]) return_dict = { "length_scale": length_scale, "surf_grid": surf_grid, "sdf_surf_grid": sdf_surf_grid, "surface_min_max": surf_grid_max_min, "global_params_values": xp.expand_dims( xp.array(global_params_values, dtype=xp.float32), -1 ), "global_params_reference": xp.expand_dims( xp.array(global_params_reference, dtype=xp.float32), -1 ), "geometry_coordinates": geom_centers, } return ( return_dict, s_min, s_max, mesh_indices_flattened, stl_vertices, center_of_mass, ) @profile def preprocess_surface(self, data_dict, core_dict, center_of_mass, s_min, s_max): nx, ny, nz = self.config.grid_resolution return_dict = {} surface_coordinates = data_dict["surface_mesh_centers"] surface_normals = data_dict["surface_normals"] surface_sizes = data_dict["surface_areas"] surface_fields = data_dict["surface_fields"] idx = np.where(surface_sizes > 0) surface_sizes = surface_sizes[idx] surface_fields = surface_fields[idx] surface_normals = surface_normals[idx] surface_coordinates = surface_coordinates[idx] xp = self.array_provider if self.config.resample_surfaces: if self.config.resampling_points > surface_coordinates.shape[0]: resampling_points = surface_coordinates.shape[0] else: resampling_points = self.config.resampling_points surface_coordinates, idx_s = shuffle_array( surface_coordinates, resampling_points ) surface_normals = surface_normals[idx_s] surface_sizes = surface_sizes[idx_s] surface_fields = surface_fields[idx_s] if not self.config.compute_scaling_factors: c_max = self.config.bounding_box_dims[0] c_min = self.config.bounding_box_dims[1] if self.config.sample_in_bbox: # TODO - clean this up with vectorization? # TODO - the xp.where is likely a useless op. Need to check. ids_in_bbox = xp.where( (surface_coordinates[:, 0] > c_min[0]) & (surface_coordinates[:, 0] < c_max[0]) & (surface_coordinates[:, 1] > c_min[1]) & (surface_coordinates[:, 1] < c_max[1]) & (surface_coordinates[:, 2] > c_min[2]) & (surface_coordinates[:, 2] < c_max[2]) ) surface_coordinates = surface_coordinates[ids_in_bbox] surface_normals = surface_normals[ids_in_bbox] surface_sizes = surface_sizes[ids_in_bbox] surface_fields = surface_fields[ids_in_bbox] # Compute the positional encoding before sampling if self.config.positional_encoding: dx, dy, dz = ( (s_max[0] - s_min[0]) / nx, (s_max[1] - s_min[1]) / ny, (s_max[2] - s_min[2]) / nz, ) pos_normals_com_surface = calculate_normal_positional_encoding( surface_coordinates, center_of_mass, cell_length=[dx, dy, dz] ) else: pos_normals_com_surface = surface_coordinates - xp.asarray( center_of_mass ) # Fit the kNN (or KDTree, if CPU) on ALL points: if self.config.num_surface_neighbors > 1: if self.array_provider == cp: knn = cuml.neighbors.NearestNeighbors( n_neighbors=self.config.num_surface_neighbors, algorithm="rbc", ) knn.fit(surface_coordinates) else: # Under the hood this is instantiating a KDTree. # aka here knn is a type, not a class, technically. interp_func = KDTree(surface_coordinates) if self.config.sampling: # Perform the down sampling: if self.config.surface_sampling_algorithm == "area_weighted": ( surface_coordinates_sampled, idx_surface, ) = area_weighted_shuffle_array( surface_coordinates, self.config.surface_points_sample, surface_sizes, ) elif self.config.surface_sampling_algorithm == "solution_weighted": ( surface_coordinates_sampled, idx_surface, ) = solution_weighted_shuffle_array( surface_coordinates, self.config.surface_points_sample, surface_fields[:, 0], scaling_factor=0.5, ) else: surface_coordinates_sampled, idx_surface = shuffle_array( surface_coordinates, self.config.surface_points_sample ) if ( surface_coordinates_sampled.shape[0] < self.config.surface_points_sample ): surface_coordinates_sampled = pad( surface_coordinates_sampled, self.config.surface_points_sample, pad_value=-10.0, ) # Select out the sampled points for non-neighbor arrays: surface_fields = surface_fields[idx_surface] pos_normals_com_surface = pos_normals_com_surface[idx_surface] # Now, perform the kNN on the sampled points: if self.config.num_surface_neighbors > 1: if self.array_provider == cp: ii = knn.kneighbors( surface_coordinates_sampled, return_distance=False ) else: _, ii = interp_func.query( surface_coordinates_sampled, k=self.config.num_surface_neighbors, ) # Pull out the neighbor elements. Note that ii is the index into the original # points - but only exists for the sampled points # In other words, a point from `surface_coordinates_sampled` has neighbors # from the full `surface_coordinates` array. surface_neighbors = surface_coordinates[ii][:, 1:] surface_neighbors_normals = surface_normals[ii][:, 1:] surface_neighbors_sizes = surface_sizes[ii][:, 1:] else: surface_neighbors = surface_coordinates surface_neighbors_normals = surface_normals surface_neighbors_sizes = surface_sizes # We could index into these above the knn step too; they aren't dependent on that. surface_normals = surface_normals[idx_surface] surface_sizes = surface_sizes[idx_surface] # Update the coordinates to the sampled points: surface_coordinates = surface_coordinates_sampled else: # We are *not* sampling, kNN on ALL points: ii = knn.kneighbors(surface_coordinates, return_distance=False) # Construct the neighbors arrays: surface_neighbors = surface_coordinates[ii][:, 1:] surface_neighbors_normals = surface_normals[ii][:, 1:] surface_neighbors_sizes = surface_sizes[ii][:, 1:] # Have to normalize neighbors after the kNN and sampling if self.config.normalize_coordinates: core_dict["surf_grid"] = normalize(core_dict["surf_grid"], s_max, s_min) surface_coordinates = normalize(surface_coordinates, s_max, s_min) surface_neighbors = normalize(surface_neighbors, s_max, s_min) if self.config.scaling_type is not None: if self.config.surface_factors is not None: if self.config.scaling_type == "mean_std_scaling": surf_mean = self.config.surface_factors[0] surf_std = self.config.surface_factors[1] # TODO - Are these array calls needed? surface_fields = standardize( surface_fields, xp.asarray(surf_mean), xp.asarray(surf_std) ) elif self.config.scaling_type == "min_max_scaling": surf_min = self.config.surface_factors[1] surf_max = self.config.surface_factors[0] # TODO - Are these array calls needed? surface_fields = normalize( surface_fields, xp.asarray(surf_max), xp.asarray(surf_min) ) else: surface_sizes = None surface_normals = None surface_neighbors = None surface_neighbors_normals = None surface_neighbors_sizes = None pos_normals_com_surface = None return_dict.update( { "pos_surface_center_of_mass": pos_normals_com_surface, "surface_mesh_centers": surface_coordinates, "surface_mesh_neighbors": surface_neighbors, "surface_normals": surface_normals, "surface_neighbors_normals": surface_neighbors_normals, "surface_areas": surface_sizes, "surface_neighbors_areas": surface_neighbors_sizes, "surface_fields": surface_fields, } ) return return_dict @profile def preprocess_volume( self, data_dict, core_dict, s_min, s_max, mesh_indices_flattened, stl_vertices, center_of_mass, ): return_dict = {} nx, ny, nz = self.config.grid_resolution xp = self.array_provider # # Temporary: convert to cupy here: volume_coordinates = data_dict["volume_mesh_centers"] volume_fields = data_dict["volume_fields"] if not self.config.compute_scaling_factors: if self.config.bounding_box_dims is None: c_max = s_max + (s_max - s_min) / 2 c_min = s_min - (s_max - s_min) / 2 c_min[2] = s_min[2] else: c_max = xp.asarray(self.config.bounding_box_dims[0]) c_min = xp.asarray(self.config.bounding_box_dims[1]) if self.config.sample_in_bbox: # TODO - xp.where can probably be removed. ids_in_bbox = self.array_provider.where( (volume_coordinates[:, 0] > c_min[0]) & (volume_coordinates[:, 0] < c_max[0]) & (volume_coordinates[:, 1] > c_min[1]) & (volume_coordinates[:, 1] < c_max[1]) & (volume_coordinates[:, 2] > c_min[2]) & (volume_coordinates[:, 2] < c_max[2]) ) volume_coordinates = volume_coordinates[ids_in_bbox] volume_fields = volume_fields[ids_in_bbox] dx, dy, dz = ( (c_max[0] - c_min[0]) / nx, (c_max[1] - c_min[1]) / ny, (c_max[2] - c_min[2]) / nz, ) # Generate a grid of specified resolution to map the bounding box # The grid is used for capturing structured geometry features and SDF representation of geometry grid = create_grid(c_max, c_min, [nx, ny, nz]) grid_reshaped = grid.reshape(nx * ny * nz, 3) # SDF calculation on the grid using WARP sdf_grid = signed_distance_field( stl_vertices, mesh_indices_flattened, grid_reshaped, use_sign_winding_number=True, ).reshape((nx, ny, nz)) if self.config.sampling: volume_coordinates_sampled, idx_volume = shuffle_array( volume_coordinates, self.config.volume_points_sample ) if ( volume_coordinates_sampled.shape[0] < self.config.volume_points_sample ): volume_coordinates_sampled = pad( volume_coordinates_sampled, self.config.volume_points_sample, pad_value=-10.0, ) volume_fields = volume_fields[idx_volume] volume_coordinates = volume_coordinates_sampled sdf_nodes, sdf_node_closest_point = signed_distance_field( stl_vertices, mesh_indices_flattened, volume_coordinates, include_hit_points=True, use_sign_winding_number=True, ) # TODO - is this needed? sdf_nodes = xp.asarray(sdf_nodes) sdf_node_closest_point = xp.asarray(sdf_node_closest_point) sdf_nodes = sdf_nodes.reshape((-1, 1)) if self.config.positional_encoding: pos_normals_closest_vol = calculate_normal_positional_encoding( volume_coordinates, sdf_node_closest_point, cell_length=[dx, dy, dz], ) pos_normals_com_vol = calculate_normal_positional_encoding( volume_coordinates, center_of_mass, cell_length=[dx, dy, dz] ) else: pos_normals_closest_vol = volume_coordinates - sdf_node_closest_point pos_normals_com_vol = volume_coordinates - center_of_mass if self.config.normalize_coordinates: volume_coordinates = normalize(volume_coordinates, c_max, c_min) grid = normalize(grid, c_max, c_min) if self.config.scaling_type is not None: if self.config.volume_factors is not None: if self.config.scaling_type == "mean_std_scaling": vol_mean = self.config.volume_factors[0] vol_std = self.config.volume_factors[1] volume_fields = standardize(volume_fields, vol_mean, vol_std) elif self.config.scaling_type == "min_max_scaling": vol_min = xp.asarray(self.config.volume_factors[1]) vol_max = xp.asarray(self.config.volume_factors[0]) volume_fields = normalize(volume_fields, vol_max, vol_min) vol_grid_max_min = xp.stack([c_min, c_max]) else: pos_normals_closest_vol = None pos_normals_com_vol = None sdf_nodes = None sdf_grid = None grid = None vol_grid_max_min = None return_dict.update( { "pos_volume_closest": pos_normals_closest_vol, "pos_volume_center_of_mass": pos_normals_com_vol, "grid": grid, "sdf_grid": sdf_grid, "sdf_nodes": sdf_nodes, "volume_fields": volume_fields, "volume_mesh_centers": volume_coordinates, "volume_min_max": vol_grid_max_min, } ) return return_dict @profile def preprocess_data(self, data_dict): ( return_dict, s_min, s_max, mesh_indices_flattened, stl_vertices, center_of_mass, ) = self.preprocess_combined(data_dict) if self.model_type == "volume" or self.model_type == "combined": volume_dict = self.preprocess_volume( data_dict, return_dict, s_min, s_max, mesh_indices_flattened, stl_vertices, center_of_mass, ) return_dict.update(volume_dict) if self.model_type == "surface" or self.model_type == "combined": surface_dict = self.preprocess_surface( data_dict, return_dict, center_of_mass, s_min, s_max ) return_dict.update(surface_dict) return return_dict @profile def __getitem__(self, idx): """ Function for fetching and processing a single file's data. Domino, in general, expects one example per file and the files are relatively large due to the mesh size. """ if self.config.deterministic: self.array_provider.random.seed(idx) # But also always set numpy: np.random.seed(idx) index = self.indices[idx] cfd_filename = self.filenames[index] # Get all of the data: filepath = self.config.data_path / cfd_filename if filepath.suffix == ".zarr": data_dict = self.read_data_zarr(filepath) elif filepath.suffix == ".npz": data_dict = self.read_data_npz(filepath) elif filepath.suffix == ".npy": data_dict = self.read_data_npy(filepath) else: raise ValueError(f"Unsupported file extension: {filepath.suffix}") return_dict = self.preprocess_data(data_dict) # return only pytorch tensor objects. # If returning on CPU (but processed on GPU), convert below. # This assumes we keep the data on the device it's on. for key, value in return_dict.items(): if isinstance(value, np.ndarray): return_dict[key] = torch.from_numpy(value) elif isinstance(value, cp.ndarray): return_dict[key] = torch.utils.dlpack.from_dlpack(value.toDlpack()) if self.config.gpu_output: # Make sure this is all on the GPU. # Everything here should be a torch tensor now. for key, value in return_dict.items(): if isinstance(value, torch.Tensor) and not value.is_cuda: return_dict[key] = value.pin_memory().to(self.device) else: # Make sure everything is on the CPU. for key, value in return_dict.items(): if isinstance(value, torch.Tensor) and value.is_cuda: return_dict[key] = value.cpu() return return_dict
@profile def compute_scaling_factors(cfg: DictConfig, input_path: str, use_cache: bool) -> None: model_type = cfg.model.model_type max_scaling_factor_files = 20 if model_type == "volume" or model_type == "combined": vol_save_path = os.path.join(cfg.project_dir, "volume_scaling_factors.npy") if not os.path.exists(vol_save_path): print("Computing volume scaling factors") volume_variable_names = list(cfg.variables.volume.solution.keys()) fm_dict = DoMINODataPipe( input_path, phase="train", grid_resolution=cfg.model.interp_res, volume_variables=volume_variable_names, surface_variables=None, normalize_coordinates=True, sampling=False, sample_in_bbox=True, volume_points_sample=cfg.model.volume_points_sample, geom_points_sample=cfg.model.geom_points_sample, positional_encoding=cfg.model.positional_encoding, model_type=cfg.model.model_type, bounding_box_dims=cfg.data.bounding_box, bounding_box_dims_surf=cfg.data.bounding_box_surface, compute_scaling_factors=True, gpu_preprocessing=True, gpu_output=True, ) # Calculate mean if cfg.model.normalization == "mean_std_scaling": for j in range(len(fm_dict)): print("On iteration {j}") d_dict = fm_dict[j] vol_fields = d_dict["volume_fields"] if vol_fields is not None: if j == 0: vol_fields_sum = np.mean(vol_fields, 0) else: vol_fields_sum += np.mean(vol_fields, 0) else: vol_fields_sum = 0.0 vol_fields_mean = vol_fields_sum / len(fm_dict) for j in range(len(fm_dict)): print("On iteration {j} again") d_dict = fm_dict[j] vol_fields = d_dict["volume_fields"] if vol_fields is not None: if j == 0: vol_fields_sum_square = np.mean( (vol_fields - vol_fields_mean) ** 2.0, 0 ) else: vol_fields_sum_square += np.mean( (vol_fields - vol_fields_mean) ** 2.0, 0 ) else: vol_fields_sum_square = 0.0 vol_fields_std = np.sqrt(vol_fields_sum_square / len(fm_dict)) vol_scaling_factors = [vol_fields_mean, vol_fields_std] if cfg.model.normalization == "min_max_scaling": for j in range(len(fm_dict)): print(f"Min max scaling on iteration {j}") d_dict = fm_dict[j] vol_fields = d_dict["volume_fields"] if vol_fields.device.type == "cuda": xp = cp vol_fields = vol_fields.cuda() vol_fields = cp.from_dlpack(vol_fields) else: xp = np vol_fields = vol_fields.cpu().numpy() if vol_fields is not None: vol_mean = xp.mean(vol_fields, 0) vol_std = xp.std(vol_fields, 0) vol_idx = mean_std_sampling( vol_fields, vol_mean, vol_std, tolerance=12.0 ) vol_fields_sampled = xp.delete(vol_fields, vol_idx, axis=0) if j == 0: vol_fields_max = xp.amax(vol_fields_sampled, 0) vol_fields_min = xp.amin(vol_fields_sampled, 0) else: vol_fields_max1 = xp.amax(vol_fields_sampled, 0) vol_fields_min1 = xp.amin(vol_fields_sampled, 0) for k in range(vol_fields.shape[-1]): if vol_fields_max1[k] > vol_fields_max[k]: vol_fields_max[k] = vol_fields_max1[k] if vol_fields_min1[k] < vol_fields_min[k]: vol_fields_min[k] = vol_fields_min1[k] else: vol_fields_max = 0.0 vol_fields_min = 0.0 if j > max_scaling_factor_files: break vol_scaling_factors = [vol_fields_max, vol_fields_min] for i, item in enumerate(vol_scaling_factors): if isinstance(item, cp.ndarray): vol_scaling_factors[i] = item.get() np.save(vol_save_path, vol_scaling_factors) if model_type == "surface" or model_type == "combined": surf_save_path = os.path.join(cfg.project_dir, "surface_scaling_factors.npy") if not os.path.exists(surf_save_path): print("Computing surface scaling factors") volume_variable_names = list(cfg.variables.volume.solution.keys()) surface_variable_names = list(cfg.variables.surface.solution.keys()) fm_dict = DoMINODataPipe( input_path, phase="train", grid_resolution=cfg.model.interp_res, volume_variables=None, surface_variables=surface_variable_names, normalize_coordinates=True, sampling=False, sample_in_bbox=True, volume_points_sample=cfg.model.volume_points_sample, geom_points_sample=cfg.model.geom_points_sample, positional_encoding=cfg.model.positional_encoding, model_type=cfg.model.model_type, bounding_box_dims=cfg.data.bounding_box, bounding_box_dims_surf=cfg.data.bounding_box_surface, compute_scaling_factors=True, ) # Calculate mean if cfg.model.normalization == "mean_std_scaling": for j in range(len(fm_dict)): print(f"Mean std scaling on iteration {j}") d_dict = fm_dict[j] surf_fields = d_dict["surface_fields"].cpu().numpy() if surf_fields is not None: if j == 0: surf_fields_sum = np.mean(surf_fields, 0) else: surf_fields_sum += np.mean(surf_fields, 0) else: surf_fields_sum = 0.0 surf_fields_mean = surf_fields_sum / len(fm_dict) for j in range(len(fm_dict)): print(f"Mean std scaling on iteration {j} again") d_dict = fm_dict[j] surf_fields = d_dict["surface_fields"] if surf_fields is not None: if j == 0: surf_fields_sum_square = np.mean( (surf_fields - surf_fields_mean) ** 2.0, 0 ) else: surf_fields_sum_square += np.mean( (surf_fields - surf_fields_mean) ** 2.0, 0 ) else: surf_fields_sum_square = 0.0 surf_fields_std = np.sqrt(surf_fields_sum_square / len(fm_dict)) surf_scaling_factors = [surf_fields_mean, surf_fields_std] if cfg.model.normalization == "min_max_scaling": for j in range(len(fm_dict)): print(f"Min max scaling on iteration {j}") d_dict = fm_dict[j] surf_fields = d_dict["surface_fields"] if surf_fields.device.type == "cuda": xp = cp surf_fields = surf_fields.cuda() surf_fields = cp.from_dlpack(surf_fields) else: xp = np surf_fields = surf_fields.cpu().numpy() if surf_fields is not None: surf_mean = xp.mean(surf_fields, 0) surf_std = xp.std(surf_fields, 0) surf_idx = mean_std_sampling( surf_fields, surf_mean, surf_std, tolerance=12.0 ) surf_fields_sampled = xp.delete(surf_fields, surf_idx, axis=0) if j == 0: surf_fields_max = xp.amax(surf_fields_sampled, 0) surf_fields_min = xp.amin(surf_fields_sampled, 0) else: surf_fields_max1 = xp.amax(surf_fields_sampled, 0) surf_fields_min1 = xp.amin(surf_fields_sampled, 0) for k in range(surf_fields.shape[-1]): if surf_fields_max1[k] > surf_fields_max[k]: surf_fields_max[k] = surf_fields_max1[k] if surf_fields_min1[k] < surf_fields_min[k]: surf_fields_min[k] = surf_fields_min1[k] else: surf_fields_max = 0.0 surf_fields_min = 0.0 if j > max_scaling_factor_files: break surf_scaling_factors = [surf_fields_max, surf_fields_min] for i, item in enumerate(surf_scaling_factors): if isinstance(item, cp.ndarray): surf_scaling_factors[i] = item.get() np.save(surf_save_path, surf_scaling_factors)
[docs] class CachedDoMINODataset(Dataset): """ Dataset for reading cached DoMINO data files, with optional resampling. Acts as a drop-in replacement for DoMINODataPipe. """ # @nvtx_annotate(message="CachedDoMINODataset __init__") def __init__( self, data_path: Union[str, Path], phase: Literal["train", "val", "test"] = "train", sampling: bool = False, volume_points_sample: Optional[int] = None, surface_points_sample: Optional[int] = None, geom_points_sample: Optional[int] = None, model_type=None, # Model_type, surface, volume or combined deterministic_seed=False, surface_sampling_algorithm="area_weighted", ): super().__init__() self.model_type = model_type if deterministic_seed: np.random.seed(42) if isinstance(data_path, str): data_path = Path(data_path) self.data_path = data_path.expanduser() if not self.data_path.exists(): raise AssertionError(f"Path {self.data_path} does not exist") if not self.data_path.is_dir(): raise AssertionError(f"Path {self.data_path} is not a directory") self.deterministic_seed = deterministic_seed self.sampling = sampling self.volume_points = volume_points_sample self.surface_points = surface_points_sample self.geom_points = geom_points_sample self.surface_sampling_algorithm = surface_sampling_algorithm self.filenames = get_filenames(self.data_path, exclude_dirs=True) total_files = len(self.filenames) self.phase = phase self.indices = np.array(range(total_files)) np.random.shuffle(self.indices) if not self.filenames: raise AssertionError(f"No cached files found in {self.data_path}") def __len__(self): return len(self.indices) # @nvtx_annotate(message="CachedDoMINODataset __getitem__") def __getitem__(self, idx): if self.deterministic_seed: np.random.seed(idx) nvtx.range_push("Load cached file") index = self.indices[idx] cfd_filename = self.filenames[index] filepath = self.data_path / cfd_filename result = np.load(filepath, allow_pickle=True).item() result = { k: v.numpy() if isinstance(v, Tensor) else v for k, v in result.items() } nvtx.range_pop() if not self.sampling: return result nvtx.range_push("Sample points") # Sample volume points if present if "volume_mesh_centers" in result and self.volume_points: coords_sampled, idx_volume = shuffle_array( result["volume_mesh_centers"], self.volume_points ) if coords_sampled.shape[0] < self.volume_points: coords_sampled = pad( coords_sampled, self.volume_points, pad_value=-10.0 ) result["volume_mesh_centers"] = coords_sampled for key in [ "volume_fields", "pos_volume_closest", "pos_volume_center_of_mass", "sdf_nodes", ]: if key in result: result[key] = result[key][idx_volume] # Sample surface points if present if "surface_mesh_centers" in result and self.surface_points: if self.surface_sampling_algorithm == "area_weighted": coords_sampled, idx_surface = area_weighted_shuffle_array( result["surface_mesh_centers"], self.surface_points, result["surface_areas"], ) else: coords_sampled, idx_surface = shuffle_array( result["surface_mesh_centers"], self.surface_points ) if coords_sampled.shape[0] < self.surface_points: coords_sampled = pad( coords_sampled, self.surface_points, pad_value=-10.0 ) ii = result["neighbor_indices"] result["surface_mesh_neighbors"] = result["surface_mesh_centers"][ii] result["surface_neighbors_normals"] = result["surface_normals"][ii] result["surface_neighbors_areas"] = result["surface_areas"][ii] result["surface_mesh_centers"] = coords_sampled for key in [ "surface_fields", "surface_areas", "surface_normals", "pos_surface_center_of_mass", "surface_mesh_neighbors", "surface_neighbors_normals", "surface_neighbors_areas", ]: if key in result: result[key] = result[key][idx_surface] del result["neighbor_indices"] # Sample geometry points if present if "geometry_coordinates" in result and self.geom_points: coords_sampled, _ = shuffle_array( result["geometry_coordinates"], self.geom_points ) if coords_sampled.shape[0] < self.geom_points: coords_sampled = pad(coords_sampled, self.geom_points, pad_value=-100.0) result["geometry_coordinates"] = coords_sampled nvtx.range_pop() return result
def create_domino_dataset( cfg, phase, volume_variable_names, surface_variable_names, vol_factors, surf_factors ): if phase == "train": input_path = cfg.data.input_dir elif phase == "val": input_path = cfg.data.input_dir_val else: raise ValueError(f"Invalid phase {phase}") if cfg.data_processor.use_cache: return CachedDoMINODataset( input_path, phase=phase, sampling=True, volume_points_sample=cfg.model.volume_points_sample, surface_points_sample=cfg.model.surface_points_sample, geom_points_sample=cfg.model.geom_points_sample, model_type=cfg.model.model_type, surface_sampling_algorithm=cfg.model.surface_sampling_algorithm, ) else: overrides = {} if hasattr(cfg.data, "gpu_preprocessing"): overrides["gpu_preprocessing"] = cfg.data.gpu_preprocessing if hasattr(cfg.data, "gpu_output"): overrides["gpu_output"] = cfg.data.gpu_output return DoMINODataPipe( input_path, phase=phase, grid_resolution=cfg.model.interp_res, volume_variables=volume_variable_names, surface_variables=surface_variable_names, normalize_coordinates=True, sampling=True, sample_in_bbox=True, volume_points_sample=cfg.model.volume_points_sample, surface_points_sample=cfg.model.surface_points_sample, geom_points_sample=cfg.model.geom_points_sample, positional_encoding=cfg.model.positional_encoding, volume_factors=vol_factors, surface_factors=surf_factors, scaling_type=cfg.model.normalization, model_type=cfg.model.model_type, bounding_box_dims=cfg.data.bounding_box, bounding_box_dims_surf=cfg.data.bounding_box_surface, num_surface_neighbors=cfg.model.num_neighbors_surface, resample_surfaces=cfg.model.resampling_surface_mesh.resample, resampling_points=cfg.model.resampling_surface_mesh.points, surface_sampling_algorithm=cfg.model.surface_sampling_algorithm, **overrides, ) if __name__ == "__main__": fm_data = DoMINODataPipe( data_path="/code/processed_data/new_models_1/", phase="train", sampling=False, sample_in_bbox=False, )