# Copyright (c) 2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
""" Continuous type constraints
"""
import torch
from torch.nn.parallel import DistributedDataParallel
import numpy as np
from typing import Dict, List, Union, Tuple, Callable
import sympy as sp
import logging
from .constraint import Constraint
from .utils import _compute_outvar, _compute_lambda_weighting
from modulus.sym.utils.io.vtk import var_to_polyvtk
from modulus.sym.graph import Graph
from modulus.sym.key import Key
from modulus.sym.node import Node
from modulus.sym.loss import Loss, PointwiseLossNorm, IntegralLossNorm
from modulus.sym.distributed import DistributedManager
from modulus.sym.utils.sympy import np_lambdify
from modulus.sym.geometry import Geometry
from modulus.sym.geometry.helper import _sympy_criteria_to_criteria
from modulus.sym.geometry.parameterization import Parameterization, Bounds
from modulus.sym.dataset import (
DictPointwiseDataset,
ListIntegralDataset,
ContinuousPointwiseIterableDataset,
ContinuousIntegralIterableDataset,
DictImportanceSampledPointwiseIterableDataset,
DictVariationalDataset,
)
Tensor = torch.Tensor
logger = logging.getLogger(__name__)
class PointwiseConstraint(Constraint):
"""
Base class for all Pointwise Constraints
"""
def save_batch(self, filename):
# sample batch
invar, true_outvar, lambda_weighting = next(self.dataloader)
invar = Constraint._set_device(invar, device=self.device, requires_grad=True)
true_outvar = Constraint._set_device(true_outvar, device=self.device)
lambda_weighting = Constraint._set_device(lambda_weighting, device=self.device)
# If using DDP, strip out collective stuff to prevent deadlocks
# This only works either when one process alone calls in to save_batch
# or when multiple processes independently save data
if hasattr(self.model, "module"):
modl = self.model.module
else:
modl = self.model
# compute pred outvar
pred_outvar = modl(invar)
# rename values and save batch to vtk file TODO clean this up after graph unroll stuff
named_lambda_weighting = {
"lambda_" + key: value for key, value in lambda_weighting.items()
}
named_true_outvar = {"true_" + key: value for key, value in true_outvar.items()}
named_pred_outvar = {"pred_" + key: value for key, value in pred_outvar.items()}
save_var = {
**{key: value for key, value in invar.items()},
**named_true_outvar,
**named_pred_outvar,
**named_lambda_weighting,
}
save_var = {
key: value.cpu().detach().numpy() for key, value in save_var.items()
}
var_to_polyvtk(save_var, filename)
def load_data(self):
# get train points from dataloader
invar, true_outvar, lambda_weighting = next(self.dataloader)
self._input_vars = Constraint._set_device(
invar, device=self.device, requires_grad=True
)
self._target_vars = Constraint._set_device(true_outvar, device=self.device)
self._lambda_weighting = Constraint._set_device(
lambda_weighting, device=self.device
)
def load_data_static(self):
if self._input_vars is None:
# Default loading if vars not allocated
self.load_data()
else:
# get train points from dataloader
invar, true_outvar, lambda_weighting = next(self.dataloader)
# Set grads to false here for inputs, static var has allocation already
input_vars = Constraint._set_device(
invar, device=self.device, requires_grad=False
)
target_vars = Constraint._set_device(true_outvar, device=self.device)
lambda_weighting = Constraint._set_device(
lambda_weighting, device=self.device
)
for key in input_vars.keys():
self._input_vars[key].data.copy_(input_vars[key])
for key in target_vars.keys():
self._target_vars[key].copy_(target_vars[key])
for key in lambda_weighting.keys():
self._lambda_weighting[key].copy_(lambda_weighting[key])
def forward(self):
# compute pred outvar
self._output_vars = self.model(self._input_vars)
def loss(self, step: int) -> Dict[str, torch.Tensor]:
if self._output_vars is None:
logger.warning("Calling loss without forward call")
return {}
losses = self._loss(
self._input_vars,
self._output_vars,
self._target_vars,
self._lambda_weighting,
step,
)
return losses
@classmethod
def from_numpy(
cls,
nodes: List[Node],
invar: Dict[str, np.ndarray],
outvar: Dict[str, np.ndarray],
batch_size: int,
lambda_weighting: Dict[str, np.ndarray] = None,
loss: Loss = PointwiseLossNorm(),
shuffle: bool = True,
drop_last: bool = True,
num_workers: int = 0,
):
"""
Create custom pointwise constraint from numpy arrays.
Parameters
----------
nodes : List[Node]
List of Modulus Nodes to unroll graph with.
invar : Dict[str, np.ndarray (N, 1)]
Dictionary of numpy arrays as input.
outvar : Dict[str, np.ndarray (N, 1)]
Dictionary of numpy arrays to enforce constraint on.
batch_size : int
Batch size used in training.
lambda_weighting : Dict[str, np.ndarray (N, 1)]
Dictionary of numpy arrays to pointwise weight losses.
Default is ones.
loss : Loss
Modulus `Loss` module that defines the loss type, (e.g. L2, L1, ...).
shuffle : bool, optional
Randomly shuffle examples in dataset every epoch, by default True
drop_last : bool, optional
Drop last mini-batch if dataset is not fully divisible by batch_size, by default True
num_workers : int
Number of workers used in fetching data.
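Example
-------
A minimal sketch; `nodes` is assumed to be a list of Modulus Nodes built
elsewhere, and the variable names ('x', 'y', 'u') are placeholders for
whatever the graph expects.
>>> import numpy as np
>>> x = np.random.rand(1000, 1)
>>> y = np.random.rand(1000, 1)
>>> u = np.sin(np.pi * x) * np.sin(np.pi * y)
>>> data_constraint = PointwiseConstraint.from_numpy(
...     nodes=nodes,
...     invar={"x": x, "y": y},
...     outvar={"u": u},
...     batch_size=128,
... )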
"""
if "area" not in invar:
invar["area"] = np.ones_like(next(iter(invar.values())))
# TODO: better area definition?
# no need to lambdify: outvar / lambda_weighting already contain np arrays
# make point dataset
dataset = DictPointwiseDataset(
invar=invar,
outvar=outvar,
lambda_weighting=lambda_weighting,
)
return cls(
nodes=nodes,
dataset=dataset,
loss=loss,
batch_size=batch_size,
shuffle=shuffle,
drop_last=drop_last,
num_workers=num_workers,
)
class PointwiseBoundaryConstraint(PointwiseConstraint):
"""
Pointwise Constraint applied to boundary/perimeter/surface of geometry.
For example, in 3D this will create a constraint on the surface of the
given geometry.
Parameters
----------
nodes : List[Node]
List of Modulus Nodes to unroll graph with.
geometry : Geometry
Modulus `Geometry` to apply the constraint with.
outvar : Dict[str, Union[int, float, sp.Basic]]
A dictionary of SymPy Symbols/Expr, floats or int.
This is used to describe the constraint. For example,
`outvar={'u': 0}` would specify `'u'` to be zero everywhere
on the constraint.
batch_size : int
Batch size used in training.
criteria : Union[sp.Basic, Callable, None]
SymPy criteria function that restricts the constraint to areas
satisfying it. For example, if
`criteria=sympy.Symbol('x')>0` then only areas that have positive
`'x'` values will have the constraint applied to them.
lambda_weighting : Dict[str, Union[int, float, sp.Basic]] = None
The spatial pointwise weighting of the constraint. For example,
`lambda_weighting={'lambda_u': 2.0*sympy.Symbol('x')}` would
apply a pointwise weighting to the loss of `2.0 * x`.
parameterization : Union[Parameterization, None], optional
This allows adding parameterization or additional inputs.
fixed_dataset : bool = True
If True then the points for this constraint are sampled once at
initialization and kept fixed. If False then the points are
continually resampled.
importance_measure : Union[Callable, None] = None
A callable function that computes a scalar importance measure. This
importance measure is then used in the constraint when sampling
points. Areas with higher importance are sampled more frequently
according to Monte Carlo importance sampling,
https://en.wikipedia.org/wiki/Monte_Carlo_integration.
batch_per_epoch : int = 1000
If `fixed_dataset=True` then the total number of points generated
to apply constraint on is `total_nr_points=batch_per_epoch*batch_size`.
quasirandom : bool = False
If true then sample the points using the Halton sequence.
num_workers : int
Number of workers used in fetching data.
loss : Loss
Modulus `Loss` module that defines the loss type, (e.g. L2, L1, ...).
shuffle : bool, optional
Randomly shuffle examples in dataset every epoch, by default True
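Example
-------
A minimal sketch of a no-slip wall on a unit square; `nodes` is assumed
to be a list of Modulus Nodes built elsewhere.
>>> from modulus.sym.geometry.primitives_2d import Rectangle
>>> rec = Rectangle((0, 0), (1, 1))
>>> wall = PointwiseBoundaryConstraint(
...     nodes=nodes,
...     geometry=rec,
...     outvar={"u": 0, "v": 0},
...     batch_size=1024,
... )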
"""
def __init__(
self,
nodes: List[Node],
geometry: Geometry,
outvar: Dict[str, Union[int, float, sp.Basic]],
batch_size: int,
criteria: Union[sp.Basic, Callable, None] = None,
lambda_weighting: Dict[str, Union[int, float, sp.Basic]] = None,
parameterization: Union[Parameterization, None] = None,
fixed_dataset: bool = True,
importance_measure: Union[Callable, None] = None,
batch_per_epoch: int = 1000,
quasirandom: bool = False,
num_workers: int = 0,
loss: Loss = PointwiseLossNorm(),
shuffle: bool = True,
):
# assert that not using importance measure with continuous dataset
assert not (
(not fixed_dataset) and (importance_measure is not None)
), "Using Importance measure with continuous dataset is not supported"
# if fixed dataset then sample points and fix for all of training
if fixed_dataset:
# sample boundary
invar = geometry.sample_boundary(
batch_size * batch_per_epoch,
criteria=criteria,
parameterization=parameterization,
quasirandom=quasirandom,
)
# compute outvar
outvar = _compute_outvar(invar, outvar)
# set lambda weighting
lambda_weighting = _compute_lambda_weighting(
invar, outvar, lambda_weighting
)
# make point dataset
if importance_measure is None:
invar["area"] *= batch_per_epoch # TODO find better way to do this
dataset = DictPointwiseDataset(
invar=invar,
outvar=outvar,
lambda_weighting=lambda_weighting,
)
else:
dataset = DictImportanceSampledPointwiseIterableDataset(
invar=invar,
outvar=outvar,
batch_size=batch_size,
importance_measure=importance_measure,
lambda_weighting=lambda_weighting,
shuffle=shuffle,
)
# else sample points every batch
else:
# invar function
invar_fn = lambda: geometry.sample_boundary(
batch_size,
criteria=criteria,
parameterization=parameterization,
quasirandom=quasirandom,
)
# outvar function
outvar_fn = lambda invar: _compute_outvar(invar, outvar)
# lambda weighting function
lambda_weighting_fn = lambda invar, outvar: _compute_lambda_weighting(
invar, outvar, lambda_weighting
)
# make point dataloader
dataset = ContinuousPointwiseIterableDataset(
invar_fn=invar_fn,
outvar_fn=outvar_fn,
lambda_weighting_fn=lambda_weighting_fn,
)
# initialize constraint
super().__init__(
nodes=nodes,
dataset=dataset,
loss=loss,
batch_size=batch_size,
shuffle=shuffle,
drop_last=True,
num_workers=num_workers,
)
class PointwiseInteriorConstraint(PointwiseConstraint):
"""
Pointwise Constraint applied to interior of geometry.
For example, in 3D this will create a constraint on the interior
volume of the given geometry.
Parameters
----------
nodes : List[Node]
List of Modulus Nodes to unroll graph with.
geometry : Geometry
Modulus `Geometry` to apply the constraint with.
outvar : Dict[str, Union[int, float, sp.Basic]]
A dictionary of SymPy Symbols/Expr, floats or int.
This is used to describe the constraint. For example,
`outvar={'u': 0}` would specify `'u'` to be zero everywhere
in the constraint.
batch_size : int
Batch size used in training.
bounds : Dict[sp.Basic, Tuple[float, float]] = None
Bounds of the given geometry,
(e.g. `bounds={sympy.Symbol('x'): (0, 1), sympy.Symbol('y'): (0, 1)}`).
criteria : Union[sp.Basic, Callable, None]
SymPy criteria function that restricts the constraint to areas
satisfying it. For example, if
`criteria=sympy.Symbol('x')>0` then only areas that have positive
`'x'` values will have the constraint applied to them.
lambda_weighting : Dict[str, Union[int, float, sp.Basic]] = None
The spatial pointwise weighting of the constraint. For example,
`lambda_weighting={'lambda_u': 2.0*sympy.Symbol('x')}` would
apply a pointwise weighting to the loss of `2.0 * x`.
parameterization : Union[Parameterization, None] = None
This allows adding parameterization or additional inputs.
fixed_dataset : bool = True
If True then the points for this constraint are sampled once at
initialization and kept fixed. If False then the points are
continually resampled.
importance_measure : Union[Callable, None] = None
A callable function that computes a scalar importance measure. This
importance measure is then used in the constraint when sampling
points. Areas with higher importance are sampled more frequently
according to Monte Carlo importance sampling,
https://en.wikipedia.org/wiki/Monte_Carlo_integration.
batch_per_epoch : int = 1000
If `fixed_dataset=True` then the total number of points generated
to apply constraint on is `total_nr_points=batch_per_epoch*batch_size`.
quasirandom : bool = False
If true then sample the points using the Halton sequence.
num_workers : int
Number of workers used in fetching data.
loss : Loss
Modulus `Loss` module that defines the loss type, (e.g. L2, L1, ...).
shuffle : bool, optional
Randomly shuffle examples in dataset every epoch, by default True
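Example
-------
A minimal sketch; `nodes` is assumed to be a list of Modulus Nodes whose
graph exposes a residual output named 'poisson_u'.
>>> import sympy as sp
>>> from modulus.sym.geometry.primitives_2d import Rectangle
>>> rec = Rectangle((0, 0), (1, 1))
>>> interior = PointwiseInteriorConstraint(
...     nodes=nodes,
...     geometry=rec,
...     outvar={"poisson_u": 0},
...     batch_size=2048,
...     bounds={sp.Symbol("x"): (0, 1), sp.Symbol("y"): (0, 1)},
...     criteria=sp.Symbol("x") > 0.5,
... )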
"""
def __init__(
self,
nodes: List[Node],
geometry: Geometry,
outvar: Dict[str, Union[int, float, sp.Basic]],
batch_size: int,
bounds: Dict[sp.Basic, Tuple[float, float]] = None,
criteria: Union[sp.Basic, Callable, None] = None,
lambda_weighting: Dict[str, Union[int, float, sp.Basic]] = None,
parameterization: Union[Parameterization, None] = None,
fixed_dataset: bool = True,
compute_sdf_derivatives: bool = False,
importance_measure: Union[Callable, None] = None,
batch_per_epoch: int = 1000,
quasirandom: bool = False,
num_workers: int = 0,
loss: Loss = PointwiseLossNorm(),
shuffle: bool = True,
):
# assert that not using importance measure with continuous dataset
assert not (
(not fixed_dataset) and (importance_measure is not None)
), "Using Importance measure with continuous dataset is not supported"
# if fixed dataset then sample points and fix for all of training
if fixed_dataset:
# sample interior
invar = geometry.sample_interior(
batch_size * batch_per_epoch,
bounds=bounds,
criteria=criteria,
parameterization=parameterization,
quasirandom=quasirandom,
compute_sdf_derivatives=compute_sdf_derivatives,
)
# compute outvar
outvar = _compute_outvar(invar, outvar)
# set lambda weighting
lambda_weighting = _compute_lambda_weighting(
invar, outvar, lambda_weighting
)
# make point dataset
if importance_measure is None:
invar["area"] *= batch_per_epoch # TODO find better way to do this
dataset = DictPointwiseDataset(
invar=invar,
outvar=outvar,
lambda_weighting=lambda_weighting,
)
else:
dataset = DictImportanceSampledPointwiseIterableDataset(
invar=invar,
outvar=outvar,
batch_size=batch_size,
importance_measure=importance_measure,
lambda_weighting=lambda_weighting,
shuffle=shuffle,
)
# else sample points every batch
else:
# invar function
invar_fn = lambda: geometry.sample_interior(
batch_size,
bounds=bounds,
criteria=criteria,
parameterization=parameterization,
quasirandom=quasirandom,
compute_sdf_derivatives=compute_sdf_derivatives,
)
# outvar function
outvar_fn = lambda invar: _compute_outvar(invar, outvar)
# lambda weighting function
lambda_weighting_fn = lambda invar, outvar: _compute_lambda_weighting(
invar, outvar, lambda_weighting
)
# make point dataloader
dataset = ContinuousPointwiseIterableDataset(
invar_fn=invar_fn,
outvar_fn=outvar_fn,
lambda_weighting_fn=lambda_weighting_fn,
)
# initialize constraint
super().__init__(
nodes=nodes,
dataset=dataset,
loss=loss,
batch_size=batch_size,
shuffle=shuffle,
drop_last=True,
num_workers=num_workers,
)
class IntegralConstraint(Constraint):
"""
Base class for all Integral Constraints
"""
def save_batch(self, filename):
# sample batch
invar, true_outvar, lambda_weighting = next(self.dataloader)
invar = Constraint._set_device(invar, device=self.device, requires_grad=True)
# rename values and save batch to vtk file TODO clean this up after graph unroll stuff
for i in range(self.batch_size):
save_var = {
key: value[i].cpu().detach().numpy() for key, value in invar.items()
}
var_to_polyvtk(save_var, filename + "_batch_" + str(i))
def load_data(self):
# get train points from dataloader
invar, true_outvar, lambda_weighting = next(self.dataloader)
self._input_vars = Constraint._set_device(
invar, device=self.device, requires_grad=True
)
self._target_vars = Constraint._set_device(true_outvar, device=self.device)
self._lambda_weighting = Constraint._set_device(
lambda_weighting, device=self.device
)
def load_data_static(self):
if self._input_vars is None:
# Default loading if vars not allocated
self.load_data()
else:
# get train points from dataloader
invar, true_outvar, lambda_weighting = next(self.dataloader)
# Set grads to false here for inputs, static var has allocation already
input_vars = Constraint._set_device(
invar, device=self.device, requires_grad=False
)
target_vars = Constraint._set_device(true_outvar, device=self.device)
lambda_weighting = Constraint._set_device(
lambda_weighting, device=self.device
)
for key in input_vars.keys():
self._input_vars[key].data.copy_(input_vars[key])
for key in target_vars.keys():
self._target_vars[key].copy_(target_vars[key])
for key in lambda_weighting.keys():
self._lambda_weighting[key].copy_(lambda_weighting[key])
@property
def output_vars(self) -> Dict[str, Tensor]:
return self._output_vars
@output_vars.setter
def output_vars(self, data: Dict[str, Tensor]):
self._output_vars = {}
for output in self.output_names:
self._output_vars[str(output)] = data[str(output)]
def forward(self):
# compute pred outvar
self._output_vars = self.model(self._input_vars)
def loss(self, step: int) -> Dict[str, torch.Tensor]:
if self._output_vars is None:
logger.warning("Calling loss without forward call")
return {}
# split for individual integration
list_invar, list_pred_outvar, list_true_outvar, list_lambda_weighting = (
[],
[],
[],
[],
)
for i in range(self.batch_size):
list_invar.append(
{key: value[i] for key, value in self._input_vars.items()}
)
list_pred_outvar.append(
{key: value[i] for key, value in self._output_vars.items()}
)
list_true_outvar.append(
{key: value[i] for key, value in self._target_vars.items()}
)
list_lambda_weighting.append(
{key: value[i] for key, value in self._lambda_weighting.items()}
)
# compute integral losses
losses = self._loss(
list_invar, list_pred_outvar, list_true_outvar, list_lambda_weighting, step
)
return losses
class IntegralBoundaryConstraint(IntegralConstraint):
"""
Integral Constraint applied to boundary/perimeter/surface of geometry.
For example, in 3D this will create a constraint on the surface of the
given geometry.
Parameters
----------
nodes : List[Node]
List of Modulus Nodes to unroll graph with.
geometry : Geometry
Modulus `Geometry` to apply the constraint with.
outvar : Dict[str, Union[int, float, sp.Basic]]
A dictionary of SymPy Symbols/Expr, floats or int.
This is used to describe the constraint. For example,
`outvar={'u': 0}` would specify the integral of `'u'`
to be zero.
batch_size : int
Number of integrals to apply.
integral_batch_size : int
Batch sized used in the Monte Carlo integration to compute
the integral.
criteria : Union[sp.Basic, Callable, None]
SymPy criteria function that restricts integration to areas
satisfying it. For example, if
`criteria=sympy.Symbol('x')>0` then only areas that have positive
`'x'` values will be integrated.
lambda_weighting : Dict[str, Union[int, float, sp.Basic]] = None
The weighting of the constraint. For example,
`lambda_weighting={'lambda_u': 2.0}` would
weight the integral constraint by `2.0`.
parameterization : Union[Parameterization, None]
This allows adding parameterization or additional inputs.
fixed_dataset : bool = True
If True then the points for this constraint are sampled once at
initialization and kept fixed. If False then the points are
continually resampled.
batch_per_epoch : int = 100
If `fixed_dataset=True` then the total number of integrals generated
to apply constraint on is `total_nr_integrals=batch_per_epoch*batch_size`.
quasirandom : bool = False
If true then sample the points using the Halton sequence.
num_workers : int
Number of workers used in fetching data.
loss : Loss
Modulus `Loss` module that defines the loss type, (e.g. L2, L1, ...).
shuffle : bool, optional
Randomly shuffle examples in dataset every epoch, by default True
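Example
-------
A minimal sketch that fixes the integrated flux through a vertical line
to one; `nodes` is assumed to expose 'normal_dot_vel' (e.g. from a
NormalDotVec node).
>>> from modulus.sym.geometry.primitives_2d import Line
>>> plane = Line((0.5, 0), (0.5, 1), normal=1)
>>> mass_flow = IntegralBoundaryConstraint(
...     nodes=nodes,
...     geometry=plane,
...     outvar={"normal_dot_vel": 1.0},
...     batch_size=1,
...     integral_batch_size=512,
... )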
"""
def __init__(
self,
nodes: List[Node],
geometry: Geometry,
outvar: Dict[str, Union[int, float, sp.Basic]],
batch_size: int,
integral_batch_size: int,
criteria: Union[sp.Basic, Callable, None] = None,
lambda_weighting: Dict[str, Union[int, float, sp.Basic]] = None,
parameterization: Union[Parameterization, None] = None,
fixed_dataset: bool = True,
batch_per_epoch: int = 100,
quasirandom: bool = False,
num_workers: int = 0,
loss: Loss = IntegralLossNorm(),
shuffle: bool = True,
):
# convert dict to parameterization if needed
if parameterization is None:
parameterization = geometry.parameterization
elif isinstance(parameterization, dict):
parameterization = Parameterization(parameterization)
# Fixed number of integral examples
if fixed_dataset:
# sample geometry to generate integral batches
list_invar = []
list_outvar = []
list_lambda_weighting = []
for i in range(batch_size * batch_per_epoch):
# sample parameter ranges
if parameterization:
specific_param_ranges = parameterization.sample(1)
else:
specific_param_ranges = {}
# sample boundary
invar = geometry.sample_boundary(
integral_batch_size,
criteria=criteria,
parameterization=Parameterization(
{
sp.Symbol(key): float(value)
for key, value in specific_param_ranges.items()
}
),
quasirandom=quasirandom,
)
# compute outvar
if (
not specific_param_ranges
): # TODO this can be removed after a np_lambdify rewrite
specific_param_ranges = {"_": next(iter(invar.values()))[0:1]}
outvar_star = _compute_outvar(specific_param_ranges, outvar)
# set lambda weighting
lambda_weighting_star = _compute_lambda_weighting(
specific_param_ranges, outvar, lambda_weighting
)
# store samples
list_invar.append(invar)
list_outvar.append(outvar_star)
list_lambda_weighting.append(lambda_weighting_star)
# make dataset of integral planes
dataset = ListIntegralDataset(
list_invar=list_invar,
list_outvar=list_outvar,
list_lambda_weighting=list_lambda_weighting,
)
# Continuous sampling
else:
# sample parameter ranges
if parameterization:
param_ranges_fn = lambda: parameterization.sample(1)
else:
param_ranges_fn = lambda: {}
# invar function
invar_fn = lambda param_range: geometry.sample_boundary(
integral_batch_size,
criteria=criteria,
parameterization=Parameterization(
{sp.Symbol(key): float(value) for key, value in param_range.items()}
),
quasirandom=quasirandom,
)
# outvar function
outvar_fn = lambda param_range: _compute_outvar(param_range, outvar)
# lambda weighting function
lambda_weighting_fn = lambda param_range, outvar: _compute_lambda_weighting(
param_range, outvar, lambda_weighting
)
# make dataset of integral planes
dataset = ContinuousIntegralIterableDataset(
invar_fn=invar_fn,
outvar_fn=outvar_fn,
batch_size=batch_size,
lambda_weighting_fn=lambda_weighting_fn,
param_ranges_fn=param_ranges_fn,
)
self.batch_size = batch_size
# initialize constraint
super().__init__(
nodes=nodes,
dataset=dataset,
loss=loss,
batch_size=batch_size,
shuffle=shuffle,
drop_last=True,
num_workers=num_workers,
)
class VariationalConstraint(Constraint):
"""
Base class for all Variational Constraints.
B(u, v, g, dom) = \\int_{dom} (F(u, v) - g*v) dx = 0,
where F is an operator, g is a given function/data,
v is the test function.
The total variational loss is B1(u1, v1, g1, dom1) + B2(u2, v2, g2, dom2) + ...
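For example, for the Poisson problem -\\Delta u = g with test functions v
vanishing on the boundary, a common choice is F(u, v) = \\nabla u \\cdot \\nabla v,
which gives B(u, v, g, dom) = \\int_{dom} (\\nabla u \\cdot \\nabla v - g*v) dx.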
"""
def __init__(
self,
nodes: List[Node],
datasets: Dict[str, DictVariationalDataset],
batch_sizes: Dict[str, int],
loss: Loss = PointwiseLossNorm(),
shuffle: bool = True,
drop_last: bool = True,
num_workers: int = 0,
):
# Get DDP manager
self.manager = DistributedManager()
self.device = self.manager.device
if not drop_last and self.manager.cuda_graphs:
logger.info("drop_last must be true when using cuda graphs")
drop_last = True
# make dataloader from dataset
self.data_loaders = {}
invar_keys = []
outvar_keys = []
for name in datasets:
self.data_loaders[name] = iter(
Constraint.get_dataloader(
dataset=datasets[name],
batch_size=batch_sizes[name],
shuffle=shuffle,
drop_last=drop_last,
num_workers=num_workers,
)
)
invar_keys = invar_keys + datasets[name].invar_keys
outvar_keys = outvar_keys + datasets[name].outvar_keys
# construct model from nodes
self.model = Graph(
nodes,
Key.convert_list(list(set(invar_keys))),
Key.convert_list(list(set(outvar_keys))),
)
self.manager = DistributedManager()
self.device = self.manager.device
self.model.to(self.device)
if self.manager.distributed:
# https://pytorch.org/docs/master/notes/cuda.html#id5
s = torch.cuda.Stream()
s.wait_stream(torch.cuda.current_stream())
with torch.cuda.stream(s):
self.model = DistributedDataParallel(
self.model,
device_ids=[self.manager.local_rank],
output_device=self.device,
broadcast_buffers=self.manager.broadcast_buffers,
find_unused_parameters=self.manager.find_unused_parameters,
process_group=self.manager.group(
"data_parallel"
), # None by default
)
torch.cuda.current_stream().wait_stream(s)
self._input_names = Key.convert_list(list(set(invar_keys)))
self._output_names = Key.convert_list(list(set(outvar_keys)))
self._input_vars = None
self._target_vars = None
self._lambda_weighting = None
# put loss on device
self._loss = loss.to(self.device)
def save_batch(self, filename):
# sample batch
for name, data_loader in self.data_loaders.items():
invar = Constraint._set_device(
next(data_loader), device=self.device, requires_grad=True
)
# If using DDP, strip out collective stuff to prevent deadlocks
# This only works either when one process alone calls in to save_batch
# or when multiple processes independently save data
if hasattr(self.model, "module"):
modl = self.model.module
else:
modl = self.model
# compute pred outvar
outvar = modl(invar)
named_outvar = {
"pred_" + key: value.cpu().detach().numpy()
for key, value in outvar.items()
}
save_var = {
**{key: value.cpu().detach().numpy() for key, value in invar.items()},
**named_outvar,
}
var_to_polyvtk(save_var, filename + "_" + name)
def load_data(self):
self._input_vars = {}
self._output_vars = {}
for name, data_loader in self.data_loaders.items():
# get train points from dataloader
invar = next(data_loader)
self._input_vars[name] = Constraint._set_device(
invar, device=self.device, requires_grad=True
)
def load_data_static(self):
if self._input_vars is None:
# Default loading if vars not allocated
self.load_data()
else:
for name, data_loader in self.data_loaders.items():
# get train points from dataloader
invar = next(data_loader)
# Set grads to false here for inputs, static var has allocation already
input_vars = Constraint._set_device(
invar, device=self.device, requires_grad=False
)
for key in input_vars.keys():
self._input_vars[name][key].data.copy_(input_vars[key])
self._input_vars[name] = Constraint._set_device(
invar, device=self.device, requires_grad=True
)
def forward(self):
# compute pred outvar
for name in self._input_vars.keys():
self._output_vars[name] = self.model(self._input_vars[name])
def loss(self, step):
# compute loss
losses = self._loss(
list(self._input_vars.values()), list(self._output_vars.values()), step
)
return losses
class VariationalDomainConstraint(VariationalConstraint):
"""
Simple Variational Domain Constraint with a single geometry
that represents the domain.
TODO add comprehensive doc string after refactor
"""
def __init__(
self,
nodes: List[Node],
geometry: Geometry,
outvar_names: List[str],
boundary_batch_size: int,
interior_batch_size: int,
interior_bounds: Dict[sp.Basic, Tuple[float, float]] = None,
boundary_criteria: Union[sp.Basic, Callable, None] = None,
interior_criteria: Union[sp.Basic, Callable, None] = None,
parameterization: Union[Parameterization, None] = None,
batch_per_epoch: int = 1000,
quasirandom: bool = False,
num_workers: int = 0,
loss: Loss = PointwiseLossNorm(),
shuffle: bool = True,
):
# sample boundary
invar = geometry.sample_boundary(
boundary_batch_size * batch_per_epoch,
criteria=boundary_criteria,
parameterization=parameterization,
quasirandom=quasirandom,
)
invar["area"] *= batch_per_epoch
# make variational boundary dataset
dataset_boundary = DictVariationalDataset(
invar=invar,
outvar_names=outvar_names,
)
# sample interior
invar = geometry.sample_interior(
interior_batch_size * batch_per_epoch,
bounds=interior_bounds,
criteria=interior_criteria,
parameterization=parameterization,
quasirandom=quasirandom,
)
invar["area"] *= batch_per_epoch
# make variational interior dataset
dataset_interior = DictVariationalDataset(
invar=invar,
outvar_names=outvar_names,
)
datasets = {"boundary": dataset_boundary, "interior": dataset_interior}
batch_sizes = {"boundary": boundary_batch_size, "interior": interior_batch_size}
# initialize constraint
super().__init__(
nodes=nodes,
datasets=datasets,
batch_sizes=batch_sizes,
loss=loss,
shuffle=shuffle,
drop_last=True,
num_workers=num_workers,
)
class DeepONetConstraint(PointwiseConstraint):
"""
Base DeepONet Constraint class for all DeepONets
"""
def save_batch(self, filename):
# sample batch
invar, true_outvar, lambda_weighting = next(self.dataloader)
invar = Constraint._set_device(invar, device=self.device, requires_grad=True)
true_outvar = Constraint._set_device(true_outvar, device=self.device)
lambda_weighting = Constraint._set_device(lambda_weighting, device=self.device)
# If using DDP, strip out collective stuff to prevent deadlocks
# This only works either when one process alone calls in to save_batch
# or when multiple processes independently save data
if hasattr(self.model, "module"):
modl = self.model.module
else:
modl = self.model
# compute pred outvar
pred_outvar = modl(invar)
# rename values and save batch to vtk file TODO clean this up after graph unroll stuff
named_lambda_weighting = {
"lambda_" + key: value for key, value in lambda_weighting.items()
}
named_true_outvar = {"true_" + key: value for key, value in true_outvar.items()}
named_pred_outvar = {"pred_" + key: value for key, value in pred_outvar.items()}
save_var = {
**{key: value for key, value in invar.items()},
**named_true_outvar,
**named_pred_outvar,
**named_lambda_weighting,
}
save_var = {
key: value.cpu().detach().numpy() for key, value in save_var.items()
}
np.savez_compressed(filename + ".npz", **save_var)
@classmethod
def from_numpy(
cls,
nodes: List[Node],
invar: Dict[str, np.ndarray],
outvar: Dict[str, np.ndarray],
batch_size: int,
lambda_weighting: Dict[str, np.ndarray] = None,
loss: Loss = PointwiseLossNorm(),
shuffle: bool = True,
drop_last: bool = True,
num_workers: int = 0,
):
"""
Create custom DeepONet constraint from numpy arrays.
Parameters
----------
nodes : List[Node]
List of Modulus Nodes to unroll graph with.
invar : Dict[str, np.ndarray (N, 1)]
Dictionary of numpy arrays as input.
outvar : Dict[str, np.ndarray (N, 1)]
Dictionary of numpy arrays to enforce constraint on.
batch_size : int
Batch size used in training.
lambda_weighting : Dict[str, np.ndarray (N, 1)]
Dictionary of numpy arrays to pointwise weight losses.
Default is ones.
loss : Loss
Modulus `Loss` module that defines the loss type, (e.g. L2, L1, ...).
shuffle : bool, optional
Randomly shuffle examples in dataset every epoch, by default True
drop_last : bool, optional
Drop last mini-batch if dataset is not fully divisible by batch_size, by default True
num_workers : int
Number of workers used in fetching data.
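Example
-------
A minimal sketch; `nodes` is assumed to be an existing DeepONet graph, and
'a' (branch input), 'x' (trunk input) and 'u' (output) are placeholder
names for arrays prepared elsewhere.
>>> deeponet_constraint = DeepONetConstraint.from_numpy(
...     nodes=nodes,
...     invar={"a": branch_data, "x": trunk_data},
...     outvar={"u": u_data},
...     batch_size=64,
... )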
"""
# make point dataset
dataset = DictPointwiseDataset(
invar=invar,
outvar=outvar,
lambda_weighting=lambda_weighting,
)
return cls(
nodes=nodes,
dataset=dataset,
loss=loss,
batch_size=batch_size,
shuffle=shuffle,
drop_last=drop_last,
num_workers=num_workers,
)