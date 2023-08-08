# Copyright (c) 2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. from typing import Dict , List , Union , Optional , Tuple import torch import torch.nn as nn from torch import Tensor import torch.nn.functional as F import numpy as np import logging import modulus.sym.models.layers as layers from modulus.sym.models.layers import Activation from modulus.sym.models.layers.spectral_layers import ( calc_latent_derivatives , first_order_pino_grads , second_order_pino_grads , ) from modulus.sym.models.arch import Arch from modulus.sym.models.fully_connected import ConvFullyConnectedArch from modulus.sym.key import Key from modulus.sym.node import Node from modulus.sym.constants import JIT_PYTORCH_VERSION logger = logging . getLogger ( __name__ ) class FNO1DEncoder ( nn . Module ): def __init__ ( self , in_channels : int = 1 , nr_fno_layers : int = 4 , fno_layer_size : int = 32 , fno_modes : Union [ int , List [ int ]] = 16 , padding : Union [ int , List [ int ]] = 8 , padding_type : str = "constant" , activation_fn : Activation = Activation . GELU , coord_features : bool = True , ) -> None : super () . __init__ () self . in_channels = in_channels self . nr_fno_layers = nr_fno_layers self . fno_width = fno_layer_size self . coord_features = coord_features # Spectral modes to have weights if isinstance ( fno_modes , int ): fno_modes = [ fno_modes ] # Add relative coordinate feature if self . coord_features : self . in_channels = self . in_channels + 1 self . activation_fn = layers . get_activation_fn ( activation_fn ) self . spconv_layers = nn . ModuleList () self . conv_layers = nn . ModuleList () # Initial lift layer self . lift_layer = layers . Conv1dFCLayer ( self . in_channels , self . fno_width ) # Build Neural Fourier Operators for _ in range ( self . nr_fno_layers ): self . spconv_layers . append ( layers . SpectralConv1d ( self . fno_width , self . fno_width , fno_modes [ 0 ]) ) self . conv_layers . append ( nn . Conv1d ( self . fno_width , self . fno_width , 1 )) # Padding values for spectral conv if isinstance ( padding , int ): padding = [ padding ] self . pad = padding [: 1 ] self . ipad = [ - pad if pad > 0 else None for pad in self . pad ] self . padding_type = padding_type def forward ( self , x : Tensor ) -> Tensor : if self . coord_features : coord_feat = self . meshgrid ( list ( x . shape ), x . device ) x = torch . cat (( x , coord_feat ), dim = 1 ) x = self . lift_layer ( x ) # (left, right) x = F . pad ( x , ( 0 , self . pad [ 0 ]), mode = self . padding_type ) # Spectral layers for k , conv_w in enumerate ( zip ( self . conv_layers , self . spconv_layers )): conv , w = conv_w if k < len ( self . conv_layers ) - 1 : x = self . activation_fn ( conv ( x ) + w ( x ) ) # Spectral Conv + GELU causes JIT issue! else : x = conv ( x ) + w ( x ) x = x [ ... , : self . ipad [ 0 ]] return x def meshgrid ( self , shape : List [ int ], device : torch . device ): bsize , size_x = shape [ 0 ], shape [ 2 ] grid_x = torch . linspace ( 0 , 1 , size_x , dtype = torch . float32 , device = device ) grid_x = grid_x . unsqueeze ( 0 ) . unsqueeze ( 0 ) . repeat ( bsize , 1 , 1 ) return grid_x class FNO2DEncoder ( nn . Module ): def __init__ ( self , in_channels : int = 1 , nr_fno_layers : int = 4 , fno_layer_size : int = 32 , fno_modes : Union [ int , List [ int ]] = 16 , padding : Union [ int , List [ int ]] = 8 , padding_type : str = "constant" , activation_fn : Activation = Activation . GELU , coord_features : bool = True , ) -> None : super () . __init__ () self . in_channels = in_channels self . nr_fno_layers = nr_fno_layers self . fno_width = fno_layer_size self . coord_features = coord_features # Spectral modes to have weights if isinstance ( fno_modes , int ): fno_modes = [ fno_modes , fno_modes ] # Add relative coordinate feature if self . coord_features : self . in_channels = self . in_channels + 2 self . activation_fn = layers . get_activation_fn ( activation_fn ) self . spconv_layers = nn . ModuleList () self . conv_layers = nn . ModuleList () # Initial lift layer self . lift_layer = layers . Conv2dFCLayer ( self . in_channels , self . fno_width ) # Build Neural Fourier Operators for _ in range ( self . nr_fno_layers ): self . spconv_layers . append ( layers . SpectralConv2d ( self . fno_width , self . fno_width , fno_modes [ 0 ], fno_modes [ 1 ] ) ) self . conv_layers . append ( nn . Conv2d ( self . fno_width , self . fno_width , 1 )) # Padding values for spectral conv if isinstance ( padding , int ): padding = [ padding , padding ] padding = padding + [ 0 , 0 ] # Pad with zeros for smaller lists self . pad = padding [: 2 ] self . ipad = [ - pad if pad > 0 else None for pad in self . pad ] self . padding_type = padding_type def forward ( self , x : Tensor ) -> Tensor : assert ( x . dim () == 4 ), "Only 4D tensors [batch, in_channels, grid_x, grid_y] accepted for 2D FNO" if self . coord_features : coord_feat = self . meshgrid ( list ( x . shape ), x . device ) x = torch . cat (( x , coord_feat ), dim = 1 ) x = self . lift_layer ( x ) # (left, right, top, bottom) x = F . pad ( x , ( 0 , self . pad [ 0 ], 0 , self . pad [ 1 ]), mode = self . padding_type ) # Spectral layers for k , conv_w in enumerate ( zip ( self . conv_layers , self . spconv_layers )): conv , w = conv_w if k < len ( self . conv_layers ) - 1 : x = self . activation_fn ( conv ( x ) + w ( x ) ) # Spectral Conv + GELU causes JIT issue! else : x = conv ( x ) + w ( x ) # remove padding x = x [ ... , : self . ipad [ 1 ], : self . ipad [ 0 ]] return x def meshgrid ( self , shape : List [ int ], device : torch . device ): bsize , size_x , size_y = shape [ 0 ], shape [ 2 ], shape [ 3 ] grid_x = torch . linspace ( 0 , 1 , size_x , dtype = torch . float32 , device = device ) grid_y = torch . linspace ( 0 , 1 , size_y , dtype = torch . float32 , device = device ) grid_x , grid_y = torch . meshgrid ( grid_x , grid_y , indexing = "ij" ) grid_x = grid_x . unsqueeze ( 0 ) . unsqueeze ( 0 ) . repeat ( bsize , 1 , 1 , 1 ) grid_y = grid_y . unsqueeze ( 0 ) . unsqueeze ( 0 ) . repeat ( bsize , 1 , 1 , 1 ) return torch . cat (( grid_x , grid_y ), dim = 1 ) class FNO3DEncoder ( nn . Module ): def __init__ ( self , in_channels : int = 1 , nr_fno_layers : int = 4 , fno_layer_size : int = 32 , fno_modes : Union [ int , List [ int ]] = 16 , padding : Union [ int , List [ int ]] = 8 , padding_type : str = "constant" , activation_fn : Activation = Activation . GELU , coord_features : bool = True , ) -> None : super () . __init__ () self . in_channels = in_channels self . nr_fno_layers = nr_fno_layers self . fno_width = fno_layer_size self . coord_features = coord_features # Spectral modes to have weights if isinstance ( fno_modes , int ): fno_modes = [ fno_modes , fno_modes , fno_modes ] # Add relative coordinate feature if self . coord_features : self . in_channels = self . in_channels + 3 self . activation_fn = layers . get_activation_fn ( activation_fn ) self . spconv_layers = nn . ModuleList () self . conv_layers = nn . ModuleList () # Initial lift layer self . lift_layer = layers . Conv3dFCLayer ( self . in_channels , self . fno_width ) # Build Neural Fourier Operators for _ in range ( self . nr_fno_layers ): self . spconv_layers . append ( layers . SpectralConv3d ( self . fno_width , self . fno_width , fno_modes [ 0 ], fno_modes [ 1 ], fno_modes [ 2 ], ) ) self . conv_layers . append ( nn . Conv3d ( self . fno_width , self . fno_width , 1 )) # Padding values for spectral conv if isinstance ( padding , int ): padding = [ padding , padding , padding ] padding = padding + [ 0 , 0 , 0 ] # Pad with zeros for smaller lists self . pad = padding [: 3 ] self . ipad = [ - pad if pad > 0 else None for pad in self . pad ] self . padding_type = padding_type def forward ( self , x : Tensor ) -> Tensor : if self . coord_features : coord_feat = self . meshgrid ( list ( x . shape ), x . device ) x = torch . cat (( x , coord_feat ), dim = 1 ) x = self . lift_layer ( x ) # (left, right, top, bottom, front, back) x = F . pad ( x , ( 0 , self . pad [ 0 ], 0 , self . pad [ 1 ], 0 , self . pad [ 2 ]), mode = self . padding_type , ) # Spectral layers for k , conv_w in enumerate ( zip ( self . conv_layers , self . spconv_layers )): conv , w = conv_w if k < len ( self . conv_layers ) - 1 : x = self . activation_fn ( conv ( x ) + w ( x ) ) # Spectral Conv + GELU causes JIT issue! else : x = conv ( x ) + w ( x ) x = x [ ... , : self . ipad [ 2 ], : self . ipad [ 1 ], : self . ipad [ 0 ]] return x def meshgrid ( self , shape : List [ int ], device : torch . device ): bsize , size_x , size_y , size_z = shape [ 0 ], shape [ 2 ], shape [ 3 ], shape [ 4 ] grid_x = torch . linspace ( 0 , 1 , size_x , dtype = torch . float32 , device = device ) grid_y = torch . linspace ( 0 , 1 , size_y , dtype = torch . float32 , device = device ) grid_z = torch . linspace ( 0 , 1 , size_z , dtype = torch . float32 , device = device ) grid_x , grid_y , grid_z = torch . meshgrid ( grid_x , grid_y , grid_z , indexing = "ij" ) grid_x = grid_x . unsqueeze ( 0 ) . unsqueeze ( 0 ) . repeat ( bsize , 1 , 1 , 1 , 1 ) grid_y = grid_y . unsqueeze ( 0 ) . unsqueeze ( 0 ) . repeat ( bsize , 1 , 1 , 1 , 1 ) grid_z = grid_z . unsqueeze ( 0 ) . unsqueeze ( 0 ) . repeat ( bsize , 1 , 1 , 1 , 1 ) return torch . cat (( grid_x , grid_y , grid_z ), dim = 1 ) def grid_to_points1d ( vars_dict : Dict [ str , Tensor ]): for var , value in vars_dict . items (): value = torch . permute ( value , ( 0 , 2 , 1 )) vars_dict [ var ] = value . reshape ( - 1 , value . size ( - 1 )) return vars_dict def points_to_grid1d ( vars_dict : Dict [ str , Tensor ], shape : List [ int ]): for var , value in vars_dict . items (): value = value . reshape ( shape [ 0 ], shape [ 2 ], value . size ( - 1 )) vars_dict [ var ] = torch . permute ( value , ( 0 , 2 , 1 )) return vars_dict def grid_to_points2d ( vars_dict : Dict [ str , Tensor ]): for var , value in vars_dict . items (): value = torch . permute ( value , ( 0 , 2 , 3 , 1 )) vars_dict [ var ] = value . reshape ( - 1 , value . size ( - 1 )) return vars_dict def points_to_grid2d ( vars_dict : Dict [ str , Tensor ], shape : List [ int ]): for var , value in vars_dict . items (): value = value . reshape ( shape [ 0 ], shape [ 2 ], shape [ 3 ], value . size ( - 1 )) vars_dict [ var ] = torch . permute ( value , ( 0 , 3 , 1 , 2 )) return vars_dict def grid_to_points3d ( vars_dict : Dict [ str , Tensor ]): for var , value in vars_dict . items (): value = torch . permute ( value , ( 0 , 2 , 3 , 4 , 1 )) vars_dict [ var ] = value . reshape ( - 1 , value . size ( - 1 )) return vars_dict def points_to_grid3d ( vars_dict : Dict [ str , Tensor ], shape : List [ int ]): for var , value in vars_dict . items (): value = value . reshape ( shape [ 0 ], shape [ 2 ], shape [ 3 ], shape [ 4 ], value . size ( - 1 )) vars_dict [ var ] = torch . permute ( value , ( 0 , 4 , 1 , 2 , 3 )) return vars_dict [docs] class FNOArch ( Arch ): """Fourier neural operator (FNO) model. Note ---- The FNO architecture supports options for 1D, 2D and 3D fields which can be controlled using the `dimension` parameter. Parameters ---------- input_keys : List[Key] Input key list. The key dimension size should equal the variables channel dim. dimension : int Model dimensionality (supports 1, 2, 3). decoder_net : Arch Pointwise decoder network, input key should be the latent variable detach_keys : List[Key], optional List of keys to detach gradients, by default [] nr_fno_layers : int, optional Number of spectral convolution layers, by default 4 fno_modes : Union[int, List[int]], optional Number of Fourier modes with learnable weights, by default 16 padding : int, optional Padding size for FFT calculations, by default 8 padding_type : str, optional Padding type for FFT calculations ('constant', 'reflect', 'replicate' or 'circular'), by default "constant" activation_fn : Activation, optional Activation function, by default Activation.GELU coord_features : bool, optional Use coordinate meshgrid as additional input feature, by default True Variable Shape -------------- Input variable tensor shape: - 1D: :math:`[N, size, W]` - 2D: :math:`[N, size, H, W]` - 3D: :math:`[N, size, D, H, W]` Output variable tensor shape: - 1D: :math:`[N, size, W]` - 2D: :math:`[N, size, H, W]` - 3D: :math:`[N, size, D, H, W]` Example ------- 1D FNO model >>> decoder = FullyConnectedArch([Key("z", size=32)], [Key("y", size=2)]) >>> fno_1d = FNOArch([Key("x", size=2)], dimension=1, decoder_net=decoder) >>> model = fno_1d.make_node() >>> input = {"x": torch.randn(20, 2, 64)} >>> output = model.evaluate(input) 2D FNO model >>> decoder = ConvFullyConnectedArch([Key("z", size=32)], [Key("y", size=2)]) >>> fno_2d = FNOArch([Key("x", size=2)], dimension=2, decoder_net=decoder) >>> model = fno_2d.make_node() >>> input = {"x": torch.randn(20, 2, 64, 64)} >>> output = model.evaluate(input) 3D FNO model >>> decoder = Siren([Key("z", size=32)], [Key("y", size=2)]) >>> fno_3d = FNOArch([Key("x", size=2)], dimension=3, decoder_net=decoder) >>> model = fno_3d.make_node() >>> input = {"x": torch.randn(20, 2, 64, 64, 64)} >>> output = model.evaluate(input) """ def __init__ ( self , input_keys : List [ Key ], dimension : int , decoder_net : Arch , detach_keys : List [ Key ] = [], nr_fno_layers : int = 4 , fno_modes : Union [ int , List [ int ]] = 16 , padding : int = 8 , padding_type : str = "constant" , activation_fn : Activation = Activation . GELU , coord_features : bool = True , ) -> None : super () . __init__ ( input_keys = input_keys , output_keys = [], detach_keys = detach_keys ) self . dimension = dimension self . nr_fno_layers = nr_fno_layers self . fno_modes = fno_modes self . padding = padding self . padding_type = padding_type self . activation_fn = activation_fn self . coord_features = coord_features # decoder net self . decoder_net = decoder_net self . calc_pino_gradients = False self . output_keys = self . decoder_net . output_keys self . output_key_dict = { str ( var ): var . size for var in self . output_keys } self . output_scales = { str ( k ): k . scale for k in self . output_keys } self . latent_key = self . decoder_net . input_keys self . latent_key_dict = { str ( var ): var . size for var in self . latent_key } assert ( len ( self . latent_key ) == 1 ), "FNO decoder network should only have a single input key" self . latent_key = str ( self . latent_key [ 0 ]) in_channels = sum ( self . input_key_dict . values ()) self . fno_layer_size = sum ( self . latent_key_dict . values ()) if self . dimension == 1 : FNOModel = FNO1DEncoder self . grid_to_points = grid_to_points1d # For JIT self . points_to_grid = points_to_grid1d # For JIT elif self . dimension == 2 : FNOModel = FNO2DEncoder self . grid_to_points = grid_to_points2d # For JIT self . points_to_grid = points_to_grid2d # For JIT elif self . dimension == 3 : FNOModel = FNO3DEncoder self . grid_to_points = grid_to_points3d # For JIT self . points_to_grid = points_to_grid3d # For JIT else : raise NotImplementedError ( "Invalid dimensionality. Only 1D, 2D and 3D FNO implemented" ) self . spec_encoder = FNOModel ( in_channels , nr_fno_layers = self . nr_fno_layers , fno_layer_size = self . fno_layer_size , fno_modes = self . fno_modes , padding = self . padding , padding_type = self . padding_type , activation_fn = self . activation_fn , coord_features = self . coord_features , ) [docs] def add_pino_gradients ( self , derivatives : List [ Key ], domain_length : List [ float ] = [ 1.0 , 1.0 ] ) -> None : """Adds PINO "exact" gradient calculations model outputs. Note ---- This will constraint the FNO decoder to a two layer fully-connected model with Tanh activactions functions. This is done for computational efficiency since gradients calculations are explicit. Auto-diff is far too slow for this method. Parameters ---------- derivatives : List[Key] List of derivative keys domain_length : List[float], optional Domain size of input grid. Needed for calculating the gradients of the latent variables. By default [1.0, 1.0] Raises ------ ValueError If domain length list is not the same size as the FNO model dimenion Note ---- For details on the "exact" gradient calculation refer to section 3.3 in: https://arxiv.org/pdf/2111.03794.pdf """ assert ( len ( domain_length ) == self . dimension ), "Domain length must be same length as the dimension of the model" self . domain_length = domain_length logger . warning ( "Switching decoder to two layer FC model with Tanh activations for PINO" ) self . decoder_net = ConvFullyConnectedArch ( input_keys = self . decoder_net . input_keys , output_keys = self . decoder_net . output_keys , layer_size = self . fno_layer_size , nr_layers = 1 , activation_fn = Activation . TANH , skip_connections = False , adaptive_activations = False , ) self . calc_pino_gradients = True self . first_order_pino = False self . second_order_pino = False self . derivative_keys = [] for var in derivatives : dx_name = str ( var ) . split ( "__" ) # Split name to get original var names if len ( dx_name ) == 2 : # First order assert ( dx_name [ 1 ] in [ "x" , "y" , "z" ][: self . dimension ] ), f "Invalid first-order derivative { str ( var ) } for { self . dimension } d FNO" self . derivative_keys . append ( var ) self . first_order_pino = True elif len ( dx_name ) == 3 : assert ( dx_name [ 1 ] in [ "x" , "y" , "z" ][: self . dimension ] and dx_name [ 1 ] == dx_name [ 2 ] ), f "Invalid second-order derivative { str ( var ) } for { self . dimension } d FNO" self . derivative_keys . append ( var ) self . second_order_pino = True elif len ( dx_name ) > 3 : raise ValueError ( "FNO only supports first order and laplacian second order derivatives" ) # Add derivative keys into output keys self . output_keys_fno = self . output_keys . copy () self . output_key_fno_dict = { str ( var ): var . size for var in self . output_keys_fno } self . output_keys = self . output_keys + self . derivative_keys self . output_key_dict = { str ( var ): var . size for var in self . output_keys } [docs] def forward ( self , in_vars : Dict [ str , Tensor ]) -> Dict [ str , Tensor ]: x = self . prepare_input ( in_vars , self . input_key_dict . keys (), detach_dict = self . detach_key_dict , dim = 1 , input_scales = self . input_scales , ) y_latent = self . spec_encoder ( x ) y_shape = list ( y_latent . size ()) y_input = { self . latent_key : y_latent } # Reshape to pointwise inputs if not a conv FC model if self . decoder_net . var_dim == - 1 : y_input = self . grid_to_points ( y_input ) y = self . decoder_net ( y_input ) # Convert back into grid if self . decoder_net . var_dim == - 1 : y = self . points_to_grid ( y , y_shape ) if self . calc_pino_gradients : output_grads = self . calc_pino_derivatives ( y_latent ) y . update ( output_grads ) return y @torch.jit.ignore def calc_pino_derivatives(self, latent: Tensor) -> Dict[str, Tensor]: # Calculate the gradients of latent variables # This is done using FFT and is the reason we need a domain size lat_dx, lat_ddx = calc_latent_derivatives(latent, self.domain_length) # Get weight matrices from decoder weights, biases = self.decoder_net._impl.get_weight_list() outputs = {} # calc first order derivatives if self.first_order_pino: output_dx = first_order_pino_grads( u=latent, ux=lat_dx, weights_1=weights[0], weights_2=weights[1], bias_1=biases[0], ) # Build output dictionary manually (would normally use prepare_output) dims = ["x", "y", "z"] for d in range(len(output_dx)): # Loop through dimensions for k, v in zip( self.output_keys_fno, torch.split( output_dx[d], list(self.output_key_fno_dict.values()), dim=1 ), ): # Loop through variables if f"{k}__{dims[d]}__{dims[d]}" in self.output_key_dict.keys(): out_scale = self.decoder_net.output_scales[str(k)][ 1 ] # Apply out scaling to grads outputs[f"{k}__{dims[d]}"] = v * out_scale # calc first order derivatives if self.second_order_pino: output_dxx = second_order_pino_grads( u=latent, ux=lat_dx, uxx=lat_ddx, weights_1=weights[0], weights_2=weights[1], bias_1=biases[0], ) # Build output dictionary manually (would normally use prepare_output) dims = ["x", "y", "z"] for d in range(len(output_dxx)): # Loop through dimensions for k, v in zip( self.output_keys_fno, torch.split( output_dxx[d], list(self.output_key_fno_dict.values()), dim=1 ), ): # Loop through variables if f"{k}__{dims[d]}__{dims[d]}" in self.output_key_dict.keys(): out_scale = self.decoder_net.output_scales[str(k)][ 1 ] # Apply out scaling to grads outputs[f"{k}__{dims[d]}__{dims[d]}"] = v * out_scale return outputs