deeplearning/modulus/modulus-v2209/_modules/modulus/geometry/geometry.html

v22.09

Source code for modulus.geometry.geometry

"""
Defines base class for all geometries
"""

import copy
import numpy as np
import itertools
import sympy
from typing import Callable, Union, List

from modulus.utils.sympy import np_lambdify
from modulus.constants import diff_str
from .parameterization import Parameterization, Bounds
from .helper import (
    _concat_numpy_dict_list,
    _sympy_sdf_to_sdf,
    _sympy_criteria_to_criteria,
    _sympy_func_to_func,
)


def csg_curve_naming(index):
    return "PRIMITIVE_PARAM_" + str(index).zfill(5)


[docs]class Geometry: """ Base class for all geometries """ def __init__( self, curves, sdf, dims, bounds, parameterization=Parameterization(), interior_epsilon=1e-6, ): # store attributes self.curves = curves self.sdf = sdf self._dims = dims self.bounds = bounds self.parameterization = parameterization self.interior_epsilon = interior_epsilon # to check if in domain or outside @property def dims(self): """ Returns ------- dims : List[srt] output can be ['x'], ['x','y'], or ['x','y','z'] """ return ["x", "y", "z"][: self._dims]
[docs] def scale( self, x: Union[float, sympy.Basic], parameterization: Parameterization = Parameterization(), ): """ Scales geometry. Parameters ---------- x : Union[float, sympy.Basic] Scale factor. Can be a sympy expression if parameterizing. parameterization : Parameterization Parameterization if scale factor is parameterized. """ # create scaled sdf function def _scale_sdf(sdf, dims, x): if isinstance(x, (float, int)): pass elif isinstance(x, sympy.Basic): x = _sympy_func_to_func(x) else: raise TypeError("Scaling by type " + str(type(x)) + "is not supported") def scale_sdf(invar, params, compute_sdf_derivatives=False): # compute scale if needed if isinstance(x, (float, int)): computed_scale = x else: computed_scale = x(params) # scale input to sdf function scaled_invar = {**invar} for key in dims: scaled_invar[key] = scaled_invar[key] / computed_scale # compute sdf computed_sdf = sdf(scaled_invar, params, compute_sdf_derivatives) # scale output sdf values if isinstance(x, (float, int)): computed_sdf["sdf"] *= x else: computed_sdf["sdf"] *= x(params) return computed_sdf return scale_sdf new_sdf = _scale_sdf(self.sdf, self.dims, x) # add parameterization new_parameterization = self.parameterization.union(parameterization) # scale bounds new_bounds = self.bounds.scale(x, parameterization) # scale curves new_curves = [c.scale(x, parameterization) for c in self.curves] # return scaled geometry return Geometry( new_curves, new_sdf, len(self.dims), new_bounds, new_parameterization, interior_epsilon=self.interior_epsilon, )
[docs] def translate( self, xyz: List[Union[float, sympy.Basic]], parameterization: Parameterization = Parameterization(), ): """ Translates geometry. Parameters ---------- xyz : List[Union[float, sympy.Basic]] Translation. Can be a sympy expression if parameterizing. parameterization : Parameterization Parameterization if translation is parameterized. """ # create translated sdf function def _translate_sdf(sdf, dims, xyx): compiled_xyz = [] for i, x in enumerate(xyz): if isinstance(x, (float, int)): compiled_xyz.append(x) elif isinstance(x, sympy.Basic): compiled_xyz.append(_sympy_func_to_func(x)) else: raise TypeError( "Translate by type " + str(type(x)) + "is not supported" ) def translate_sdf(invar, params, compute_sdf_derivatives=False): # compute translation if needed computed_translation = [] for x in compiled_xyz: if isinstance(x, (float, int)): computed_translation.append(x) else: computed_translation.append(x(params)) # translate input to sdf function translated_invar = {**invar} for i, key in enumerate(dims): translated_invar[key] = ( translated_invar[key] - computed_translation[i] ) # compute sdf computed_sdf = sdf(translated_invar, params, compute_sdf_derivatives) return computed_sdf return translate_sdf new_sdf = _translate_sdf(self.sdf, self.dims, xyz) # add parameterization new_parameterization = self.parameterization.union(parameterization) # translate bounds new_bounds = self.bounds.translate(xyz, parameterization) # translate curves new_curves = [c.translate(xyz, parameterization) for c in self.curves] # return translated geometry return Geometry( new_curves, new_sdf, len(self.dims), new_bounds, new_parameterization, interior_epsilon=self.interior_epsilon, )
[docs] def rotate( self, angle: Union[float, sympy.Basic], axis: str = "z", center: Union[None, List[float]] = None, parameterization=Parameterization(), ): """ Rotates geometry. Parameters ---------- angle : Union[float, sympy.Basic] Angle of rotate in radians. Can be a sympy expression if parameterizing. axis : str Axis of rotation. Default is `"z"`. center : Union[None, List[Union[float, sympy.Basic]]] = None If given then center the rotation around this point. parameterization : Parameterization Parameterization if translation is parameterized. """ # create rotated sdf function def _rotate_sdf(sdf, dims, angle, axis, center): if isinstance(angle, (float, int)): pass elif isinstance(angle, sympy.Basic): angle = _sympy_func_to_func(angle) else: raise TypeError( "Scaling by type " + str(type(angle)) + "is not supported" ) def rotate_sdf(invar, params, compute_sdf_derivatives=False): # compute translation if needed if isinstance(angle, (float, int)): computed_angle = angle else: computed_angle = angle(params) # rotate input to sdf function rotated_invar = {**invar} if center is not None: for i, key in enumerate(dims): rotated_invar[key] = rotated_invar[key] - center[i] _rotated_invar = {**rotated_invar} rotated_dims = [key for key in dims if key != axis] _rotated_invar[rotated_dims[0]] = ( np.cos(computed_angle) * rotated_invar[rotated_dims[0]] + np.sin(computed_angle) * rotated_invar[rotated_dims[1]] ) _rotated_invar[rotated_dims[1]] = ( -np.sin(computed_angle) * rotated_invar[rotated_dims[0]] + np.cos(computed_angle) * rotated_invar[rotated_dims[1]] ) if center is not None: for i, key in enumerate(dims): _rotated_invar[key] = _rotated_invar[key] + center[i] # compute sdf computed_sdf = sdf(_rotated_invar, params, compute_sdf_derivatives) return computed_sdf return rotate_sdf new_sdf = _rotate_sdf(self.sdf, self.dims, angle, axis, center) # add parameterization new_parameterization = self.parameterization.union(parameterization) # rotate bounds if center is not None: new_bounds = self.bounds.translate([-x for x in center]) new_bounds = new_bounds.rotate(angle, axis, parameterization) new_bounds = new_bounds.translate(center) else: new_bounds = self.bounds.rotate(angle, axis, parameterization) # rotate curves new_curves = [] for c in self.curves: if center is not None: new_c = c.translate([-x for x in center]) new_c = new_c.rotate(angle, axis, parameterization) new_c = new_c.translate(center) else: new_c = c.rotate(angle, axis, parameterization) new_curves.append(new_c) # return rotated geometry return Geometry( new_curves, new_sdf, len(self.dims), new_bounds, new_parameterization, interior_epsilon=self.interior_epsilon, )
[docs] def repeat( self, spacing: float, repeat_lower: List[int], repeat_higher: List[int], center: Union[None, List[float]] = None, ): """ Finite Repetition of geometry. Parameters ---------- spacing : float Spacing between each repetition. repeat_lower : List[int] How many repetitions going in negative direction. repeat_upper : List[int] How many repetitions going in positive direction. center : Union[None, List[Union[float, sympy.Basic]]] = None If given then center the rotation around this point. """ # create repeated sdf function def _repeat_sdf( sdf, dims, spacing, repeat_lower, repeat_higher, center ): # TODO make spacing, repeat_lower, and repeat_higher parameterizable def repeat_sdf(invar, params, compute_sdf_derivatives=False): # clamp position values clamped_invar = {**invar} if center is not None: for i, key in enumerate(dims): clamped_invar[key] = clamped_invar[key] - center[i] for d, rl, rh in zip(dims, repeat_lower, repeat_higher): clamped_invar[d] = clamped_invar[d] - spacing * np.minimum( np.maximum(np.around(clamped_invar[d] / spacing), rl), rh ) if center is not None: for i, key in enumerate(dims): clamped_invar[key] = clamped_invar[key] + center[i] # compute sdf computed_sdf = sdf(clamped_invar, params, compute_sdf_derivatives) return computed_sdf return repeat_sdf new_sdf = _repeat_sdf( self.sdf, self.dims, spacing, repeat_lower, repeat_higher, center ) # repeat bounds and curves new_bounds = self.bounds.copy() new_curves = [] for t in itertools.product( *[list(range(rl, rh + 1)) for rl, rh in zip(repeat_lower, repeat_higher)] ): new_bounds = new_bounds.union( self.bounds.translate([spacing * a for a in t]) ) new_curves += [c.translate([spacing * a for a in t]) for c in self.curves] # return repeated geometry return Geometry( new_curves, new_sdf, len(self.dims), new_bounds, self.parameterization.copy(), interior_epsilon=self.interior_epsilon, )

