# Source code for modulus.sym.geometry.geometry

# Copyright (c) 2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
#
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#
# Unless required by applicable law or agreed to in writing, software
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and

"""
Defines base class for all geometries
"""

import copy
import numpy as np
import itertools
import sympy
from typing import Callable, Union, List

from modulus.sym.utils.sympy import np_lambdify
from modulus.sym.constants import diff_str
from .parameterization import Parameterization, Bounds
from .helper import (
_concat_numpy_dict_list,
_sympy_sdf_to_sdf,
_sympy_criteria_to_criteria,
_sympy_func_to_func,
)

def csg_curve_naming(index):
return "PRIMITIVE_PARAM_" + str(index).zfill(5)

[docs]class Geometry:
"""
Base class for all geometries
"""

def __init__(
self,
curves,
sdf,
dims,
bounds,
parameterization=Parameterization(),
interior_epsilon=1e-6,
):
# store attributes
self.curves = curves
self.sdf = sdf
self._dims = dims
self.bounds = bounds
self.parameterization = parameterization
self.interior_epsilon = interior_epsilon  # to check if in domain or outside

@property
def dims(self):
"""
Returns
-------
dims : List[srt]
output can be ['x'], ['x','y'], or ['x','y','z']
"""

return ["x", "y", "z"][: self._dims]

[docs]    def scale(
self,
x: Union[float, sympy.Basic],
parameterization: Parameterization = Parameterization(),
):
"""
Scales geometry.

Parameters
----------
x : Union[float, sympy.Basic]
Scale factor. Can be a sympy expression if parameterizing.
parameterization : Parameterization
Parameterization if scale factor is parameterized.
"""

# create scaled sdf function
def _scale_sdf(sdf, dims, x):
if isinstance(x, (float, int)):
pass
elif isinstance(x, sympy.Basic):
x = _sympy_func_to_func(x)
else:
raise TypeError("Scaling by type " + str(type(x)) + "is not supported")

def scale_sdf(invar, params, compute_sdf_derivatives=False):
# compute scale if needed
if isinstance(x, (float, int)):
computed_scale = x
else:
computed_scale = x(params)

# scale input to sdf function
scaled_invar = {**invar}
for key in dims:
scaled_invar[key] = scaled_invar[key] / computed_scale

# compute sdf
computed_sdf = sdf(scaled_invar, params, compute_sdf_derivatives)

# scale output sdf values
if isinstance(x, (float, int)):
computed_sdf["sdf"] *= x
else:
computed_sdf["sdf"] *= x(params)
return computed_sdf

return scale_sdf

new_sdf = _scale_sdf(self.sdf, self.dims, x)

new_parameterization = self.parameterization.union(parameterization)

# scale bounds
new_bounds = self.bounds.scale(x, parameterization)

# scale curves
new_curves = [c.scale(x, parameterization) for c in self.curves]

# return scaled geometry
return Geometry(
new_curves,
new_sdf,
len(self.dims),
new_bounds,
new_parameterization,
interior_epsilon=self.interior_epsilon,
)

[docs]    def translate(
self,
xyz: List[Union[float, sympy.Basic]],
parameterization: Parameterization = Parameterization(),
):
"""
Translates geometry.

Parameters
----------
xyz : List[Union[float, sympy.Basic]]
Translation. Can be a sympy expression if parameterizing.
parameterization : Parameterization
Parameterization if translation is parameterized.
"""

# create translated sdf function
def _translate_sdf(sdf, dims, xyx):
compiled_xyz = []
for i, x in enumerate(xyz):
if isinstance(x, (float, int)):
compiled_xyz.append(x)
elif isinstance(x, sympy.Basic):
compiled_xyz.append(_sympy_func_to_func(x))
else:
raise TypeError(
"Translate by type " + str(type(x)) + "is not supported"
)

def translate_sdf(invar, params, compute_sdf_derivatives=False):
# compute translation if needed
computed_translation = []
for x in compiled_xyz:
if isinstance(x, (float, int)):
computed_translation.append(x)
else:
computed_translation.append(x(params))

# translate input to sdf function
translated_invar = {**invar}
for i, key in enumerate(dims):
translated_invar[key] = (
translated_invar[key] - computed_translation[i]
)

# compute sdf
computed_sdf = sdf(translated_invar, params, compute_sdf_derivatives)
return computed_sdf

return translate_sdf

new_sdf = _translate_sdf(self.sdf, self.dims, xyz)

new_parameterization = self.parameterization.union(parameterization)

