NVIDIA Modulus Sym v1.0.0
Sym v1.0.0

deeplearning/modulus/modulus-sym-v100/_modules/modulus/sym/geometry/parameterization.html

Source code for modulus.sym.geometry.parameterization

# Copyright (c) 2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import itertools
import numpy as np
from typing import Dict, List, Union, Tuple, Callable, Optional
import sympy
from typing import Callable
from chaospy.distributions.sampler.sequences.primes import create_primes
from chaospy.distributions.sampler.sequences.van_der_corput import (
    create_van_der_corput_samples as create_samples,
)

from modulus.sym.utils.sympy import np_lambdify


[docs]class Parameter(sympy.Symbol): """A Symbolic object used to parameterize geometries. Currently this only overloads the Sympy Symbol class however capabilities may be expanded in the future. Parameters ---------- name : str Name given to parameter. """ def __new__(cls, name: str): obj = sympy.Symbol.__new__(cls, name) return obj
[docs]class Parameterization: """A object used to store parameterization information about geometries. Parameters ---------- param_ranges : Dict[Parameter, Union[float, Tuple[float, float], np.ndarray (N, 1)] Dictionary of Parameters and their ranges. The ranges can be one of the following types, :obj: Float will sample the parameter equal to this value. :obj: Tuple of two float as the bounding range to sample parameter from. :obj: `np.ndarray` as a discrete list of possible values for the parameter. """ def __init__( self, param_ranges: Dict[ Parameter, Union[float, Tuple[float, float], np.ndarray] ] = {}, ): # store param ranges self.param_ranges = param_ranges @property def parameters(self): return [str(x) for x in self.param_ranges.keys()]
[docs] def sample(self, nr_points: int, quasirandom: bool = False): """Sample parameterization values. Parameters ---------- nr_points : int Number of points sampled from parameterization. quasirandom : bool If true then sample the points using Halton sequences. Default is False. """ return { str(key): value for key, value in _sample_ranges( nr_points, self.param_ranges, quasirandom ).items() }

def union(self, other): new_param_ranges = self.param_ranges.copy() for key, value in other.param_ranges.items(): new_param_ranges[key] = value return Parameterization(new_param_ranges) @classmethod def combine(cls, p1, p2): assert len(set(p1.parameters).intersection(set(p2.parameters))) == 0, ( "Combining parameterizations when they have overlapping parameters: p1 " + str(p1) + ", p2 " + str(p2) ) new_param_ranges = p1.param_ranges.copy() new_param_ranges.update(p2.param_ranges.copy()) return cls(new_param_ranges) def copy(self): return Parameterization(self.param_ranges.copy()) def __str__(self): return str(self.param_ranges)

