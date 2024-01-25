# Copyright (c) 2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. """ Continuous type constraints """ import torch from torch.nn.parallel import DistributedDataParallel import numpy as np from typing import Dict , List , Union , Tuple , Callable import sympy as sp import logging import torch from .constraint import Constraint from .utils import _compute_outvar , _compute_lambda_weighting from modulus.sym.utils.io.vtk import var_to_polyvtk from modulus.sym.graph import Graph from modulus.sym.key import Key from modulus.sym.node import Node from modulus.sym.loss import Loss , PointwiseLossNorm , IntegralLossNorm from modulus.sym.distributed import DistributedManager from modulus.sym.utils.sympy import np_lambdify from modulus.sym.geometry import Geometry from modulus.sym.geometry.helper import _sympy_criteria_to_criteria from modulus.sym.geometry.parameterization import Parameterization , Bounds from modulus.sym.dataset import ( DictPointwiseDataset , ListIntegralDataset , ContinuousPointwiseIterableDataset , ContinuousIntegralIterableDataset , DictImportanceSampledPointwiseIterableDataset , DictVariationalDataset , ) Tensor = torch . Tensor logger = logging . getLogger ( __name__ ) [docs] class PointwiseConstraint ( Constraint ): """ Base class for all Pointwise Constraints """ def save_batch ( self , filename ): # sample batch invar , true_outvar , lambda_weighting = next ( self . dataloader ) invar = Constraint . _set_device ( invar , device = self . device , requires_grad = True ) true_outvar = Constraint . _set_device ( true_outvar , device = self . device ) lambda_weighting = Constraint . _set_device ( lambda_weighting , device = self . device ) # If using DDP, strip out collective stuff to prevent deadlocks # This only works either when one process alone calls in to save_batch # or when multiple processes independently save data if hasattr ( self . model , "module" ): modl = self . model . module else : modl = self . model # compute pred outvar pred_outvar = modl ( invar ) # rename values and save batch to vtk file TODO clean this up after graph unroll stuff named_lambda_weighting = { "lambda_" + key : value for key , value in lambda_weighting . items () } named_true_outvar = { "true_" + key : value for key , value in true_outvar . items ()} named_pred_outvar = { "pred_" + key : value for key , value in pred_outvar . items ()} save_var = { ** { key : value for key , value in invar . items ()}, ** named_true_outvar , ** named_pred_outvar , ** named_lambda_weighting , } save_var = { key : value . cpu () . detach () . numpy () for key , value in save_var . items () } var_to_polyvtk ( save_var , filename ) def load_data ( self ): # get train points from dataloader invar , true_outvar , lambda_weighting = next ( self . dataloader ) self . _input_vars = Constraint . _set_device ( invar , device = self . device , requires_grad = True ) self . _target_vars = Constraint . _set_device ( true_outvar , device = self . device ) self . _lambda_weighting = Constraint . _set_device ( lambda_weighting , device = self . device ) def load_data_static ( self ): if self . _input_vars is None : # Default loading if vars not allocated self . load_data () else : # get train points from dataloader invar , true_outvar , lambda_weighting = next ( self . dataloader ) # Set grads to false here for inputs, static var has allocation already input_vars = Constraint . _set_device ( invar , device = self . device , requires_grad = False ) target_vars = Constraint . _set_device ( true_outvar , device = self . device ) lambda_weighting = Constraint . _set_device ( lambda_weighting , device = self . device ) for key in input_vars . keys (): self . _input_vars [ key ] . data . copy_ ( input_vars [ key ]) for key in target_vars . keys (): self . _target_vars [ key ] . copy_ ( target_vars [ key ]) for key in lambda_weighting . keys (): self . _lambda_weighting [ key ] . copy_ ( lambda_weighting [ key ]) def forward ( self ): # compute pred outvar self . _output_vars = self . model ( self . _input_vars ) def loss ( self , step : int ) -> Dict [ str , torch . Tensor ]: if self . _output_vars is None : logger . warn ( "Calling loss without forward call" ) return {} losses = self . _loss ( self . _input_vars , self . _output_vars , self . _target_vars , self . _lambda_weighting , step , ) return losses [docs] @classmethod def from_numpy ( cls , nodes : List [ Node ], invar : Dict [ str , np . ndarray ], outvar : Dict [ str , np . ndarray ], batch_size : int , lambda_weighting : Dict [ str , np . ndarray ] = None , loss : Loss = PointwiseLossNorm (), shuffle : bool = True , drop_last : bool = True , num_workers : int = 0 , ): """ Create custom pointwise constraint from numpy arrays. Parameters ---------- nodes : List[Node] List of Modulus Nodes to unroll graph with. invar : Dict[str, np.ndarray (N, 1)] Dictionary of numpy arrays as input. outvar : Dict[str, np.ndarray (N, 1)] Dictionary of numpy arrays to enforce constraint on. batch_size : int Batch size used in training. lambda_weighting : Dict[str, np.ndarray (N, 1)] Dictionary of numpy arrays to pointwise weight losses. Default is ones. loss : Loss Modulus `Loss` module that defines the loss type, (e.g. L2, L1, ...). shuffle : bool, optional Randomly shuffle examples in dataset every epoch, by default True drop_last : bool, optional Drop last mini-batch if dataset not fully divisible but batch_size, by default False num_workers : int Number of worker used in fetching data. """ if "area" not in invar : invar [ "area" ] = np . ones_like ( next ( iter ( invar . values ()))) # TODO: better area definition? # no need to lambdify: outvar / lambda_weighting already contain np arrays # make point dataset dataset = DictPointwiseDataset ( invar = invar , outvar = outvar , lambda_weighting = lambda_weighting , ) return cls ( nodes = nodes , dataset = dataset , loss = loss , batch_size = batch_size , shuffle = shuffle , drop_last = drop_last , num_workers = num_workers , ) [docs] class PointwiseBoundaryConstraint ( PointwiseConstraint ): """ Pointwise Constraint applied to boundary/perimeter/surface of geometry. For example, in 3D this will create a constraint on the surface of the given geometry. Parameters ---------- nodes : List[Node] List of Modulus Nodes to unroll graph with. geometry : Geometry Modulus `Geometry` to apply the constraint with. outvar : Dict[str, Union[int, float, sp.Basic]] A dictionary of SymPy Symbols/Expr, floats or int. This is used to describe the constraint. For example, `outvar={'u': 0}` would specify `'u'` to be zero everywhere on the constraint. batch_size : int Batch size used in training. criteria : Union[sp.Basic, True] SymPy criteria function specifies to only apply constraint to areas that satisfy this criteria. For example, if `criteria=sympy.Symbol('x')>0` then only areas that have positive `'x'` values will have the constraint applied to them. lambda_weighting : Dict[str, Union[int, float, sp.Basic]] = None The spatial pointwise weighting of the constraint. For example, `lambda_weighting={'lambda_u': 2.0*sympy.Symbol('x')}` would apply a pointwise weighting to the loss of `2.0 * x`. parameterization : Union[Parameterization, None], optional This allows adding parameterization or additional inputs. fixed_dataset : bool = True If True then the points sampled for this constraint are done right when initialized and fixed. If false then the points are continually resampled. compute_sdf_derivatives: bool, optional Compute SDF derivatives when sampling geometery importance_measure : Union[Callable, None] = None A callable function that computes a scalar importance measure. This importance measure is then used in the constraint when sampling points. Areas with higher importance are sampled more frequently according to Monte Carlo importance sampling, https://en.wikipedia.org/wiki/Monte_Carlo_integration. batch_per_epoch : int = 1000 If `fixed_dataset=True` then the total number of points generated to apply constraint on is `total_nr_points=batch_per_epoch*batch_size`. quasirandom : bool = False If true then sample the points using the Halton sequence. num_workers : int Number of worker used in fetching data. loss : Loss Modulus `Loss` module that defines the loss type, (e.g. L2, L1, ...). shuffle : bool, optional Randomly shuffle examples in dataset every epoch, by default True """ def __init__ ( self , nodes : List [ Node ], geometry : Geometry , outvar : Dict [ str , Union [ int , float , sp . Basic ]], batch_size : int , criteria : Union [ sp . Basic , Callable , None ] = None , lambda_weighting : Dict [ str , Union [ int , float , sp . Basic ]] = None , parameterization : Union [ Parameterization , None ] = None , fixed_dataset : bool = True , importance_measure : Union [ Callable , None ] = None , batch_per_epoch : int = 1000 , quasirandom : bool = False , num_workers : int = 0 , loss : Loss = PointwiseLossNorm (), shuffle : bool = True , ): # assert that not using importance measure with continuous dataset assert not ( ( not fixed_dataset ) and ( importance_measure is not None ) ), "Using Importance measure with continuous dataset is not supported" # if fixed dataset then sample points and fix for all of training if fixed_dataset : # sample boundary invar = geometry . sample_boundary ( batch_size * batch_per_epoch , criteria = criteria , parameterization = parameterization , quasirandom = quasirandom , ) # compute outvar outvar = _compute_outvar ( invar , outvar ) # set lambda weighting lambda_weighting = _compute_lambda_weighting ( invar , outvar , lambda_weighting ) # make point dataset if importance_measure is None : invar [ "area" ] *= batch_per_epoch # TODO find better way to do this dataset = DictPointwiseDataset ( invar = invar , outvar = outvar , lambda_weighting = lambda_weighting , ) else : dataset = DictImportanceSampledPointwiseIterableDataset ( invar = invar , outvar = outvar , batch_size = batch_size , importance_measure = importance_measure , lambda_weighting = lambda_weighting , shuffle = shuffle , ) # else sample points every batch else : # invar function invar_fn = lambda : geometry . sample_boundary ( batch_size , criteria = criteria , parameterization = parameterization , quasirandom = quasirandom , ) # outvar function outvar_fn = lambda invar : _compute_outvar ( invar , outvar ) # lambda weighting function lambda_weighting_fn = lambda invar , outvar : _compute_lambda_weighting ( invar , outvar , lambda_weighting ) # make point dataloader dataset = ContinuousPointwiseIterableDataset ( invar_fn = invar_fn , outvar_fn = outvar_fn , lambda_weighting_fn = lambda_weighting_fn , ) # initialize constraint super () . __init__ ( nodes = nodes , dataset = dataset , loss = loss , batch_size = batch_size , shuffle = shuffle , drop_last = True , num_workers = num_workers , ) [docs] class PointwiseInteriorConstraint ( PointwiseConstraint ): """ Pointwise Constraint applied to interior of geometry. For example, in 3D this will create a constraint on the interior volume of the given geometry. Parameters ---------- nodes : List[Node] List of Modulus Nodes to unroll graph with. geometry : Geometry Modulus `Geometry` to apply the constraint with. outvar : Dict[str, Union[int, float, sp.Basic]] A dictionary of SymPy Symbols/Expr, floats or int. This is used to describe the constraint. For example, `outvar={'u': 0}` would specify `'u'` to be zero everywhere in the constraint. batch_size : int Batch size used in training. bounds : Dict[sp.Basic, Tuple[float, float]] = None Bounds of the given geometry, (e.g. `bounds={sympy.Symbol('x'): (0, 1), sympy.Symbol('y'): (0, 1)}). criteria : Union[sp.basic, True] SymPy criteria function specifies to only apply constraint to areas that satisfy this criteria. For example, if `criteria=sympy.Symbol('x')>0` then only areas that have positive `'x'` values will have the constraint applied to them. lambda_weighting : Dict[str, Union[int, float, sp.Basic]] = None The spatial pointwise weighting of the constraint. For example, `lambda_weighting={'lambda_u': 2.0*sympy.Symbol('x')}` would apply a pointwise weighting to the loss of `2.0 * x`. parameterization: Union[Parameterization, None] = {} This allows adding parameterization or additional inputs. fixed_dataset : bool = True If True then the points sampled for this constraint are done right when initialized and fixed. If false then the points are continually resampled. importance_measure : Union[Callable, None] = None A callable function that computes a scalar importance measure. This importance measure is then used in the constraint when sampling points. Areas with higher importance are sampled more frequently according to Monte Carlo importance sampling, https://en.wikipedia.org/wiki/Monte_Carlo_integration. batch_per_epoch : int = 1000 If `fixed_dataset=True` then the total number of points generated to apply constraint on is `total_nr_points=batch_per_epoch*batch_size`. quasirandom : bool = False If true then sample the points using the Halton sequence. num_workers : int Number of worker used in fetching data. loss : Loss Modulus `Loss` module that defines the loss type, (e.g. L2, L1, ...). shuffle : bool, optional Randomly shuffle examples in dataset every epoch, by default True """ def __init__ ( self , nodes : List [ Node ], geometry : Geometry , outvar : Dict [ str , Union [ int , float , sp . Basic ]], batch_size : int , bounds : Dict [ sp . Basic , Tuple [ float , float ]] = None , criteria : Union [ sp . Basic , Callable , None ] = None , lambda_weighting : Dict [ str , Union [ int , float , sp . Basic ]] = None , parameterization : Union [ Parameterization , None ] = None , fixed_dataset : bool = True , compute_sdf_derivatives : bool = False , importance_measure : Union [ Callable , None ] = None , batch_per_epoch : int = 1000 , quasirandom : bool = False , num_workers : int = 0 , loss : Loss = PointwiseLossNorm (), shuffle : bool = True , ): # assert that not using importance measure with continuous dataset assert not ( ( not fixed_dataset ) and ( importance_measure is not None ) ), "Using Importance measure with continuous dataset is not supported" # if fixed dataset then sample points and fix for all of training if fixed_dataset : # sample interior invar = geometry . sample_interior ( batch_size * batch_per_epoch , bounds = bounds , criteria = criteria , parameterization = parameterization , quasirandom = quasirandom , compute_sdf_derivatives = compute_sdf_derivatives , ) # compute outvar outvar = _compute_outvar ( invar , outvar ) # set lambda weighting lambda_weighting = _compute_lambda_weighting ( invar , outvar , lambda_weighting ) # make point dataset if importance_measure is None : invar [ "area" ] *= batch_per_epoch # TODO find better way to do this dataset = DictPointwiseDataset ( invar = invar , outvar = outvar , lambda_weighting = lambda_weighting , ) else : dataset = DictImportanceSampledPointwiseIterableDataset ( invar = invar , outvar = outvar , batch_size = batch_size , importance_measure = importance_measure , lambda_weighting = lambda_weighting , shuffle = shuffle , ) # else sample points every batch else : # invar function invar_fn = lambda : geometry . sample_interior ( batch_size , bounds = bounds , criteria = criteria , parameterization = parameterization , quasirandom = quasirandom , compute_sdf_derivatives = compute_sdf_derivatives , ) # outvar function outvar_fn = lambda invar : _compute_outvar ( invar , outvar ) # lambda weighting function lambda_weighting_fn = lambda invar , outvar : _compute_lambda_weighting ( invar , outvar , lambda_weighting ) # make point dataloader dataset = ContinuousPointwiseIterableDataset ( invar_fn = invar_fn , outvar_fn = outvar_fn , lambda_weighting_fn = lambda_weighting_fn , ) # initialize constraint super () . __init__ ( nodes = nodes , dataset = dataset , loss = loss , batch_size = batch_size , shuffle = shuffle , drop_last = True , num_workers = num_workers , ) [docs] class IntegralConstraint ( Constraint ): """ Base class for all Integral Constraints """ def save_batch ( self , filename ): pass # sample batch invar , true_outvar , lambda_weighting = next ( self . dataloader ) invar = Constraint . _set_device ( invar , device = self . device , requires_grad = True ) # rename values and save batch to vtk file TODO clean this up after graph unroll stuff for i in range ( self . batch_size ): save_var = { key : value [ i ] . cpu () . detach () . numpy () for key , value in invar . items () } var_to_polyvtk ( save_var , filename + "_batch_" + str ( i )) def load_data ( self ): # get train points from dataloader invar , true_outvar , lambda_weighting = next ( self . dataloader ) self . _input_vars = Constraint . _set_device ( invar , device = self . device , requires_grad = True ) self . _target_vars = Constraint . _set_device ( true_outvar , device = self . device ) self . _lambda_weighting = Constraint . _set_device ( lambda_weighting , device = self . device ) def load_data_static ( self ): if self . _input_vars is None : # Default loading if vars not allocated self . load_data () else : # get train points from dataloader invar , true_outvar , lambda_weighting = next ( self . dataloader ) # Set grads to false here for inputs, static var has allocation already input_vars = Constraint . _set_device ( invar , device = self . device , requires_grad = False ) target_vars = Constraint . _set_device ( true_outvar , device = self . device ) lambda_weighting = Constraint . _set_device ( lambda_weighting , device = self . device ) for key in input_vars . keys (): self . _input_vars [ key ] . data . copy_ ( input_vars [ key ]) for key in target_vars . keys (): self . _target_vars [ key ] . copy_ ( target_vars [ key ]) for key in lambda_weighting . keys (): self . _lambda_weighting [ key ] . copy_ ( lambda_weighting [ key ]) @property def output_vars ( self ) -> Dict [ str , Tensor ]: return self . _output_vars @output_vars . setter def output_vars ( self , data : Dict [ str , Tensor ]): self . _output_vars = {} for output in self . output_names : self . _output_vars [ str ( output )] = data [ str ( output )] def forward ( self ): # compute pred outvar self . _output_vars = self . model ( self . _input_vars ) def loss ( self , step : int ) -> Dict [ str , torch . Tensor ]: if self . _output_vars is None : logger . warn ( "Calling loss without forward call" ) return {} # split for individual integration list_invar , list_pred_outvar , list_true_outvar , list_lambda_weighting = ( [], [], [], [], ) for i in range ( self . batch_size ): list_invar . append ( { key : value [ i ] for key , value in self . _input_vars . items ()} ) list_pred_outvar . append ( { key : value [ i ] for key , value in self . _output_vars . items ()} ) list_true_outvar . append ( { key : value [ i ] for key , value in self . _target_vars . items ()} ) list_lambda_weighting . append ( { key : value [ i ] for key , value in self . _lambda_weighting . items ()} ) # compute integral losses losses = self . _loss ( list_invar , list_pred_outvar , list_true_outvar , list_lambda_weighting , step ) return losses [docs] class IntegralBoundaryConstraint ( IntegralConstraint ): """ Integral Constraint applied to boundary/perimeter/surface of geometry. For example, in 3D this will create a constraint on the surface of the given geometry. Parameters ---------- nodes : List[Node] List of Modulus Nodes to unroll graph with. geometry : Geometry Modulus `Geometry` to apply the constraint with. outvar : Dict[str, Union[int, float, sp.Basic]] A dictionary of SymPy Symbols/Expr, floats or int. This is used to describe the constraint. For example, `outvar={'u': 0}` would specify the integral of `'u'` to be zero. batch_size : int Number of integrals to apply. integral_batch_size : int Batch sized used in the Monte Carlo integration to compute the integral. criteria : Union[sp.basic, True] SymPy criteria function specifies to only integrate areas that satisfy this criteria. For example, if `criteria=sympy.Symbol('x')>0` then only areas that have positive `'x'` values will be integrated. lambda_weighting : Dict[str, Union[int, float, sp.Basic]] = None The weighting of the constraint. For example, `lambda_weighting={'lambda_u': 2.0}` would weight the integral constraint by `2.0`. parameterization : Union[Parameterization, None] This allows adding parameterization or additional inputs. fixed_dataset : bool = True If True then the points sampled for this constraint are done right when initialized and fixed. If false then the points are continually resampled. batch_per_epoch : int = 100 If `fixed_dataset=True` then the total number of integrals generated to apply constraint on is `total_nr_integrals=batch_per_epoch*batch_size`. quasirandom : bool = False If true then sample the points using the Halton sequence. num_workers : int Number of worker used in fetching data. loss : Loss Modulus `Loss` module that defines the loss type, (e.g. L2, L1, ...). shuffle : bool, optional Randomly shuffle examples in dataset every epoch, by default True """ def __init__ ( self , nodes : List [ Node ], geometry : Geometry , outvar : Dict [ str , Union [ int , float , sp . Basic ]], batch_size : int , integral_batch_size : int , criteria : Union [ sp . Basic , Callable , None ] = None , lambda_weighting : Dict [ str , Union [ int , float , sp . Basic ]] = None , parameterization : Union [ Parameterization , None ] = None , fixed_dataset : bool = True , batch_per_epoch : int = 100 , quasirandom : bool = False , num_workers : int = 0 , loss : Loss = IntegralLossNorm (), shuffle : bool = True , ): # convert dict to parameterization if needed if parameterization is None : parameterization = geometry . parameterization elif isinstance ( parameterization , dict ): parameterization = Parameterization ( parameterization ) # Fixed number of integral examples if fixed_dataset : # sample geometry to generate integral batchs list_invar = [] list_outvar = [] list_lambda_weighting = [] for i in range ( batch_size * batch_per_epoch ): # sample parameter ranges if parameterization : specific_param_ranges = parameterization . sample ( 1 ) else : specific_param_ranges = {} # sample boundary invar = geometry . sample_boundary ( integral_batch_size , criteria = criteria , parameterization = Parameterization ( { sp . Symbol ( key ): float ( value ) for key , value in specific_param_ranges . items () } ), quasirandom = quasirandom , ) # compute outvar if ( not specific_param_ranges ): # TODO this can be removed after a np_lambdify rewrite specific_param_ranges = { "_" : next ( iter ( invar . values ()))[ 0 : 1 ]} outvar_star = _compute_outvar ( specific_param_ranges , outvar ) # set lambda weighting lambda_weighting_star = _compute_lambda_weighting ( specific_param_ranges , outvar , lambda_weighting ) # store samples list_invar . append ( invar ) list_outvar . append ( outvar_star ) list_lambda_weighting . append ( lambda_weighting_star ) # make dataset of integral planes dataset = ListIntegralDataset ( list_invar = list_invar , list_outvar = list_outvar , list_lambda_weighting = list_lambda_weighting , ) # Continuous sampling else : # sample parameter ranges if parameterization : param_ranges_fn = lambda : parameterization . sample ( 1 ) else : param_ranges_fn = lambda : {} # invar function invar_fn = lambda param_range : geometry . sample_boundary ( integral_batch_size , criteria = criteria , parameterization = Parameterization ( { sp . Symbol ( key ): float ( value ) for key , value in param_range . items ()} ), quasirandom = quasirandom , ) # outvar function outvar_fn = lambda param_range : _compute_outvar ( param_range , outvar ) # lambda weighting function lambda_weighting_fn = lambda param_range , outvar : _compute_lambda_weighting ( param_range , outvar , lambda_weighting ) # make dataset of integral planes dataset = ContinuousIntegralIterableDataset ( invar_fn = invar_fn , outvar_fn = outvar_fn , batch_size = batch_size , lambda_weighting_fn = lambda_weighting_fn , param_ranges_fn = param_ranges_fn , ) self . batch_size = batch_size # initialize constraint super () . __init__ ( nodes = nodes , dataset = dataset , loss = loss , batch_size = batch_size , shuffle = shuffle , drop_last = True , num_workers = num_workers , ) [docs] class VariationalConstraint ( Constraint ): """ Base class for all Variational Constraints. B(u, v, g, dom) = \\int_{dom} (F(u, v) - g*v) dx = 0, where F is an operator, g is a given function/data, v is the test function. loss of variational = B1(u1, v1, g1, dom1) + B2(u2, v2, g2, dom2) + ... """ def __init__ ( self , nodes : List [ Node ], datasets : Dict [ str , DictVariationalDataset ], batch_sizes : Dict [ str , int ], loss : Loss = PointwiseLossNorm (), shuffle : bool = True , drop_last : bool = True , num_workers : int = 0 , ): # Get DDP manager self . manager = DistributedManager () self . device = self . manager . device if not drop_last and self . manager . cuda_graphs : logger . info ( "drop_last must be true when using cuda graphs" ) drop_last = True # make dataloader from dataset self . data_loaders = {} invar_keys = [] outvar_keys = [] for name in datasets : self . data_loaders [ name ] = iter ( Constraint . get_dataloader ( dataset = datasets [ name ], batch_size = batch_sizes [ name ], shuffle = shuffle , drop_last = drop_last , num_workers = num_workers , ) ) invar_keys = invar_keys + datasets [ name ] . invar_keys outvar_keys = outvar_keys + datasets [ name ] . outvar_keys # construct model from nodes self . model = Graph ( nodes , Key . convert_list ( list ( set ( invar_keys ))), Key . convert_list ( list ( set ( outvar_keys ))), ) self . manager = DistributedManager () self . device = self . manager . device self . model . to ( self . device ) if self . manager . distributed : # https://pytorch.org/docs/master/notes/cuda.html#id5 s = torch . cuda . Stream () s . wait_stream ( torch . cuda . current_stream ()) with torch . cuda . stream ( s ): self . model = DistributedDataParallel ( self . model , device_ids = [ self . manager . local_rank ], output_device = self . device , broadcast_buffers = self . manager . broadcast_buffers , find_unused_parameters = self . manager . find_unused_parameters , process_group = self . manager . group ( "data_parallel" ), # None by default ) torch . cuda . current_stream () . wait_stream ( s ) self . _input_names = Key . convert_list ( list ( set ( invar_keys ))) self . _output_names = Key . convert_list ( list ( set ( outvar_keys ))) self . _input_vars = None self . _target_vars = None self . _lambda_weighting = None # put loss on device self . _loss = loss . to ( self . device ) def save_batch ( self , filename ): # sample batch for name , data_loader in self . data_loaders . items (): invar = Constraint . _set_device ( next ( data_loader ), device = self . device , requires_grad = True ) # If using DDP, strip out collective stuff to prevent deadlocks # This only works either when one process alone calls in to save_batch # or when multiple processes independently save data if hasattr ( self . model , "module" ): modl = self . model . module else : modl = self . model # compute pred outvar outvar = modl ( invar ) named_outvar = { "pred_" + key : value . cpu () . detach () . numpy () for key , value in outvar . items () } save_var = { ** { key : value . cpu () . detach () . numpy () for key , value in invar . items ()}, ** named_outvar , } var_to_polyvtk ( save_var , filename + "_" + name ) def load_data ( self ): self . _input_vars = {} self . _output_vars = {} for name , data_loader in self . data_loaders . items (): # get train points from dataloader invar = next ( data_loader ) self . _input_vars [ name ] = Constraint . _set_device ( invar , device = self . device , requires_grad = True ) def load_data_static ( self ): if self . _input_vars is None : # Default loading if vars not allocated self . load_data () else : for name , data_loader in self . data_loaders . items (): # get train points from dataloader invar = next ( data_loader ) # Set grads to false here for inputs, static var has allocation already input_vars = Constraint . _set_device ( invar , device = self . device , requires_grad = False ) for key in input_vars . keys (): self . _input_vars [ name ][ key ] . data . copy_ ( input_vars [ key ]) self . _input_vars [ name ] = Constraint . _set_device ( invar , device = self . device , requires_grad = True ) def forward ( self ): # compute pred outvar for name in self . _input_vars . keys (): self . _output_vars [ name ] = self . model ( self . _input_vars [ name ]) def loss ( self , step ): # compute loss losses = self . _loss ( list ( self . _input_vars . values ()), list ( self . _output_vars . values ()), step ) return losses [docs] class VariationalDomainConstraint ( VariationalConstraint ): """ Simple Variational Domain Constraint with a single geometry that represents the domain. TODO add comprehensive doc string after refactor """ def __init__ ( self , nodes : List [ Node ], geometry : Geometry , outvar_names : List [ str ], boundary_batch_size : int , interior_batch_size : int , interior_bounds : Dict [ sp . Basic , Tuple [ float , float ]] = None , boundary_criteria : Union [ sp . Basic , Callable , None ] = None , interior_criteria : Union [ sp . Basic , Callable , None ] = None , parameterization : Union [ Parameterization , None ] = None , batch_per_epoch : int = 1000 , quasirandom : bool = False , num_workers : int = 0 , loss : Loss = PointwiseLossNorm (), shuffle : bool = True , ): # sample boundary invar = geometry . sample_boundary ( boundary_batch_size * batch_per_epoch , criteria = boundary_criteria , parameterization = parameterization , quasirandom = quasirandom , ) invar [ "area" ] *= batch_per_epoch # make variational boundary dataset dataset_boundary = DictVariationalDataset ( invar = invar , outvar_names = outvar_names , ) # sample interior invar = geometry . sample_interior ( interior_batch_size * batch_per_epoch , bounds = interior_bounds , criteria = interior_criteria , parameterization = parameterization , quasirandom = quasirandom , ) invar [ "area" ] *= batch_per_epoch # make variational interior dataset dataset_interior = DictVariationalDataset ( invar = invar , outvar_names = outvar_names , ) datasets = { "boundary" : dataset_boundary , "interior" : dataset_interior } batch_sizes = { "boundary" : boundary_batch_size , "interior" : interior_batch_size } # initialize constraint super () . __init__ ( nodes = nodes , datasets = datasets , batch_sizes = batch_sizes , loss = loss , shuffle = shuffle , drop_last = True , num_workers = num_workers , ) [docs] class DeepONetConstraint ( PointwiseConstraint ): """ Base DeepONet Constraint class for all DeepONets """ def save_batch ( self , filename ): # sample batch invar , true_outvar , lambda_weighting = next ( self . dataloader ) invar = Constraint . _set_device ( invar , device = self . device , requires_grad = True ) true_outvar = Constraint . _set_device ( true_outvar , device = self . device ) lambda_weighting = Constraint . _set_device ( lambda_weighting , device = self . device ) # If using DDP, strip out collective stuff to prevent deadlocks # This only works either when one process alone calls in to save_batch # or when multiple processes independently save data if hasattr ( self . model , "module" ): modl = self . model . module else : modl = self . model # compute pred outvar pred_outvar = modl ( invar ) # rename values and save batch to vtk file TODO clean this up after graph unroll stuff named_lambda_weighting = { "lambda_" + key : value for key , value in lambda_weighting . items () } named_true_outvar = { "true_" + key : value for key , value in true_outvar . items ()} named_pred_outvar = { "pred_" + key : value for key , value in pred_outvar . items ()} save_var = { ** { key : value for key , value in invar . items ()}, ** named_true_outvar , ** named_pred_outvar , ** named_lambda_weighting , } save_var = { key : value . cpu () . detach () . numpy () for key , value in save_var . items () } np . savez_compressed ( filename + ".npz" , ** save_var ) [docs] @classmethod def from_numpy ( cls , nodes : List [ Node ], invar : Dict [ str , np . ndarray ], outvar : Dict [ str , np . ndarray ], batch_size : int , lambda_weighting : Dict [ str , np . ndarray ] = None , loss : Loss = PointwiseLossNorm (), shuffle : bool = True , drop_last : bool = True , num_workers : int = 0 , ): """ Create custom DeepONet constraint from numpy arrays. Parameters ---------- nodes : List[Node] List of Modulus Nodes to unroll graph with. invar : Dict[str, np.ndarray (N, 1)] Dictionary of numpy arrays as input. outvar : Dict[str, np.ndarray (N, 1)] Dictionary of numpy arrays to enforce constraint on. batch_size : int Batch size used in training. lambda_weighting : Dict[str, np.ndarray (N, 1)] Dictionary of numpy arrays to pointwise weight losses. Default is ones. loss : Loss Modulus `Loss` module that defines the loss type, (e.g. L2, L1, ...). shuffle : bool, optional Randomly shuffle examples in dataset every epoch, by default True drop_last : bool, optional Drop last mini-batch if dataset not fully divisible but batch_size, by default False num_workers : int Number of worker used in fetching data. """ # make point dataset dataset = DictPointwiseDataset ( invar = invar , outvar = outvar , lambda_weighting = lambda_weighting , ) return cls ( nodes = nodes , dataset = dataset , loss = loss , batch_size = batch_size , shuffle = shuffle , drop_last = drop_last , num_workers = num_workers , )