# translate bounds
new_bounds = self.bounds.translate(xyz, parameterization)

# translate curves
new_curves = [c.translate(xyz, parameterization) for c in self.curves]

# return translated geometry
return Geometry(
new_curves,
new_sdf,
len(self.dims),
new_bounds,
new_parameterization,
interior_epsilon=self.interior_epsilon,
)

[docs]    def rotate(
self,
angle: Union[float, sympy.Basic],
axis: str = "z",
center: Union[None, List[float]] = None,
parameterization=Parameterization(),
):
"""
Rotates geometry.

Parameters
----------
angle : Union[float, sympy.Basic]
Angle of rotate in radians. Can be a sympy expression if parameterizing.
axis : str
Axis of rotation. Default is "z".
center : Union[None, List[Union[float, sympy.Basic]]] = None
If given then center the rotation around this point.
parameterization : Parameterization
Parameterization if translation is parameterized.
"""

# create rotated sdf function
def _rotate_sdf(sdf, dims, angle, axis, center):
if isinstance(angle, (float, int)):
pass
elif isinstance(angle, sympy.Basic):
angle = _sympy_func_to_func(angle)
else:
raise TypeError(
"Scaling by type " + str(type(angle)) + "is not supported"
)

def rotate_sdf(invar, params, compute_sdf_derivatives=False):
# compute translation if needed
if isinstance(angle, (float, int)):
computed_angle = angle
else:
computed_angle = angle(params)

# rotate input to sdf function
rotated_invar = {**invar}
if center is not None:
for i, key in enumerate(dims):
rotated_invar[key] = rotated_invar[key] - center[i]
_rotated_invar = {**rotated_invar}
rotated_dims = [key for key in dims if key != axis]
_rotated_invar[rotated_dims[0]] = (
np.cos(computed_angle) * rotated_invar[rotated_dims[0]]
+ np.sin(computed_angle) * rotated_invar[rotated_dims[1]]
)
_rotated_invar[rotated_dims[1]] = (
-np.sin(computed_angle) * rotated_invar[rotated_dims[0]]
+ np.cos(computed_angle) * rotated_invar[rotated_dims[1]]
)
if center is not None:
for i, key in enumerate(dims):
_rotated_invar[key] = _rotated_invar[key] + center[i]

# compute sdf
computed_sdf = sdf(_rotated_invar, params, compute_sdf_derivatives)
return computed_sdf

return rotate_sdf

new_sdf = _rotate_sdf(self.sdf, self.dims, angle, axis, center)

new_parameterization = self.parameterization.union(parameterization)

# rotate bounds
if center is not None:
new_bounds = self.bounds.translate([-x for x in center])
new_bounds = new_bounds.rotate(angle, axis, parameterization)
new_bounds = new_bounds.translate(center)
else:
new_bounds = self.bounds.rotate(angle, axis, parameterization)

# rotate curves
new_curves = []
for c in self.curves:
if center is not None:
new_c = c.translate([-x for x in center])
new_c = new_c.rotate(angle, axis, parameterization)
new_c = new_c.translate(center)
else:
new_c = c.rotate(angle, axis, parameterization)
new_curves.append(new_c)

# return rotated geometry
return Geometry(
new_curves,
new_sdf,
len(self.dims),
new_bounds,
new_parameterization,
interior_epsilon=self.interior_epsilon,
)

[docs]    def repeat(
self,
spacing: float,
repeat_lower: List[int],
repeat_higher: List[int],
center: Union[None, List[float]] = None,
):
"""
Finite Repetition of geometry.

Parameters
----------
spacing : float
Spacing between each repetition.
repeat_lower : List[int]
How many repetitions going in negative direction.
repeat_upper : List[int]
How many repetitions going in positive direction.
center : Union[None, List[Union[float, sympy.Basic]]] = None
If given then center the rotation around this point.
"""

# create repeated sdf function
def _repeat_sdf(
sdf, dims, spacing, repeat_lower, repeat_higher, center
):  # TODO make spacing, repeat_lower, and repeat_higher parameterizable
def repeat_sdf(invar, params, compute_sdf_derivatives=False):
# clamp position values
clamped_invar = {**invar}
if center is not None:
for i, key in enumerate(dims):
clamped_invar[key] = clamped_invar[key] - center[i]
for d, rl, rh in zip(dims, repeat_lower, repeat_higher):
clamped_invar[d] = clamped_invar[d] - spacing * np.minimum(
np.maximum(np.around(clamped_invar[d] / spacing), rl), rh
)
if center is not None:
for i, key in enumerate(dims):
clamped_invar[key] = clamped_invar[key] + center[i]

