NVIDIA Modulus Core v0.2.1
Core v0.2.1

deeplearning/modulus/modulus-core-v021/_modules/modulus/datapipes/gnn/ahmed_body_dataset.html

Source code for modulus.datapipes.gnn.ahmed_body_dataset

# Copyright (c) 2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import os
import re
from typing import List, Tuple, Dict, Union, Optional, Any

import numpy as np
import torch
from torch import Tensor
from torch.nn import functional as F
from dataclasses import dataclass

from .utils import read_vtp_file, save_json, load_json
from modulus.datapipes.datapipe import Datapipe
from modulus.datapipes.meta import DatapipeMetaData

try:
    import dgl
    from dgl.data import DGLDataset
except:
    raise ImportError(
        "Ahmed Body Dataset requires the DGL library. Install the "
        + "desired CUDA version at: \n https://www.dgl.ai/pages/start.html"
    )

try:
    import vtk
    import pyvista as pv
except:
    raise ImportError(
        "Ahmed Body Dataset requires the vtk and pyvista libraries. Install with "
        + "pip install vtk pyvista"
    )


[docs]@dataclass class MetaData(DatapipeMetaData): name: str = "AhmedBody" # Optimization auto_device: bool = True cuda_graphs: bool = False # Parallel ddp_sharding: bool = True
[docs]class AhmedBodyDataset(DGLDataset, Datapipe): """ In-memory Ahmed body Dataset Parameters ---------- data_dir: str The directory where the data is stored. split: str, optional The dataset split. Can be 'train', 'validation', or 'test', by default 'train'. num_samples: int, optional The number of samples to use, by default 10. invar_keys: List[str], optional The input node features to consider. Default includes 'pos', 'velocity', 'reynolds_number', 'length', 'width', 'height', 'ground_clearance', 'slant_angle', and 'fillet_radius'. outvar_keys: List[str], optional The output features to consider. Default includes 'p' and 'wallShearStress'. normalize_keys List[str], optional The features to normalize. Default includes 'p', 'wallShearStress', 'velocity', 'length', 'width', 'height', 'ground_clearance', 'slant_angle', and 'fillet_radius'. normalization_bound: Tuple[float, float], optional The lower and upper bounds for normalization. Default is (-1, 1). force_reload: bool, optional If True, forces a reload of the data, by default False. name: str, optional The name of the dataset, by default 'dataset'. verbose: bool, optional If True, enables verbose mode, by default False. compute_drag: bool, optional If True, also returns the coefficient and mesh area and normals that are required for computing the drag coefficient. """ def __init__( self, data_dir: str, split: str = "train", num_samples: int = 10, invar_keys: List[str] = [ "pos", "velocity", "reynolds_number", "length", "width", "height", "ground_clearance", "slant_angle", "fillet_radius", ], outvar_keys: List[str] = ["p", "wallShearStress"], normalize_keys: List[str] = [ "p", "wallShearStress", "velocity", "reynolds_number", "length", "width", "height", "ground_clearance", "slant_angle", "fillet_radius", ], normalization_bound: Tuple[float, float] = (-1.0, 1.0), force_reload: bool = False, name: str = "dataset", verbose: bool = False, compute_drag: bool = False, ): DGLDataset.__init__( self, name=name, force_reload=force_reload, verbose=verbose, ) Datapipe.__init__( self, meta=MetaData(), ) self.split = split self.num_samples = num_samples self.data_dir = os.path.join(data_dir, self.split) self.info_dir = os.path.join(data_dir, self.split + "_info") self.input_keys = invar_keys self.output_keys = outvar_keys self.normalization_bound = normalization_bound self.compute_drag = compute_drag # get the list of all files in the data_dir all_entries = os.listdir(self.data_dir) all_info = os.listdir(self.info_dir) data_list = [ os.path.join(self.data_dir, entry) for entry in all_entries if os.path.isfile(os.path.join(self.data_dir, entry)) ] info_list = [ os.path.join(self.info_dir, entry) for entry in all_info if os.path.isfile(os.path.join(self.info_dir, entry)) ] numbers = [] for directory in data_list: match = re.search(r"\d+", directory) if match: numbers.append(int(match.group())) numbers_info = [] for directory in info_list: match = re.search(r"\d+", directory) if match: numbers_info.append(int(match.group())) numbers = [int(n) for n in numbers] numbers_info = [int(n) for n in numbers_info] # sort the data_list and info_list according to the numbers args = np.argsort(numbers) data_list = [data_list[index] for index in args] numbers = [numbers[index] for index in args] args = np.argsort(numbers_info) info_list = [info_list[index] for index in args] numbers_info = [numbers_info[index] for index in args] assert sorted(numbers) == sorted(numbers_info) self.numbers = numbers # create the graphs and add the node and features self.length = min(len(data_list), self.num_samples) if self.num_samples > self.length: raise ValueError( f"Number of available {self.split} dataset entries " f"({self.length}) is less than the number of samples " f"({self.num_samples})" ) self.graphs = [] if self.compute_drag: self.normals = [] self.areas = [] self.coeff = [] for i in range(self.length): file_path = data_list[i] info_path = info_list[i] polydata = read_vtp_file(file_path) graph = self._create_dgl_graph(polydata, outvar_keys, dtype=torch.int32) ( velocity, reynolds_number, length, width, height, ground_clearance, slant_angle, fillet_radius, ) = self._read_info_file(info_path) if "velocity" in invar_keys: graph.ndata["velocity"] = velocity * torch.ones_like( graph.ndata["pos"][:, [0]] ) if "reynolds_number" in invar_keys: graph.ndata["reynolds_number"] = reynolds_number * torch.ones_like( graph.ndata["pos"][:, [0]] ) if "length" in invar_keys: graph.ndata["length"] = length * torch.ones_like( graph.ndata["pos"][:, [0]] ) if "width" in invar_keys: graph.ndata["width"] = width * torch.ones_like( graph.ndata["pos"][:, [0]] ) if "height" in invar_keys: graph.ndata["height"] = height * torch.ones_like( graph.ndata["pos"][:, [0]] ) if "ground_clearance" in invar_keys: graph.ndata["ground_clearance"] = ground_clearance * torch.ones_like( graph.ndata["pos"][:, [0]] ) if "slant_angle" in invar_keys: graph.ndata["slant_angle"] = slant_angle * torch.ones_like( graph.ndata["pos"][:, [0]] ) if "fillet_radius" in invar_keys: graph.ndata["fillet_radius"] = fillet_radius * torch.ones_like( graph.ndata["pos"][:, [0]] ) if "normals" in invar_keys or self.compute_drag: mesh = pv.read(file_path) mesh.compute_normals( cell_normals=True, point_normals=False, inplace=True ) if "normals" in invar_keys: graph.ndata["normals"] = torch.from_numpy( mesh.cell_data_to_point_data()["Normals"] ) if self.compute_drag: mesh = mesh.compute_cell_sizes() mesh = mesh.cell_data_to_point_data() frontal_area = width * height / 2 * (10 ** (-6)) self.coeff.append(2.0 / ((velocity**2) * frontal_area)) self.normals.append(torch.from_numpy(mesh["Normals"])) self.areas.append(torch.from_numpy(mesh["Area"])) self.graphs.append(graph) # add the edge features self.graphs = self.add_edge_features() # normalize the node and edge features if self.split == "train": self.node_stats = self._get_node_stats(keys=normalize_keys) self.edge_stats = self._get_edge_stats() else: self.node_stats = load_json("node_stats.json") self.edge_stats = load_json("edge_stats.json") self.graphs = self.normalize_node() self.graphs = self.normalize_edge() def __getitem__(self, idx): graph = self.graphs[idx] if self.compute_drag: sid = self.numbers[idx] return graph, sid, self.normals[idx], self.areas[idx], self.coeff[idx] return graph def __len__(self): return self.length
[docs] def add_edge_features(self) -> List[dgl.DGLGraph]: """ Add relative displacement and displacement norm as edge features for each graph in the list of graphs. The calculations are done using the 'pos' attribute in the node data of each graph. The resulting edge features are stored in the 'x' attribute in the edge data of each graph. This method will modify the list of graphs in-place. Returns ------- List[dgl.DGLGraph] The list of graphs with updated edge features. """ if not hasattr(self, "graphs") or not self.graphs: raise ValueError("The list 'graphs' is empty.") for graph in self.graphs: pos = graph.ndata.get("pos") if pos is None: raise ValueError( "'pos' does not exist in the node data of one or more graphs." ) row, col = graph.edges() row = row.long() col = col.long() disp = pos[row] - pos[col] disp_norm = torch.linalg.norm(disp, dim=-1, keepdim=True) graph.edata["x"] = torch.cat((disp, disp_norm), dim=-1) return self.graphs
[docs] def normalize_node(self) -> List[dgl.DGLGraph]: """ Normalize node data in each graph in the list of graphs using min-max normalization. The normalization is performed in-place. The normalization formula used is: normalized_data = 2.0 * normalization_bound[1] * (data - node_min) / (node_max - node_min) + normalization_bound[0] This will bring the node data in each graph into the range of [normalization_bound[0], normalization_bound[1]]. After normalization, node data is concatenated according to the keys defined in 'self.input_keys' and 'self.output_keys', resulting in new node data 'x' and 'y', respectively. Returns ------- List[dgl.DGLGraph] The list of graphs with normalized and concatenated node data. """ if not hasattr(self, "graphs") or not self.graphs: raise ValueError("The list 'graphs' is empty.") if not hasattr(self, "node_stats") or not isinstance(self.node_stats, dict): raise ValueError( "The 'node_stats' attribute does not exist or is not a dictionary." ) invar_keys = set( [ key.replace("_min", "").replace("_max", "") for key in self.node_stats.keys() ] ) for graph in self.graphs: for key in invar_keys: node_min = self.node_stats.get(key + "_min") node_max = self.node_stats.get(key + "_max") if node_min is None or node_max is None: raise ValueError( f"The keys '{key}_min' and/or '{key}_max' do not exist in 'node_stats'." ) if node_max.equal(node_min): raise ValueError( f"The values of '{key}_max' and '{key}_min' are equal, causing a division by zero." ) node_data_key = graph.ndata.get(key) if node_data_key is None: raise ValueError( f"The key '{key}' does not exist in the node data of one or more graphs." ) graph.ndata[key] = ( 2.0 * self.normalization_bound[1] * (node_data_key - node_min) / (node_max - node_min) + self.normalization_bound[0] ) graph.ndata["x"] = torch.cat( [graph.ndata.get(key) for key in self.input_keys], dim=-1, ) graph.ndata["y"] = torch.cat( [graph.ndata.get(key) for key in self.output_keys], dim=-1, ) return self.graphs
[docs] def normalize_edge(self) -> List[dgl.DGLGraph]: """ Normalize edge data 'x' in each graph in the list of graphs using min-max normalization. The normalization is performed in-place. The normalization formula used is: normalized_x = 2.0 * normalization_bound[1] * (x - edge_min) / (edge_max - edge_min) + normalization_bound[0] This will bring the edge data 'x' in each graph into the range of [normalization_bound[0], normalization_bound[1]]. Returns ------- List[dgl.DGLGraph] The list of graphs with normalized edge data 'x'. """ if not hasattr(self, "graphs") or not self.graphs: raise ValueError("The list 'graphs' is empty.") if not hasattr(self, "edge_stats") or not isinstance(self.edge_stats, dict): raise ValueError( "The 'edge_stats' attribute does not exist or is not a dictionary." ) edge_min = self.edge_stats.get("edge_min") edge_max = self.edge_stats.get("edge_max") if edge_min is None or edge_max is None: raise ValueError( "The keys 'edge_min' and/or 'edge_max' do not exist in 'edge_stats'." ) if edge_max.equal(edge_min): raise ValueError( "The values of 'edge_max' and 'edge_min' are equal, causing a division by zero." ) for graph in self.graphs: edge_data_x = graph.edata.get("x") if edge_data_x is None: raise ValueError( "The key 'x' does not exist in the edge data of one or more graphs." ) graph.edata["x"] = ( 2.0 * self.normalization_bound[1] * (edge_data_x - edge_min) / (edge_max - edge_min) + self.normalization_bound[0] ) return self.graphs
[docs] def denormalize(self, pred, gt, device) -> Tuple[Tensor, Tensor]: """ Denormalize the graph node data. Parameters: ----------- pred: Tensor Normalized prediction gt: Tensor Normalized ground truth device: Any The device Returns: -------- Tuple(Tensor, Tensor) Denormalized prediction and ground truth """ stats = self.node_stats stats = {key: val.to(device) for key, val in stats.items()} p_pred = pred p_pred = pred[:, [0]] s_pred = pred[:, 1:] p_gt = gt[:, [0]] s_gt = gt[:, 1:] p_pred = (p_pred - self.normalization_bound[0]) * ( stats["p_max"] - stats["p_min"] ) / (2 * self.normalization_bound[1]) + stats["p_min"] s_pred = (s_pred - self.normalization_bound[0]) * ( stats["wallShearStress_max"] - stats["wallShearStress_min"] ) / (2 * self.normalization_bound[1]) + stats["wallShearStress_min"] p_gt = (p_gt - self.normalization_bound[0]) * ( stats["p_max"] - stats["p_min"] ) / (2 * self.normalization_bound[1]) + stats["p_min"] s_gt = (s_gt - self.normalization_bound[0]) * ( stats["wallShearStress_max"] - stats["wallShearStress_min"] ) / (2 * self.normalization_bound[1]) + stats["wallShearStress_min"] pred = torch.cat((p_pred, s_pred), dim=-1) gt = torch.cat((p_gt, s_gt), dim=-1) return pred, gt