[docs]class OrderedParameterization(Parameterization): """A object used to store ordered parameterization information about user-specified keys. Parameters ---------- param_ranges : Dict[Parameter, Union[float, Tuple[float, float], np.ndarray (N, 1)] Dictionary of Parameters and their ranges. The ranges can be one of the following types, :obj: Float will sample the parameter equal to this value. :obj: Tuple of two float as the bounding range to sample parameter from. :obj: `np.ndarray` as a discrete list of possible values for the parameter. """ def __init__(self, param_ranges, key): super().__init__(param_ranges) self.key = key
[docs] def sample( self, nr_points: int, quasirandom: bool = False, sort: Optional = "ascending" ): """Sample ordered parameterization values. Parameters ---------- nr_points : int Number of points sampled from parameterization. quasirandom : bool If true then sample the points using Halton sequences. Default is False. sort : None or {'ascending','descending'} If 'ascending' then sample the sorted points in ascending order. If 'descending' then sample the sorted points in descending order. Default is 'ascending'. """ sample_dict = {} for key, value in _sample_ranges( nr_points, self.param_ranges, quasirandom ).items(): # sort the samples for the given key if key == self.key: if sort == "ascending": value = np.sort(value, axis=0) elif sort == "descending": value = np.sort(value, axis=0)[::-1] else: raise ValueError( "Sort must be one of None, 'ascending', or 'descending' (got {})".format( str(sort) ) ) sample_dict[str(key)] = value return sample_dict
[docs]class Bounds: """A object used to store bounds for geometries. Parameters ---------- bound_ranges : Dict[Parameter, Tuple[Union[float, sympy.Basic], Union[float, sympy.Basic]] Dictionary of Parameters with names `"x"`, `"y"`, or `"z"`. The value given for each of these is a tuple of the lower and upper bound. Sympy expressions can be used to define these upper and lower bounds. parameterization : Parameterization A Parameterization object used when the upper and lower bounds are parameterized. """ def __init__( self, bound_ranges: Dict[ Parameter, Tuple[Union[float, sympy.Basic], Union[float, sympy.Basic]] ], parameterization: Parameterization = Parameterization(), ): # store internal parameterization self.parameterization = parameterization # store bounds self.bound_ranges = bound_ranges @property def dims(self): """ Returns ------- dims : list of strings output can be ['x'], ['x','y'], or ['x','y','z'] """ return [str(x) for x in self.bound_ranges.keys()]
[docs] def sample( self, nr_points: int, parameterization: Union[None, Parameterization] = None, quasirandom: bool = False, ): """Sample points in Bounds. Parameters ---------- nr_points : int Number of points sampled from parameterization. parameterization : Parameterization Given if sampling bounds with different parameterization then the internal one stored in Bounds. Default is to not use this. quasirandom : bool If true then sample the points using Halton sequences. Default is False. """ if parameterization is not None: parameterization = self.parameterization computed_bound_ranges = self._compute_bounds(parameterization) return { str(key): value for key, value in _sample_ranges( nr_points, computed_bound_ranges, quasirandom ).items() }
[docs] def volume(self, parameterization: Union[None, Parameterization] = None): """Compute volume of bounds. Parameters ---------- parameterization : Parameterization Given if sampling bounds with different parameterization then the internal one stored in Bounds. Default is to not use this. """ # compute bounds from parameterization computed_bound_ranges = self._compute_bounds(parameterization) return np.prod( [value[1] - value[0] for value in computed_bound_ranges.values()] )