def copy(self): return copy.deepcopy(self) def boundary_criteria(self, invar, criteria=None, params={}): # check if moving in or out of normal direction changes SDF invar_normal_plus = {**invar} invar_normal_minus = {**invar} for key in self.dims: invar_normal_plus[key] = ( invar_normal_plus[key] + self.interior_epsilon * invar_normal_plus["normal_" + key] ) invar_normal_minus[key] = ( invar_normal_minus[key] - self.interior_epsilon * invar_normal_minus["normal_" + key] ) sdf_normal_plus = self.sdf( invar_normal_plus, params, compute_sdf_derivatives=False )["sdf"] sdf_normal_minus = self.sdf( invar_normal_minus, params, compute_sdf_derivatives=False )["sdf"] on_boundary = np.greater_equal(0, sdf_normal_plus * sdf_normal_minus) # check if points satisfy the criteria function if criteria is not None: # convert sympy criteria if needed satify_criteria = criteria(invar, params) # update on_boundary on_boundary = np.logical_and(on_boundary, satify_criteria) return on_boundary

[docs] def sample_boundary( self, nr_points: int, criteria: Union[sympy.Basic, None] = None, parameterization: Union[Parameterization, None] = None, quasirandom: bool = False, ): """ Samples the surface or perimeter of the geometry. Parameters ---------- nr_points : int number of points to sample on boundary. criteria : Union[sympy.Basic, None] Only sample points that satisfy this criteria. parameterization : Union[Parameterization, None], optional If the geometry is parameterized then you can provide ranges for the parameters with this. By default the sampling will be done with the internal parameterization. quasirandom : bool If true then sample the points using the Halton sequences. Default is False. Returns ------- points : Dict[str, np.ndarray] Dictionary contain a point cloud sampled uniformly. For example in 2D it would be ``` points = {'x': np.ndarray (N, 1), 'y': np.ndarray (N, 1), 'normal_x': np.ndarray (N, 1), 'normal_y': np.ndarray (N, 1), 'area': np.ndarray (N, 1)} ``` The `area` value can be used for Monte Carlo integration like the following, `total_area = np.sum(points['area'])` """ # compile criteria from sympy if needed if criteria is not None: if isinstance(criteria, sympy.Basic): criteria = _sympy_criteria_to_criteria(criteria) elif isinstance(criteria, Callable): pass else: raise TypeError( "criteria type is not supported: " + str(type(criteria)) ) # use internal parameterization if not given if parameterization is None: parameterization = self.parameterization elif isinstance(parameterization, dict): parameterization = Parameterization(parameterization) # create boundary criteria closure def _boundary_criteria(criteria): def boundary_criteria(invar, params): return self.boundary_criteria(invar, criteria=criteria, params=params) return boundary_criteria closed_boundary_criteria = _boundary_criteria(criteria) # compute required points on each curve curve_areas = np.array( [ curve.approx_area(parameterization, criteria=closed_boundary_criteria) for curve in self.curves ] ) assert np.sum(curve_areas) > 0, "Geometry has no surface" curve_probabilities = curve_areas / np.linalg.norm(curve_areas, ord=1) curve_index = np.arange(len(self.curves)) points_per_curve = np.random.choice( curve_index, nr_points, p=curve_probabilities ) points_per_curve, _ = np.histogram( points_per_curve, np.arange(len(self.curves) + 1) - 0.5 ) # continually sample each curve until reached desired number of points list_invar = [] list_params = [] for n, a, curve in zip(points_per_curve, curve_areas, self.curves): if n > 0: i, p = curve.sample( n, criteria=closed_boundary_criteria, parameterization=parameterization, ) i["area"] = np.full_like(i["area"], a / n) list_invar.append(i) list_params.append(p) invar = _concat_numpy_dict_list(list_invar) params = _concat_numpy_dict_list(list_params) invar.update(params) return invar
[docs] def sample_interior( self, nr_points: int, bounds: Union[Bounds, None] = None, criteria: Union[sympy.Basic, None] = None, parameterization: Union[Parameterization, None] = None, compute_sdf_derivatives: bool = False, quasirandom: bool = False, ): """ Samples the interior of the geometry. Parameters ---------- nr_points : int number of points to sample. bounds : Union[Bounds, None] Bounds to sample points from. For example, `bounds = Bounds({Parameter('x'): (0, 1), Parameter('y'): (0, 1)})`. By default the internal bounds will be used. criteria : Union[sympy.Basic, None] Only sample points that satisfy this criteria. parameterization: Union[Parameterization, None] If the geometry is parameterized then you can provide ranges for the parameters with this. compute_sdf_derivatives : bool Compute sdf derivatives if true. quasirandom : bool If true then sample the points using the Halton sequences. Default is False. Returns ------- points : Dict[str, np.ndarray] Dictionary contain a point cloud sampled uniformly. For example in 2D it would be ``` points = {'x': np.ndarray (N, 1), 'y': np.ndarray (N, 1), 'sdf': np.ndarray (N, 1), 'area': np.ndarray (N, 1)} ``` The `area` value can be used for Monte Carlo integration like the following, `total_area = np.sum(points['area'])` """ # compile criteria from sympy if needed if criteria is not None: if isinstance(criteria, sympy.Basic): criteria = _sympy_criteria_to_criteria(criteria) elif isinstance(criteria, Callable): pass else: raise TypeError( "criteria type is not supported: " + str(type(criteria)) ) # use internal bounds if not given if bounds is None: bounds = self.bounds elif isinstance(bounds, dict): bounds = Bounds(bounds) # use internal parameterization if not given if parameterization is None: parameterization = self.parameterization elif isinstance(parameterization, dict): parameterization = Parameterization(parameterization) # continually sample until reached desired number of points invar = {} params = {} total_tried = 0 nr_try = 0 while True: # sample invar and params local_invar = bounds.sample(nr_points, parameterization, quasirandom) local_params = parameterization.sample(nr_points, quasirandom) # evaluate SDF function on points local_invar.update( self.sdf( local_invar, local_params, compute_sdf_derivatives=compute_sdf_derivatives, ) ) # remove points outside of domain criteria_index = np.greater(local_invar["sdf"], 0) if criteria is not None: criteria_index = np.logical_and( criteria_index, criteria(local_invar, local_params) ) for key in local_invar.keys(): local_invar[key] = local_invar[key][criteria_index[:, 0], :] for key in local_params.keys(): local_params[key] = local_params[key][criteria_index[:, 0], :] # add sampled points to list for key in local_invar.keys(): if key not in invar.keys(): # TODO this can be condensed invar[key] = local_invar[key] else: invar[key] = np.concatenate([invar[key], local_invar[key]], axis=0) for key in local_params.keys(): if key not in params.keys(): # TODO this can be condensed params[key] = local_params[key] else: params[key] = np.concatenate( [params[key], local_params[key]], axis=0 ) # check if finished total_sampled = next(iter(invar.values())).shape[0] total_tried += nr_points nr_try += 1 if total_sampled >= nr_points: for key, value in invar.items(): invar[key] = value[:nr_points] for key, value in params.items(): params[key] = value[:nr_points] break # report error if could not sample if nr_try > 100 and total_sampled < 1: raise RuntimeError( "Could not sample interior of geometry. Check to make sure non-zero volume" ) # compute area value for monte carlo integration volume = (total_sampled / total_tried) * bounds.volume(parameterization) invar["area"] = np.full_like(next(iter(invar.values())), volume / nr_points) # add params to invar invar.update(params) return invar

