Source code for physicsnemo.models.domino.model

# SPDX-FileCopyrightText: Copyright (c) 2023 - 2025 NVIDIA CORPORATION & AFFILIATES.
# SPDX-FileCopyrightText: All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
This code contains the DoMINO model architecture.
The DoMINO class contains an architecture to model both surface and
volume quantities together as well as separately (controlled using
the config.yaml file)
"""

import torch
import torch.nn as nn

from physicsnemo.models.layers import FourierMLP, get_activation
from physicsnemo.models.unet import UNet

from .encodings import (
    MultiGeometryEncoding,
)
from .geometry_rep import GeometryRep, scale_sdf
from .mlps import AggregationModel
from .solutions import SolutionCalculatorSurface, SolutionCalculatorVolume

# @dataclass
# class MetaData(ModelMetaData):
#     name: str = "DoMINO"
#     # Optimization
#     jit: bool = False
#     cuda_graphs: bool = True
#     amp: bool = True
#     # Inference
#     onnx_cpu: bool = True
#     onnx_gpu: bool = True
#     onnx_runtime: bool = True
#     # Physics informed
#     var_dim: int = 1
#     func_torch: bool = False
#     auto_grad: bool = False


[docs] class DoMINO(nn.Module): """ DoMINO model architecture for predicting both surface and volume quantities. The DoMINO (Deep Operational Modal Identification and Nonlinear Optimization) model is designed to model both surface and volume physical quantities in aerodynamic simulations. It can operate in three modes: 1. Surface-only: Predicting only surface quantities 2. Volume-only: Predicting only volume quantities 3. Combined: Predicting both surface and volume quantities The model uses a combination of: - Geometry representation modules - Neural network basis functions - Parameter encoding - Local and global geometry processing - Aggregation models for final prediction Parameters ---------- input_features : int Number of point input features output_features_vol : int, optional Number of output features in volume output_features_surf : int, optional Number of output features on surface model_parameters Model parameters controlled by config.yaml Example ------- >>> from physicsnemo.models.domino.model import DoMINO >>> import torch, os >>> from hydra import compose, initialize >>> from omegaconf import OmegaConf >>> device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu") >>> cfg = OmegaConf.register_new_resolver("eval", eval) >>> with initialize(version_base="1.3", config_path="examples/cfd/external_aerodynamics/domino/src/conf"): ... cfg = compose(config_name="config") >>> cfg.model.model_type = "combined" >>> model = DoMINO( ... input_features=3, ... output_features_vol=5, ... output_features_surf=4, ... model_parameters=cfg.model ... ).to(device) Warp ... >>> bsize = 1 >>> nx, ny, nz = cfg.model.interp_res >>> num_neigh = 7 >>> global_features = 2 >>> pos_normals_closest_vol = torch.randn(bsize, 100, 3).to(device) >>> pos_normals_com_vol = torch.randn(bsize, 100, 3).to(device) >>> pos_normals_com_surface = torch.randn(bsize, 100, 3).to(device) >>> geom_centers = torch.randn(bsize, 100, 3).to(device) >>> grid = torch.randn(bsize, nx, ny, nz, 3).to(device) >>> surf_grid = torch.randn(bsize, nx, ny, nz, 3).to(device) >>> sdf_grid = torch.randn(bsize, nx, ny, nz).to(device) >>> sdf_surf_grid = torch.randn(bsize, nx, ny, nz).to(device) >>> sdf_nodes = torch.randn(bsize, 100, 1).to(device) >>> surface_coordinates = torch.randn(bsize, 100, 3).to(device) >>> surface_neighbors = torch.randn(bsize, 100, num_neigh, 3).to(device) >>> surface_normals = torch.randn(bsize, 100, 3).to(device) >>> surface_neighbors_normals = torch.randn(bsize, 100, num_neigh, 3).to(device) >>> surface_sizes = torch.rand(bsize, 100).to(device) + 1e-6 # Note this needs to be > 0.0 >>> surface_neighbors_areas = torch.rand(bsize, 100, num_neigh).to(device) + 1e-6 >>> volume_coordinates = torch.randn(bsize, 100, 3).to(device) >>> vol_grid_max_min = torch.randn(bsize, 2, 3).to(device) >>> surf_grid_max_min = torch.randn(bsize, 2, 3).to(device) >>> global_params_values = torch.randn(bsize, global_features, 1).to(device) >>> global_params_reference = torch.randn(bsize, global_features, 1).to(device) >>> input_dict = { ... "pos_volume_closest": pos_normals_closest_vol, ... "pos_volume_center_of_mass": pos_normals_com_vol, ... "pos_surface_center_of_mass": pos_normals_com_surface, ... "geometry_coordinates": geom_centers, ... "grid": grid, ... "surf_grid": surf_grid, ... "sdf_grid": sdf_grid, ... "sdf_surf_grid": sdf_surf_grid, ... "sdf_nodes": sdf_nodes, ... "surface_mesh_centers": surface_coordinates, ... "surface_mesh_neighbors": surface_neighbors, ... "surface_normals": surface_normals, ... "surface_neighbors_normals": surface_neighbors_normals, ... "surface_areas": surface_sizes, ... "surface_neighbors_areas": surface_neighbors_areas, ... "volume_mesh_centers": volume_coordinates, ... "volume_min_max": vol_grid_max_min, ... "surface_min_max": surf_grid_max_min, ... "global_params_reference": global_params_values, ... "global_params_values": global_params_reference, ... } >>> output = model(input_dict) >>> print(f"{output[0].shape}, {output[1].shape}") torch.Size([1, 100, 5]), torch.Size([1, 100, 4]) """ def __init__( self, input_features: int, output_features_vol: int | None = None, output_features_surf: int | None = None, global_features: int = 2, model_parameters=None, ): """ Initialize the DoMINO model. Args: input_features: Number of input feature dimensions for point data output_features_vol: Number of output features for volume quantities (None for surface-only mode) output_features_surf: Number of output features for surface quantities (None for volume-only mode) model_parameters: Configuration parameters for the model Raises: ValueError: If both output_features_vol and output_features_surf are None """ super().__init__() self.output_features_vol = output_features_vol self.output_features_surf = output_features_surf self.num_sample_points_surface = model_parameters.num_neighbors_surface self.num_sample_points_volume = model_parameters.num_neighbors_volume self.combined_vol_surf = model_parameters.combine_volume_surface self.activation_processor = ( model_parameters.geometry_rep.geo_processor.activation ) if self.combined_vol_surf: h = 8 in_channels = ( 2 + len(model_parameters.geometry_rep.geo_conv.volume_radii) + len(model_parameters.geometry_rep.geo_conv.surface_radii) ) out_channels_surf = 1 + len( model_parameters.geometry_rep.geo_conv.surface_radii ) out_channels_vol = 1 + len( model_parameters.geometry_rep.geo_conv.volume_radii ) self.combined_unet_surf = UNet( in_channels=in_channels, out_channels=out_channels_surf, model_depth=3, feature_map_channels=[ h, 2 * h, 4 * h, ], num_conv_blocks=1, kernel_size=3, stride=1, conv_activation=self.activation_processor, padding=1, padding_mode="zeros", pooling_type="MaxPool3d", pool_size=2, normalization="layernorm", use_attn_gate=True, attn_decoder_feature_maps=[4 * h, 2 * h], attn_feature_map_channels=[2 * h, h], attn_intermediate_channels=4 * h, gradient_checkpointing=True, ) self.combined_unet_vol = UNet( in_channels=in_channels, out_channels=out_channels_vol, model_depth=3, feature_map_channels=[ h, 2 * h, 4 * h, ], num_conv_blocks=1, kernel_size=3, stride=1, conv_activation=self.activation_processor, padding=1, padding_mode="zeros", pooling_type="MaxPool3d", pool_size=2, normalization="layernorm", use_attn_gate=True, attn_decoder_feature_maps=[4 * h, 2 * h], attn_feature_map_channels=[2 * h, h], attn_intermediate_channels=4 * h, gradient_checkpointing=True, ) self.global_features = global_features if self.output_features_vol is None and self.output_features_surf is None: raise ValueError( "At least one of `output_features_vol` or `output_features_surf` must be specified" ) if hasattr(model_parameters, "solution_calculation_mode"): if model_parameters.solution_calculation_mode not in [ "one-loop", "two-loop", ]: raise ValueError( f"Invalid solution_calculation_mode: {model_parameters.solution_calculation_mode}, select 'one-loop' or 'two-loop'." ) self.solution_calculation_mode = model_parameters.solution_calculation_mode else: self.solution_calculation_mode = "two-loop" self.num_variables_vol = output_features_vol self.num_variables_surf = output_features_surf self.grid_resolution = model_parameters.interp_res self.use_surface_normals = model_parameters.use_surface_normals self.use_surface_area = model_parameters.use_surface_area self.encode_parameters = model_parameters.encode_parameters self.geo_encoding_type = model_parameters.geometry_encoding_type if self.use_surface_normals: if not self.use_surface_area: input_features_surface = input_features + 3 else: input_features_surface = input_features + 4 else: input_features_surface = input_features if self.encode_parameters: # Defining the parameter model base_layer_p = model_parameters.parameter_model.base_layer self.parameter_model = FourierMLP( input_features=self.global_features, fourier_features=model_parameters.parameter_model.fourier_features, num_modes=model_parameters.parameter_model.num_modes, base_layer=model_parameters.parameter_model.base_layer, activation=get_activation(model_parameters.parameter_model.activation), ) else: base_layer_p = 0 self.geo_rep_volume = GeometryRep( input_features=input_features, radii=model_parameters.geometry_rep.geo_conv.volume_radii, neighbors_in_radius=model_parameters.geometry_rep.geo_conv.volume_neighbors_in_radius, hops=model_parameters.geometry_rep.geo_conv.volume_hops, sdf_scaling_factor=model_parameters.geometry_rep.geo_processor.volume_sdf_scaling_factor, model_parameters=model_parameters, ) self.geo_rep_surface = GeometryRep( input_features=input_features, radii=model_parameters.geometry_rep.geo_conv.surface_radii, neighbors_in_radius=model_parameters.geometry_rep.geo_conv.surface_neighbors_in_radius, hops=model_parameters.geometry_rep.geo_conv.surface_hops, sdf_scaling_factor=model_parameters.geometry_rep.geo_processor.surface_sdf_scaling_factor, model_parameters=model_parameters, ) # Basis functions for surface and volume base_layer_nn = model_parameters.nn_basis_functions.base_layer if self.output_features_surf is not None: self.nn_basis_surf = nn.ModuleList() for _ in range( self.num_variables_surf ): # Have the same basis function for each variable self.nn_basis_surf.append( FourierMLP( input_features=input_features_surface, base_layer=model_parameters.nn_basis_functions.base_layer, fourier_features=model_parameters.nn_basis_functions.fourier_features, num_modes=model_parameters.nn_basis_functions.num_modes, activation=get_activation( model_parameters.nn_basis_functions.activation ), # model_parameters=model_parameters.nn_basis_functions, ) ) if self.output_features_vol is not None: self.nn_basis_vol = nn.ModuleList() for _ in range( self.num_variables_vol ): # Have the same basis function for each variable self.nn_basis_vol.append( FourierMLP( input_features=input_features, base_layer=model_parameters.nn_basis_functions.base_layer, fourier_features=model_parameters.nn_basis_functions.fourier_features, num_modes=model_parameters.nn_basis_functions.num_modes, activation=get_activation( model_parameters.nn_basis_functions.activation ), # model_parameters=model_parameters.nn_basis_functions, ) ) # Positional encoding position_encoder_base_neurons = model_parameters.position_encoder.base_neurons self.activation = get_activation(model_parameters.activation) self.use_sdf_in_basis_func = model_parameters.use_sdf_in_basis_func self.sdf_scaling_factor = ( model_parameters.geometry_rep.geo_processor.volume_sdf_scaling_factor ) if self.output_features_vol is not None: inp_pos_vol = ( 7 + len(self.sdf_scaling_factor) if model_parameters.use_sdf_in_basis_func else 3 ) self.fc_p_vol = FourierMLP( input_features=inp_pos_vol, fourier_features=model_parameters.position_encoder.fourier_features, num_modes=model_parameters.position_encoder.num_modes, base_layer=model_parameters.position_encoder.base_neurons, activation=get_activation(model_parameters.position_encoder.activation), ) if self.output_features_surf is not None: inp_pos_surf = 3 self.fc_p_surf = FourierMLP( input_features=inp_pos_surf, fourier_features=model_parameters.position_encoder.fourier_features, num_modes=model_parameters.position_encoder.num_modes, base_layer=model_parameters.position_encoder.base_neurons, activation=get_activation(model_parameters.position_encoder.activation), ) # Create a set of local geometry encodings for the surface data: self.surface_local_geo_encodings = MultiGeometryEncoding( radii=model_parameters.geometry_local.surface_radii, neighbors_in_radius=model_parameters.geometry_local.surface_neighbors_in_radius, geo_encoding_type=self.geo_encoding_type, n_upstream_radii=len(model_parameters.geometry_rep.geo_conv.surface_radii), base_layer=512, activation=get_activation(model_parameters.local_point_conv.activation), grid_resolution=self.grid_resolution, ) # Create a set of local geometry encodings for the surface data: self.volume_local_geo_encodings = MultiGeometryEncoding( radii=model_parameters.geometry_local.volume_radii, neighbors_in_radius=model_parameters.geometry_local.volume_neighbors_in_radius, geo_encoding_type=self.geo_encoding_type, n_upstream_radii=len(model_parameters.geometry_rep.geo_conv.volume_radii), base_layer=512, activation=get_activation(model_parameters.local_point_conv.activation), grid_resolution=self.grid_resolution, ) # Aggregation model if self.output_features_surf is not None: # Surface base_layer_geo_surf = 0 for j in model_parameters.geometry_local.surface_neighbors_in_radius: base_layer_geo_surf += j self.agg_model_surf = nn.ModuleList() for _ in range(self.num_variables_surf): self.agg_model_surf.append( AggregationModel( input_features=position_encoder_base_neurons + base_layer_nn + base_layer_geo_surf + base_layer_p, output_features=1, base_layer=model_parameters.aggregation_model.base_layer, activation=get_activation( model_parameters.aggregation_model.activation ), ) ) self.solution_calculator_surf = SolutionCalculatorSurface( num_variables=self.num_variables_surf, num_sample_points=self.num_sample_points_surface, use_surface_normals=self.use_surface_normals, use_surface_area=self.use_surface_area, encode_parameters=self.encode_parameters, parameter_model=self.parameter_model if self.encode_parameters else None, aggregation_model=self.agg_model_surf, nn_basis=self.nn_basis_surf, ) if self.output_features_vol is not None: # Volume base_layer_geo_vol = 0 for j in model_parameters.geometry_local.volume_neighbors_in_radius: base_layer_geo_vol += j self.agg_model_vol = nn.ModuleList() for _ in range(self.num_variables_vol): self.agg_model_vol.append( AggregationModel( input_features=position_encoder_base_neurons + base_layer_nn + base_layer_geo_vol + base_layer_p, output_features=1, base_layer=model_parameters.aggregation_model.base_layer, activation=get_activation( model_parameters.aggregation_model.activation ), ) ) if hasattr(model_parameters, "return_volume_neighbors"): return_volume_neighbors = model_parameters.return_volume_neighbors else: return_volume_neighbors = False self.solution_calculator_vol = SolutionCalculatorVolume( num_variables=self.num_variables_vol, num_sample_points=self.num_sample_points_volume, noise_intensity=50, return_volume_neighbors=return_volume_neighbors, encode_parameters=self.encode_parameters, parameter_model=self.parameter_model if self.encode_parameters else None, aggregation_model=self.agg_model_vol, nn_basis=self.nn_basis_vol, ) def forward(self, data_dict): # Loading STL inputs, bounding box grids, precomputed SDF and scaling factors # STL nodes geo_centers = data_dict["geometry_coordinates"] # Bounding box grid s_grid = data_dict["surf_grid"] sdf_surf_grid = data_dict["sdf_surf_grid"] # Parameters global_params_values = data_dict["global_params_values"] global_params_reference = data_dict["global_params_reference"] if self.output_features_vol is not None: # Represent geometry on computational grid # Computational domain grid p_grid = data_dict["grid"] sdf_grid = data_dict["sdf_grid"] # Scaling factors if "volume_min_max" in data_dict.keys(): vol_max = data_dict["volume_min_max"][:, 1] vol_min = data_dict["volume_min_max"][:, 0] # Normalize based on computational domain geo_centers_vol = ( 2.0 * (geo_centers - vol_min) / (vol_max - vol_min) - 1 ) else: geo_centers_vol = geo_centers encoding_g_vol = self.geo_rep_volume(geo_centers_vol, p_grid, sdf_grid) # SDF on volume mesh nodes sdf_nodes = data_dict["sdf_nodes"] # scaled_sdf_nodes = [] # for i in range(len(self.sdf_scaling_factor)): # scaled_sdf_nodes.append(scale_sdf(sdf_nodes, self.sdf_scaling_factor[i])) scaled_sdf_nodes = [ scale_sdf(sdf_nodes, scaling) for scaling in self.sdf_scaling_factor ] scaled_sdf_nodes = torch.cat(scaled_sdf_nodes, dim=-1) # Positional encoding based on closest point on surface to a volume node pos_volume_closest = data_dict["pos_volume_closest"] # Positional encoding based on center of mass of geometry to volume node pos_volume_center_of_mass = data_dict["pos_volume_center_of_mass"] if self.use_sdf_in_basis_func: encoding_node_vol = torch.cat( ( sdf_nodes, scaled_sdf_nodes, pos_volume_closest, pos_volume_center_of_mass, ), dim=-1, ) else: encoding_node_vol = pos_volume_center_of_mass # Calculate positional encoding on volume nodes encoding_node_vol = self.fc_p_vol(encoding_node_vol) if self.output_features_surf is not None: # Represent geometry on bounding box # Scaling factors if "surface_min_max" in data_dict.keys(): surf_max = data_dict["surface_min_max"][:, 1] surf_min = data_dict["surface_min_max"][:, 0] geo_centers_surf = ( 2.0 * (geo_centers - surf_min) / (surf_max - surf_min) - 1 ) else: geo_centers_surf = geo_centers encoding_g_surf = self.geo_rep_surface( geo_centers_surf, s_grid, sdf_surf_grid ) # Positional encoding based on center of mass of geometry to surface node pos_surface_center_of_mass = data_dict["pos_surface_center_of_mass"] encoding_node_surf = pos_surface_center_of_mass # Calculate positional encoding on surface centers encoding_node_surf = self.fc_p_surf(encoding_node_surf) if ( self.output_features_surf is not None and self.output_features_vol is not None and self.combined_vol_surf ): encoding_g = torch.cat((encoding_g_vol, encoding_g_surf), axis=1) encoding_g_surf = self.combined_unet_surf(encoding_g) encoding_g_vol = self.combined_unet_vol(encoding_g) if self.output_features_vol is not None: # Calculate local geometry encoding for volume # Sampled points on volume volume_mesh_centers = data_dict["volume_mesh_centers"] encoding_g_vol = self.volume_local_geo_encodings( 0.5 * encoding_g_vol, volume_mesh_centers, p_grid, ) # Approximate solution on volume node output_vol = self.solution_calculator_vol( volume_mesh_centers, encoding_g_vol, encoding_node_vol, global_params_values, global_params_reference, ) else: output_vol = None if self.output_features_surf is not None: # Sampled points on surface surface_mesh_centers = data_dict["surface_mesh_centers"] surface_normals = data_dict["surface_normals"] surface_areas = data_dict["surface_areas"] # Neighbors of sampled points on surface surface_mesh_neighbors = data_dict["surface_mesh_neighbors"] surface_neighbors_normals = data_dict["surface_neighbors_normals"] surface_neighbors_areas = data_dict["surface_neighbors_areas"] surface_areas = torch.unsqueeze(surface_areas, -1) surface_neighbors_areas = torch.unsqueeze(surface_neighbors_areas, -1) # Calculate local geometry encoding for surface encoding_g_surf = self.surface_local_geo_encodings( 0.5 * encoding_g_surf, surface_mesh_centers, s_grid ) # Approximate solution on surface cell center output_surf = self.solution_calculator_surf( surface_mesh_centers, encoding_g_surf, encoding_node_surf, surface_mesh_neighbors, surface_normals, surface_neighbors_normals, surface_areas, surface_neighbors_areas, global_params_values, global_params_reference, ) else: output_surf = None return output_vol, output_surf