def union(self, other): new_parameterization = self.parameterization.union(other.parameterization) new_bound_ranges = {} for (key, (lower_1, upper_1)), (lower_2, upper_2) in zip( self.bound_ranges.items(), other.bound_ranges.values() ): # compute new lower bound if isinstance(lower_1, sympy.Basic) or isinstance(lower_2, sympy.Basic): new_lower = sympy.Min(lower_1, lower_2) elif isinstance(lower_1, (float, int)): new_lower = min(lower_1, lower_2) # compute new upper bound if isinstance(upper_1, sympy.Basic) or isinstance(upper_2, sympy.Basic): new_upper = sympy.Max(upper_1, upper_2) elif isinstance(upper_1, (float, int)): new_upper = max(upper_1, upper_2) # add to list of bound ranges new_bound_ranges[key] = (new_lower, new_upper) return Bounds(new_bound_ranges, new_parameterization) def intersection(self, other): new_parameterization = self.parameterization.union(other.parameterization) new_bound_ranges = {} for (key, (lower_1, upper_1)), (lower_2, upper_2) in zip( self.bound_ranges.items(), other.bound_ranges.values() ): # compute new lower bound if isinstance(lower_1, sympy.Basic) or isinstance(lower_2, sympy.Basic): new_lower = sympy.Max(lower_1, lower_2) elif isinstance(lower_1, (float, int)): new_lower = max(lower_1, lower_2) # compute new upper bound if isinstance(upper_1, sympy.Basic) or isinstance(upper_2, sympy.Basic): new_upper = sympy.Min(upper_1, upper_2) elif isinstance(upper_1, (float, int)): new_upper = min(upper_1, upper_2) # add to list of bound ranges new_bound_ranges[key] = (new_lower, new_upper) return Bounds(new_bound_ranges, new_parameterization) def scale(self, x, parameterization=Parameterization()): scaled_bound_ranges = { key: (lower * x, upper * x) for key, (lower, upper) in self.bound_ranges.items() } return Bounds( scaled_bound_ranges, self.parameterization.union(parameterization) ) def translate(self, xyz, parameterization=Parameterization()): translated_bound_ranges = { key: (lower + x, upper + x) for (key, (lower, upper)), x in zip(self.bound_ranges.items(), xyz) } return Bounds( translated_bound_ranges, self.parameterization.union(parameterization) ) def rotate(self, angle, axis, parameterization=Parameterization()): # rotate bounding box rotated_dims = [Parameter(key) for key in self.dims if key != axis] bounding_points = itertools.product( *[value for value in self.bound_ranges.values()] ) rotated_bounding_points = [] for p in bounding_points: p = {Parameter(key): value for key, value in zip(self.dims, p)} rotated_p = {**p} rotated_p[rotated_dims[0]] = ( sympy.cos(angle) * p[rotated_dims[0]] - sympy.sin(angle) * p[rotated_dims[1]] ) rotated_p[rotated_dims[1]] = ( sympy.sin(angle) * p[rotated_dims[0]] + sympy.cos(angle) * p[rotated_dims[1]] ) rotated_bounding_points.append(rotated_p) # find new bounds from rotated bounds rotated_bound_ranges = {**self.bound_ranges} for d in self.dims: # find upper and lower bound a = [p[Parameter(d)] for p in rotated_bounding_points] lower = sympy.Min(*a) upper = sympy.Max(*a) if lower.is_number: lower = float(lower) if upper.is_number: upper = float(upper) rotated_bound_ranges[Parameter(d)] = (lower, upper) return Bounds( rotated_bound_ranges, self.parameterization.union(parameterization) ) def copy(self): return Bounds(self.bound_ranges.copy(), self.parameterization.copy()) def _compute_bounds(self, parameterization=None, nr_sample=10000): # TODO this currently guesses the bounds by randomly sampling parameterization. This can be improved in the future. # get new parameterization if provided if parameterization is not None: parameterization = self.parameterization # set bound ranges computed_bound_ranges = {} for key, (lower, upper) in self.bound_ranges.items(): # compute lower if isinstance(lower, (float, int)): computed_lower = lower elif isinstance(lower, sympy.Basic): fn_lower = np_lambdify(lower, parameterization.parameters) computed_lower = np.min(fn_lower(**parameterization.sample(nr_sample))) else: raise ValueError( "Bound has non numeric or sympy values: " + str(self.bound_ranges) ) # compute upper if isinstance(upper, (float, int)): computed_upper = upper elif isinstance(upper, sympy.Basic): fn_upper = np_lambdify(upper, parameterization.parameters) computed_upper = np.max(fn_upper(**parameterization.sample(nr_sample))) else: raise ValueError( "Bound has non numeric or sympy values: " + str(self.bound_ranges) ) # store new range computed_bound_ranges[key] = (computed_lower, computed_upper) return computed_bound_ranges def __str__(self): return ( "bound_ranges: " + str(self.bound_ranges) + " param_ranges: " + str(self.parameterization) )

def _sample_ranges(batch_size, ranges, quasirandom=False): parameterization = {} if quasirandom: prime_index = 0 primes = create_primes(1000) for key, value in ranges.items(): # sample parameter if isinstance(value, tuple): if quasirandom: indices = [idx for idx in range(batch_size)] rand_param = ( value[0] + (value[1] - value[0]) * create_samples(indices, number_base=primes[prime_index]).reshape( -1, 1 ) ).astype(float) prime_index += 1 else: rand_param = np.random.uniform(value[0], value[1], size=(batch_size, 1)) elif isinstance(value, (float, int)): rand_param = np.zeros((batch_size, 1)) + value elif isinstance(value, np.ndarray): np_index = np.random.choice(value.shape[0], batch_size) rand_param = value[np_index, :] elif isinstance(value, Callable): rand_param = value(batch_size) else: raise ValueError( "range type: " + str(type(value)) + " not supported, try (tuple, or np.ndarray)" ) # if dependent sample break up parameter if isinstance(key, tuple): for i, k in enumerate(key): parameterization[k] = rand_param[:, i : i + 1] else: parameterization[key] = rand_param return parameterization

© Copyright 2023, NVIDIA Modulus Team. Last updated on Aug 8, 2023.