# compute sdf
computed_sdf = sdf(clamped_invar, params, compute_sdf_derivatives)
return computed_sdf

return repeat_sdf

new_sdf = _repeat_sdf(
self.sdf, self.dims, spacing, repeat_lower, repeat_higher, center
)

# repeat bounds and curves
new_bounds = self.bounds.copy()
new_curves = []
for t in itertools.product(
*[list(range(rl, rh + 1)) for rl, rh in zip(repeat_lower, repeat_higher)]
):
new_bounds = new_bounds.union(
self.bounds.translate([spacing * a for a in t])
)
new_curves += [c.translate([spacing * a for a in t]) for c in self.curves]

# return repeated geometry
return Geometry(
new_curves,
new_sdf,
len(self.dims),
new_bounds,
self.parameterization.copy(),
interior_epsilon=self.interior_epsilon,
)def copy(self):
return copy.deepcopy(self)

def boundary_criteria(self, invar, criteria=None, params={}):
# check if moving in or out of normal direction changes SDF
invar_normal_plus = {**invar}
invar_normal_minus = {**invar}
for key in self.dims:
invar_normal_plus[key] = (
invar_normal_plus[key]
+ self.interior_epsilon * invar_normal_plus["normal_" + key]
)
invar_normal_minus[key] = (
invar_normal_minus[key]
- self.interior_epsilon * invar_normal_minus["normal_" + key]
)
sdf_normal_plus = self.sdf(
invar_normal_plus, params, compute_sdf_derivatives=False
)["sdf"]
sdf_normal_minus = self.sdf(
invar_normal_minus, params, compute_sdf_derivatives=False
)["sdf"]
on_boundary = np.greater_equal(0, sdf_normal_plus * sdf_normal_minus)

# check if points satisfy the criteria function
if criteria is not None:
# convert sympy criteria if needed
satify_criteria = criteria(invar, params)

# update on_boundary
on_boundary = np.logical_and(on_boundary, satify_criteria)

return on_boundary

[docs]    def sample_boundary(
self,
nr_points: int,
criteria: Union[sympy.Basic, None] = None,
parameterization: Union[Parameterization, None] = None,
quasirandom: bool = False,
):
"""
Samples the surface or perimeter of the geometry.

Parameters
----------
nr_points : int
number of points to sample on boundary.
criteria : Union[sympy.Basic, None]
Only sample points that satisfy this criteria.
parameterization : Union[Parameterization, None], optional
If the geometry is parameterized then you can provide ranges
for the parameters with this. By default the sampling will be
done with the internal parameterization.
quasirandom : bool
If true then sample the points using the Halton sequences.
Default is False.

Returns
-------
points : Dict[str, np.ndarray]
Dictionary contain a point cloud sampled uniformly.
For example in 2D it would be

points = {'x': np.ndarray (N, 1),
'y': np.ndarray (N, 1),
'normal_x': np.ndarray (N, 1),
'normal_y': np.ndarray (N, 1),
'area': np.ndarray (N, 1)}

The area value can be used for Monte Carlo integration
like the following,
total_area = np.sum(points['area'])
"""

# compile criteria from sympy if needed
if criteria is not None:
if isinstance(criteria, sympy.Basic):
criteria = _sympy_criteria_to_criteria(criteria)
elif isinstance(criteria, Callable):
pass
else:
raise TypeError(
"criteria type is not supported: " + str(type(criteria))
)

# use internal parameterization if not given
if parameterization is None:
parameterization = self.parameterization
elif isinstance(parameterization, dict):
parameterization = Parameterization(parameterization)

# create boundary criteria closure
def _boundary_criteria(criteria):
def boundary_criteria(invar, params):
return self.boundary_criteria(invar, criteria=criteria, params=params)

return boundary_criteria

closed_boundary_criteria = _boundary_criteria(criteria)

# compute required points on each curve
curve_areas = np.array(
[
curve.approx_area(parameterization, criteria=closed_boundary_criteria)
for curve in self.curves
]
)
assert np.sum(curve_areas) > 0, "Geometry has no surface"
curve_probabilities = curve_areas / np.linalg.norm(curve_areas, ord=1)
curve_index = np.arange(len(self.curves))
points_per_curve = np.random.choice(
curve_index, nr_points, p=curve_probabilities
)
points_per_curve, _ = np.histogram(
points_per_curve, np.arange(len(self.curves) + 1) - 0.5
)