def _get_edge_stats(self) -> Dict[str, Any]: """ Compute and save the minimum and maximum values of each edge attribute 'x' in the graphs. Returns ------- dict A dictionary with keys 'edge_min' and 'edge_max' and the corresponding values being 1-D tensors containing the min or max value for each dimension of the edge attribute 'x'. """ if not self.graphs: raise ValueError("The list 'graphs' is empty.") first_edge_data = self.graphs[0].edata.get("x") if first_edge_data is None: raise ValueError( "The key 'x' does not exist in the edge data of the first graph." ) stats = { "edge_min": torch.full_like(first_edge_data[0, :], float("inf")), "edge_max": torch.full_like(first_edge_data[0, :], float("-inf")), } for i in range(len(self.graphs)): edge_data = self.graphs[i].edata.get("x") if edge_data is None: raise ValueError( f"The key 'x' does not exist in the edge data of the {i}-th graph." ) min_val, _ = edge_data.min(dim=0) max_val, _ = edge_data.max(dim=0) min_val, max_val = min_val.reshape(-1), max_val.reshape(-1) stats["edge_min"] = torch.minimum(stats["edge_min"], min_val) stats["edge_max"] = torch.maximum(stats["edge_max"], max_val) # Save to file save_json(stats, "edge_stats.json") return stats def _get_node_stats(self, keys: List[str]) -> Dict[str, Any]: """ Compute and save the minimum and maximum values of each node attribute for the list of keys in the graphs. Parameters ---------- keys : list of str List of keys for the node attributes. Returns ------- dict A dictionary with each key being a string of format '[key]_min' or '[key]_max' and each value being a 1-D tensor containing the min or max value for each dimension of the node attribute. """ if not self.graphs: raise ValueError("The list 'graphs' is empty.") stats = {} for key in keys: first_node_data = self.graphs[0].ndata.get(key) if first_node_data is None: raise ValueError( f"The key '{key}' does not exist in the node data of the first graph." ) stats[key + "_min"] = torch.full_like(first_node_data[0, :], float("inf")) stats[key + "_max"] = torch.full_like(first_node_data[0, :], float("-inf")) for i in range(len(self.graphs)): for key in keys: node_data = self.graphs[i].ndata.get(key) if node_data is None: raise ValueError( f"The key '{key}' does not exist in the node data of the {i}-th graph." ) min_val, _ = node_data.min(dim=0) max_val, _ = node_data.max(dim=0) min_val, max_val = min_val.reshape(-1), max_val.reshape(-1) stats[key + "_min"] = torch.minimum(stats[key + "_min"], min_val) stats[key + "_max"] = torch.maximum(stats[key + "_max"], max_val) # Save to file save_json(stats, "node_stats.json") return stats @staticmethod def _read_info_file( file_path: str, ) -> Tuple[float, float, float, float, float, float, float, float]: """ Parse the values of specific parameters from a given text file. Parameters ---------- file_path : str Path to the text file. Returns ------- tuple A tuple containing values of velocity, reynolds number, length, width, height, ground clearance, slant angle, and fillet radius. """ # Initialize variables to default value 0.0 velocity = ( reynolds_number ) = ( length ) = width = height = ground_clearance = slant_angle = fillet_radius = 0.0 with open(file_path, "r") as file: for line in file: if "Velocity" in line: velocity = float(line.split(":")[1].strip()) elif "Re" in line: reynolds_number = float(line.split(":")[1].strip()) elif "Length" in line: length = float(line.split(":")[1].strip()) elif "Width" in line: width = float(line.split(":")[1].strip()) elif "Height" in line: height = float(line.split(":")[1].strip()) elif "GroundClearance" in line: ground_clearance = float(line.split(":")[1].strip()) elif "SlantAngle" in line: slant_angle = float(line.split(":")[1].strip()) elif "FilletRadius" in line: fillet_radius = float(line.split(":")[1].strip()) return ( velocity, reynolds_number, length, width, height, ground_clearance, slant_angle, fillet_radius, ) @staticmethod def _create_dgl_graph( polydata: Any, outvar_keys: List[str], to_bidirected: bool = True, add_self_loop: bool = False, dtype: Union[torch.dtype, str] = torch.int32, ) -> dgl.DGLGraph: """ Create a DGL graph from vtkPolyData. Parameters ---------- polydata : vtkPolyData vtkPolyData from which the DGL graph is created. outvar_keys : list of str List of keys for the node attributes to be extracted from the vtkPolyData. to_bidirected : bool, optional Whether to make the graph bidirected. Default is True. add_self_loop : bool, optional Whether to add self-loops in the graph. Default is False. dtype : torch.dtype or str, optional Data type for the graph. Default is torch.int32. Returns ------- dgl.DGLGraph The DGL graph created from the vtkPolyData. """ # Extract point data and connectivity information from the vtkPolyData points = polydata.GetPoints() if points is None: raise ValueError("Failed to get points from the polydata.") vertices = np.array( [points.GetPoint(i) for i in range(points.GetNumberOfPoints())] ) polys = polydata.GetPolys() if polys is None: raise ValueError("Failed to get polygons from the polydata.") polys.InitTraversal() edge_list = [] for i in range(polys.GetNumberOfCells()): id_list = vtk.vtkIdList() polys.GetNextCell(id_list) for j in range(id_list.GetNumberOfIds() - 1): edge_list.append((id_list.GetId(j), id_list.GetId(j + 1))) # Create DGL graph using the connectivity information graph = dgl.graph(edge_list, idtype=dtype) if to_bidirected: graph = dgl.to_bidirected(graph) if add_self_loop: graph = dgl.add_self_loop(graph) # Assign node features using the vertex data graph.ndata["pos"] = torch.tensor(vertices, dtype=torch.float32) # Extract node attributes from the vtkPolyData point_data = polydata.GetPointData() if point_data is None: raise ValueError("Failed to get point data from the polydata.") for i in range(point_data.GetNumberOfArrays()): array = point_data.GetArray(i) array_name = array.GetName() if array_name in outvar_keys: array_data = np.zeros( (points.GetNumberOfPoints(), array.GetNumberOfComponents()) ) for j in range(points.GetNumberOfPoints()): array.GetTuple(j, array_data[j]) # Assign node attributes to the DGL graph graph.ndata[array_name] = torch.tensor(array_data, dtype=torch.float32) return graph

© Copyright 2023, NVIDIA Modulus Team. Last updated on Sep 21, 2023.