@staticmethod def _convert_criteria(criteria): return criteria def __add__(self, other): def _add_sdf(sdf_1, sdf_2, dims): def add_sdf(invar, params, compute_sdf_derivatives=False): computed_sdf_1 = sdf_1(invar, params, compute_sdf_derivatives) computed_sdf_2 = sdf_2(invar, params, compute_sdf_derivatives) computed_sdf = {} computed_sdf["sdf"] = np.maximum( computed_sdf_1["sdf"], computed_sdf_2["sdf"] ) if compute_sdf_derivatives: for d in dims: computed_sdf["sdf" + diff_str + d] = np.where( computed_sdf_1["sdf"] > computed_sdf_2["sdf"], computed_sdf_1["sdf" + diff_str + d], computed_sdf_2["sdf" + diff_str + d], ) return computed_sdf return add_sdf new_sdf = _add_sdf(self.sdf, other.sdf, self.dims) new_parameterization = self.parameterization.union(other.parameterization) new_bounds = self.bounds.union(other.bounds) return Geometry( self.curves + other.curves, new_sdf, len(self.dims), new_bounds, new_parameterization, interior_epsilon=self.interior_epsilon, ) def __sub__(self, other): def _sub_sdf(sdf_1, sdf_2, dims): def sub_sdf(invar, params, compute_sdf_derivatives=False): computed_sdf_1 = sdf_1(invar, params, compute_sdf_derivatives) computed_sdf_2 = sdf_2(invar, params, compute_sdf_derivatives) computed_sdf = {} computed_sdf["sdf"] = np.minimum( computed_sdf_1["sdf"], -computed_sdf_2["sdf"] ) if compute_sdf_derivatives: for d in dims: computed_sdf["sdf" + diff_str + d] = np.where( computed_sdf_1["sdf"] < -computed_sdf_2["sdf"], computed_sdf_1["sdf" + diff_str + d], -computed_sdf_2["sdf" + diff_str + d], ) return computed_sdf return sub_sdf new_sdf = _sub_sdf(self.sdf, other.sdf, self.dims) new_parameterization = self.parameterization.union(other.parameterization) new_bounds = self.bounds.union(other.bounds) new_curves = self.curves + [c.invert_normal() for c in other.curves] return Geometry( new_curves, new_sdf, len(self.dims), new_bounds, new_parameterization, interior_epsilon=self.interior_epsilon, ) def __invert__(self): def _invert_sdf(sdf, dims): def invert_sdf(invar, params, compute_sdf_derivatives=False): computed_sdf = sdf(invar, params, compute_sdf_derivatives) computed_sdf["sdf"] = -computed_sdf["sdf"] if compute_sdf_derivatives: for d in dims: computed_sdf["sdf" + diff_str + d] = -computed_sdf[ "sdf" + diff_str + d ] return computed_sdf return invert_sdf new_sdf = _invert_sdf(self.sdf, self.dims) new_parameterization = self.parameterization.copy() new_bounds = self.bounds.copy() new_curves = [c.invert_normal() for c in self.curves] return Geometry( new_curves, new_sdf, len(self.dims), new_bounds, new_parameterization, interior_epsilon=self.interior_epsilon, ) def __and__(self, other): def _and_sdf(sdf_1, sdf_2, dims): def and_sdf(invar, params, compute_sdf_derivatives=False): computed_sdf_1 = sdf_1(invar, params, compute_sdf_derivatives) computed_sdf_2 = sdf_2(invar, params, compute_sdf_derivatives) computed_sdf = {} computed_sdf["sdf"] = np.minimum( computed_sdf_1["sdf"], computed_sdf_2["sdf"] ) if compute_sdf_derivatives: for d in dims: computed_sdf["sdf" + diff_str + d] = np.where( computed_sdf_1["sdf"] < computed_sdf_2["sdf"], computed_sdf_1["sdf" + diff_str + d], computed_sdf_2["sdf" + diff_str + d], ) return computed_sdf return and_sdf new_sdf = _and_sdf(self.sdf, other.sdf, self.dims) new_parameterization = self.parameterization.union(other.parameterization) new_bounds = self.bounds.union(other.bounds) new_curves = self.curves + other.curves return Geometry( new_curves, new_sdf, len(self.dims), new_bounds, new_parameterization, interior_epsilon=self.interior_epsilon, )

© Copyright 2021-2022, NVIDIA. Last updated on Apr 26, 2023.