# continually sample each curve until reached desired number of points
list_invar = []
list_params = []
for n, a, curve in zip(points_per_curve, curve_areas, self.curves):
if n > 0:
i, p = curve.sample(
n,
criteria=closed_boundary_criteria,
parameterization=parameterization,
)
i["area"] = np.full_like(i["area"], a / n)
list_invar.append(i)
list_params.append(p)
invar = _concat_numpy_dict_list(list_invar)
params = _concat_numpy_dict_list(list_params)
invar.update(params)
return invar

[docs]    def sample_interior(
self,
nr_points: int,
bounds: Union[Bounds, None] = None,
criteria: Union[sympy.Basic, None] = None,
parameterization: Union[Parameterization, None] = None,
compute_sdf_derivatives: bool = False,
quasirandom: bool = False,
):
"""
Samples the interior of the geometry.

Parameters
----------
nr_points : int
number of points to sample.
bounds : Union[Bounds, None]
Bounds to sample points from. For example,
bounds = Bounds({Parameter('x'): (0, 1), Parameter('y'): (0, 1)}).
By default the internal bounds will be used.
criteria : Union[sympy.Basic, None]
Only sample points that satisfy this criteria.
parameterization: Union[Parameterization, None]
If the geometry is parameterized then you can provide ranges
for the parameters with this.
compute_sdf_derivatives : bool
Compute sdf derivatives if true.
quasirandom : bool
If true then sample the points using the Halton sequences.
Default is False.

Returns
-------
points : Dict[str, np.ndarray]
Dictionary contain a point cloud sampled uniformly.
For example in 2D it would be

points = {'x': np.ndarray (N, 1),
'y': np.ndarray (N, 1),
'sdf': np.ndarray (N, 1),
'area': np.ndarray (N, 1)}

The area value can be used for Monte Carlo integration
like the following,
total_area = np.sum(points['area'])
"""

# compile criteria from sympy if needed
if criteria is not None:
if isinstance(criteria, sympy.Basic):
criteria = _sympy_criteria_to_criteria(criteria)
elif isinstance(criteria, Callable):
pass
else:
raise TypeError(
"criteria type is not supported: " + str(type(criteria))
)

# use internal bounds if not given
if bounds is None:
bounds = self.bounds
elif isinstance(bounds, dict):
bounds = Bounds(bounds)

# use internal parameterization if not given
if parameterization is None:
parameterization = self.parameterization
elif isinstance(parameterization, dict):
parameterization = Parameterization(parameterization)

# continually sample until reached desired number of points
invar = {}
params = {}
total_tried = 0
nr_try = 0
while True:
# sample invar and params
local_invar = bounds.sample(nr_points, parameterization, quasirandom)
local_params = parameterization.sample(nr_points, quasirandom)

# evaluate SDF function on points
local_invar.update(
self.sdf(
local_invar,
local_params,
compute_sdf_derivatives=compute_sdf_derivatives,
)
)

# remove points outside of domain
criteria_index = np.greater(local_invar["sdf"], 0)
if criteria is not None:
criteria_index = np.logical_and(
criteria_index, criteria(local_invar, local_params)
)
for key in local_invar.keys():
local_invar[key] = local_invar[key][criteria_index[:, 0], :]
for key in local_params.keys():
local_params[key] = local_params[key][criteria_index[:, 0], :]

# add sampled points to list
for key in local_invar.keys():
if key not in invar.keys():  # TODO this can be condensed
invar[key] = local_invar[key]
else:
invar[key] = np.concatenate([invar[key], local_invar[key]], axis=0)
for key in local_params.keys():
if key not in params.keys():  # TODO this can be condensed
params[key] = local_params[key]
else:
params[key] = np.concatenate(
[params[key], local_params[key]], axis=0
)

# check if finished
total_sampled = next(iter(invar.values())).shape[0]
total_tried += nr_points
nr_try += 1
if total_sampled >= nr_points:
for key, value in invar.items():
invar[key] = value[:nr_points]
for key, value in params.items():
params[key] = value[:nr_points]
break

if nr_try > 100 and total_sampled < 1:
raise RuntimeError(
"Could not sample interior of geometry. Check to make sure non-zero volume"
)

# compute area value for monte carlo integration
volume = (total_sampled / total_tried) * bounds.volume(parameterization)
invar["area"] = np.full_like(next(iter(invar.values())), volume / nr_points)

invar.update(params)
return invar@staticmethod
def _convert_criteria(criteria):
return criteria

computed_sdf_1 = sdf_1(invar, params, compute_sdf_derivatives)
computed_sdf_2 = sdf_2(invar, params, compute_sdf_derivatives)
computed_sdf = {}
computed_sdf["sdf"] = np.maximum(
computed_sdf_1["sdf"], computed_sdf_2["sdf"]
)
if compute_sdf_derivatives:
for d in dims:
computed_sdf["sdf" + diff_str + d] = np.where(
computed_sdf_1["sdf"] > computed_sdf_2["sdf"],
computed_sdf_1["sdf" + diff_str + d],
computed_sdf_2["sdf" + diff_str + d],
)
return computed_sdf

new_parameterization = self.parameterization.union(other.parameterization)
new_bounds = self.bounds.union(other.bounds)

return Geometry(
self.curves + other.curves,
new_sdf,
len(self.dims),
new_bounds,
new_parameterization,
interior_epsilon=self.interior_epsilon,
)

def __sub__(self, other):
def _sub_sdf(sdf_1, sdf_2, dims):
def sub_sdf(invar, params, compute_sdf_derivatives=False):
computed_sdf_1 = sdf_1(invar, params, compute_sdf_derivatives)
computed_sdf_2 = sdf_2(invar, params, compute_sdf_derivatives)
computed_sdf = {}
computed_sdf["sdf"] = np.minimum(
computed_sdf_1["sdf"], -computed_sdf_2["sdf"]
)
if compute_sdf_derivatives:
for d in dims:
computed_sdf["sdf" + diff_str + d] = np.where(
computed_sdf_1["sdf"] < -computed_sdf_2["sdf"],
computed_sdf_1["sdf" + diff_str + d],
-computed_sdf_2["sdf" + diff_str + d],
)
return computed_sdf

return sub_sdf

new_sdf = _sub_sdf(self.sdf, other.sdf, self.dims)
new_parameterization = self.parameterization.union(other.parameterization)
new_bounds = self.bounds.union(other.bounds)
new_curves = self.curves + [c.invert_normal() for c in other.curves]

return Geometry(
new_curves,
new_sdf,
len(self.dims),
new_bounds,
new_parameterization,
interior_epsilon=self.interior_epsilon,
)

def __invert__(self):
def _invert_sdf(sdf, dims):
def invert_sdf(invar, params, compute_sdf_derivatives=False):
computed_sdf = sdf(invar, params, compute_sdf_derivatives)
computed_sdf["sdf"] = -computed_sdf["sdf"]
if compute_sdf_derivatives:
for d in dims:
computed_sdf["sdf" + diff_str + d] = -computed_sdf[
"sdf" + diff_str + d
]
return computed_sdf

return invert_sdf

new_sdf = _invert_sdf(self.sdf, self.dims)
new_parameterization = self.parameterization.copy()
new_bounds = self.bounds.copy()
new_curves = [c.invert_normal() for c in self.curves]

return Geometry(
new_curves,
new_sdf,
len(self.dims),
new_bounds,
new_parameterization,
interior_epsilon=self.interior_epsilon,
)

def __and__(self, other):
def _and_sdf(sdf_1, sdf_2, dims):
def and_sdf(invar, params, compute_sdf_derivatives=False):
computed_sdf_1 = sdf_1(invar, params, compute_sdf_derivatives)
computed_sdf_2 = sdf_2(invar, params, compute_sdf_derivatives)
computed_sdf = {}
computed_sdf["sdf"] = np.minimum(
computed_sdf_1["sdf"], computed_sdf_2["sdf"]
)
if compute_sdf_derivatives:
for d in dims:
computed_sdf["sdf" + diff_str + d] = np.where(
computed_sdf_1["sdf"] < computed_sdf_2["sdf"],
computed_sdf_1["sdf" + diff_str + d],
computed_sdf_2["sdf" + diff_str + d],
)
return computed_sdf

return and_sdf

new_sdf = _and_sdf(self.sdf, other.sdf, self.dims)
new_parameterization = self.parameterization.union(other.parameterization)
new_bounds = self.bounds.union(other.bounds)
new_curves = self.curves + other.curves

return Geometry(
new_curves,
new_sdf,
len(self.dims),
new_bounds,
new_parameterization,
interior_epsilon=self.interior_epsilon,
)


Â© Copyright 2023, NVIDIA Modulus Team. Last updated on Aug 8, 2023.