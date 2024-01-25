# Copyright (c) 2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. """ Defines base class for all geometries """ import copy import numpy as np import itertools import sympy from typing import Callable , Union , List from modulus.sym.utils.sympy import np_lambdify from modulus.sym.constants import diff_str from .parameterization import Parameterization , Bounds from .helper import ( _concat_numpy_dict_list , _sympy_sdf_to_sdf , _sympy_criteria_to_criteria , _sympy_func_to_func , ) def csg_curve_naming ( index ): return "PRIMITIVE_PARAM_" + str ( index ) . zfill ( 5 ) [docs] class Geometry : """ Base class for all geometries """ def __init__ ( self , curves , sdf , dims , bounds , parameterization = Parameterization (), interior_epsilon = 1e-6 , ): # store attributes self . curves = curves self . sdf = sdf self . _dims = dims self . bounds = bounds self . parameterization = parameterization self . interior_epsilon = interior_epsilon # to check if in domain or outside @property def dims ( self ): """ Returns ------- dims : List[srt] output can be ['x'], ['x','y'], or ['x','y','z'] """ return [ "x" , "y" , "z" ][: self . _dims ] [docs] def scale ( self , x : Union [ float , sympy . Basic ], parameterization : Parameterization = Parameterization (), ): """ Scales geometry. Parameters ---------- x : Union[float, sympy.Basic] Scale factor. Can be a sympy expression if parameterizing. parameterization : Parameterization Parameterization if scale factor is parameterized. """ # create scaled sdf function def _scale_sdf ( sdf , dims , x ): if isinstance ( x , ( float , int )): pass elif isinstance ( x , sympy . Basic ): x = _sympy_func_to_func ( x ) else : raise TypeError ( "Scaling by type " + str ( type ( x )) + "is not supported" ) def scale_sdf ( invar , params , compute_sdf_derivatives = False ): # compute scale if needed if isinstance ( x , ( float , int )): computed_scale = x else : computed_scale = x ( params ) # scale input to sdf function scaled_invar = { ** invar } for key in dims : scaled_invar [ key ] = scaled_invar [ key ] / computed_scale # compute sdf computed_sdf = sdf ( scaled_invar , params , compute_sdf_derivatives ) # scale output sdf values if isinstance ( x , ( float , int )): computed_sdf [ "sdf" ] *= x else : computed_sdf [ "sdf" ] *= x ( params ) return computed_sdf return scale_sdf new_sdf = _scale_sdf ( self . sdf , self . dims , x ) # add parameterization new_parameterization = self . parameterization . union ( parameterization ) # scale bounds new_bounds = self . bounds . scale ( x , parameterization ) # scale curves new_curves = [ c . scale ( x , parameterization ) for c in self . curves ] # return scaled geometry return Geometry ( new_curves , new_sdf , len ( self . dims ), new_bounds , new_parameterization , interior_epsilon = self . interior_epsilon , ) [docs] def translate ( self , xyz : List [ Union [ float , sympy . Basic ]], parameterization : Parameterization = Parameterization (), ): """ Translates geometry. Parameters ---------- xyz : List[Union[float, sympy.Basic]] Translation. Can be a sympy expression if parameterizing. parameterization : Parameterization Parameterization if translation is parameterized. """ # create translated sdf function def _translate_sdf ( sdf , dims , xyx ): compiled_xyz = [] for i , x in enumerate ( xyz ): if isinstance ( x , ( float , int )): compiled_xyz . append ( x ) elif isinstance ( x , sympy . Basic ): compiled_xyz . append ( _sympy_func_to_func ( x )) else : raise TypeError ( "Translate by type " + str ( type ( x )) + "is not supported" ) def translate_sdf ( invar , params , compute_sdf_derivatives = False ): # compute translation if needed computed_translation = [] for x in compiled_xyz : if isinstance ( x , ( float , int )): computed_translation . append ( x ) else : computed_translation . append ( x ( params )) # translate input to sdf function translated_invar = { ** invar } for i , key in enumerate ( dims ): translated_invar [ key ] = ( translated_invar [ key ] - computed_translation [ i ] ) # compute sdf computed_sdf = sdf ( translated_invar , params , compute_sdf_derivatives ) return computed_sdf return translate_sdf new_sdf = _translate_sdf ( self . sdf , self . dims , xyz ) # add parameterization new_parameterization = self . parameterization . union ( parameterization ) # translate bounds new_bounds = self . bounds . translate ( xyz , parameterization ) # translate curves new_curves = [ c . translate ( xyz , parameterization ) for c in self . curves ] # return translated geometry return Geometry ( new_curves , new_sdf , len ( self . dims ), new_bounds , new_parameterization , interior_epsilon = self . interior_epsilon , ) [docs] def rotate ( self , angle : Union [ float , sympy . Basic ], axis : str = "z" , center : Union [ None , List [ float ]] = None , parameterization = Parameterization (), ): """ Rotates geometry. Parameters ---------- angle : Union[float, sympy.Basic] Angle of rotate in radians. Can be a sympy expression if parameterizing. axis : str Axis of rotation. Default is `"z"`. center : Union[None, List[Union[float, sympy.Basic]]] = None If given then center the rotation around this point. parameterization : Parameterization Parameterization if translation is parameterized. """ # create rotated sdf function def _rotate_sdf ( sdf , dims , angle , axis , center ): if isinstance ( angle , ( float , int )): pass elif isinstance ( angle , sympy . Basic ): angle = _sympy_func_to_func ( angle ) else : raise TypeError ( "Scaling by type " + str ( type ( angle )) + "is not supported" ) def rotate_sdf ( invar , params , compute_sdf_derivatives = False ): # compute translation if needed if isinstance ( angle , ( float , int )): computed_angle = angle else : computed_angle = angle ( params ) # rotate input to sdf function rotated_invar = { ** invar } if center is not None : for i , key in enumerate ( dims ): rotated_invar [ key ] = rotated_invar [ key ] - center [ i ] _rotated_invar = { ** rotated_invar } rotated_dims = [ key for key in dims if key != axis ] _rotated_invar [ rotated_dims [ 0 ]] = ( np . cos ( computed_angle ) * rotated_invar [ rotated_dims [ 0 ]] + np . sin ( computed_angle ) * rotated_invar [ rotated_dims [ 1 ]] ) _rotated_invar [ rotated_dims [ 1 ]] = ( - np . sin ( computed_angle ) * rotated_invar [ rotated_dims [ 0 ]] + np . cos ( computed_angle ) * rotated_invar [ rotated_dims [ 1 ]] ) if center is not None : for i , key in enumerate ( dims ): _rotated_invar [ key ] = _rotated_invar [ key ] + center [ i ] # compute sdf computed_sdf = sdf ( _rotated_invar , params , compute_sdf_derivatives ) return computed_sdf return rotate_sdf new_sdf = _rotate_sdf ( self . sdf , self . dims , angle , axis , center ) # add parameterization new_parameterization = self . parameterization . union ( parameterization ) # rotate bounds if center is not None : new_bounds = self . bounds . translate ([ - x for x in center ]) new_bounds = new_bounds . rotate ( angle , axis , parameterization ) new_bounds = new_bounds . translate ( center ) else : new_bounds = self . bounds . rotate ( angle , axis , parameterization ) # rotate curves new_curves = [] for c in self . curves : if center is not None : new_c = c . translate ([ - x for x in center ]) new_c = new_c . rotate ( angle , axis , parameterization ) new_c = new_c . translate ( center ) else : new_c = c . rotate ( angle , axis , parameterization ) new_curves . append ( new_c ) # return rotated geometry return Geometry ( new_curves , new_sdf , len ( self . dims ), new_bounds , new_parameterization , interior_epsilon = self . interior_epsilon , ) [docs] def repeat ( self , spacing : float , repeat_lower : List [ int ], repeat_higher : List [ int ], center : Union [ None , List [ float ]] = None , ): """ Finite Repetition of geometry. Parameters ---------- spacing : float Spacing between each repetition. repeat_lower : List[int] How many repetitions going in negative direction. repeat_upper : List[int] How many repetitions going in positive direction. center : Union[None, List[Union[float, sympy.Basic]]] = None If given then center the rotation around this point. """ # create repeated sdf function def _repeat_sdf ( sdf , dims , spacing , repeat_lower , repeat_higher , center ): # TODO make spacing, repeat_lower, and repeat_higher parameterizable def repeat_sdf ( invar , params , compute_sdf_derivatives = False ): # clamp position values clamped_invar = { ** invar } if center is not None : for i , key in enumerate ( dims ): clamped_invar [ key ] = clamped_invar [ key ] - center [ i ] for d , rl , rh in zip ( dims , repeat_lower , repeat_higher ): clamped_invar [ d ] = clamped_invar [ d ] - spacing * np . minimum ( np . maximum ( np . around ( clamped_invar [ d ] / spacing ), rl ), rh ) if center is not None : for i , key in enumerate ( dims ): clamped_invar [ key ] = clamped_invar [ key ] + center [ i ] # compute sdf computed_sdf = sdf ( clamped_invar , params , compute_sdf_derivatives ) return computed_sdf return repeat_sdf new_sdf = _repeat_sdf ( self . sdf , self . dims , spacing , repeat_lower , repeat_higher , center ) # repeat bounds and curves new_bounds = self . bounds . copy () new_curves = [] for t in itertools . product ( * [ list ( range ( rl , rh + 1 )) for rl , rh in zip ( repeat_lower , repeat_higher )] ): new_bounds = new_bounds . union ( self . bounds . translate ([ spacing * a for a in t ]) ) new_curves += [ c . translate ([ spacing * a for a in t ]) for c in self . curves ] # return repeated geometry return Geometry ( new_curves , new_sdf , len ( self . dims ), new_bounds , self . parameterization . copy (), interior_epsilon = self . interior_epsilon , ) def copy(self): return copy.deepcopy(self) def boundary_criteria(self, invar, criteria=None, params={}): # check if moving in or out of normal direction changes SDF invar_normal_plus = {**invar} invar_normal_minus = {**invar} for key in self.dims: invar_normal_plus[key] = ( invar_normal_plus[key] + self.interior_epsilon * invar_normal_plus["normal_" + key] ) invar_normal_minus[key] = ( invar_normal_minus[key] - self.interior_epsilon * invar_normal_minus["normal_" + key] ) sdf_normal_plus = self.sdf( invar_normal_plus, params, compute_sdf_derivatives=False )["sdf"] sdf_normal_minus = self.sdf( invar_normal_minus, params, compute_sdf_derivatives=False )["sdf"] on_boundary = np.greater_equal(0, sdf_normal_plus * sdf_normal_minus) # check if points satisfy the criteria function if criteria is not None: # convert sympy criteria if needed satify_criteria = criteria(invar, params) # update on_boundary on_boundary = np.logical_and(on_boundary, satify_criteria) return on_boundary [docs] def sample_boundary ( self , nr_points : int , criteria : Union [ sympy . Basic , None ] = None , parameterization : Union [ Parameterization , None ] = None , quasirandom : bool = False , ): """ Samples the surface or perimeter of the geometry. Parameters ---------- nr_points : int number of points to sample on boundary. criteria : Union[sympy.Basic, None] Only sample points that satisfy this criteria. parameterization : Union[Parameterization, None], optional If the geometry is parameterized then you can provide ranges for the parameters with this. By default the sampling will be done with the internal parameterization. quasirandom : bool If true then sample the points using the Halton sequences. Default is False. Returns ------- points : Dict[str, np.ndarray] Dictionary contain a point cloud sampled uniformly. For example in 2D it would be ``` points = {'x': np.ndarray (N, 1), 'y': np.ndarray (N, 1), 'normal_x': np.ndarray (N, 1), 'normal_y': np.ndarray (N, 1), 'area': np.ndarray (N, 1)} ``` The `area` value can be used for Monte Carlo integration like the following, `total_area = np.sum(points['area'])` """ # compile criteria from sympy if needed if criteria is not None : if isinstance ( criteria , sympy . Basic ): criteria = _sympy_criteria_to_criteria ( criteria ) elif isinstance ( criteria , Callable ): pass else : raise TypeError ( "criteria type is not supported: " + str ( type ( criteria )) ) # use internal parameterization if not given if parameterization is None : parameterization = self . parameterization elif isinstance ( parameterization , dict ): parameterization = Parameterization ( parameterization ) # create boundary criteria closure def _boundary_criteria ( criteria ): def boundary_criteria ( invar , params ): return self . boundary_criteria ( invar , criteria = criteria , params = params ) return boundary_criteria closed_boundary_criteria = _boundary_criteria ( criteria ) # compute required points on each curve curve_areas = np . array ( [ curve . approx_area ( parameterization , criteria = closed_boundary_criteria ) for curve in self . curves ] ) assert np . sum ( curve_areas ) > 0 , "Geometry has no surface" curve_probabilities = curve_areas / np . linalg . norm ( curve_areas , ord = 1 ) curve_index = np . arange ( len ( self . curves )) points_per_curve = np . random . choice ( curve_index , nr_points , p = curve_probabilities ) points_per_curve , _ = np . histogram ( points_per_curve , np . arange ( len ( self . curves ) + 1 ) - 0.5 ) # continually sample each curve until reached desired number of points list_invar = [] list_params = [] for n , a , curve in zip ( points_per_curve , curve_areas , self . curves ): if n > 0 : i , p = curve . sample ( n , criteria = closed_boundary_criteria , parameterization = parameterization , ) i [ "area" ] = np . full_like ( i [ "area" ], a / n ) list_invar . append ( i ) list_params . append ( p ) invar = _concat_numpy_dict_list ( list_invar ) params = _concat_numpy_dict_list ( list_params ) invar . update ( params ) return invar [docs] def sample_interior ( self , nr_points : int , bounds : Union [ Bounds , None ] = None , criteria : Union [ sympy . Basic , None ] = None , parameterization : Union [ Parameterization , None ] = None , compute_sdf_derivatives : bool = False , quasirandom : bool = False , ): """ Samples the interior of the geometry. Parameters ---------- nr_points : int number of points to sample. bounds : Union[Bounds, None] Bounds to sample points from. For example, `bounds = Bounds({Parameter('x'): (0, 1), Parameter('y'): (0, 1)})`. By default the internal bounds will be used. criteria : Union[sympy.Basic, None] Only sample points that satisfy this criteria. parameterization: Union[Parameterization, None] If the geometry is parameterized then you can provide ranges for the parameters with this. compute_sdf_derivatives : bool Compute sdf derivatives if true. quasirandom : bool If true then sample the points using the Halton sequences. Default is False. Returns ------- points : Dict[str, np.ndarray] Dictionary contain a point cloud sampled uniformly. For example in 2D it would be ``` points = {'x': np.ndarray (N, 1), 'y': np.ndarray (N, 1), 'sdf': np.ndarray (N, 1), 'area': np.ndarray (N, 1)} ``` The `area` value can be used for Monte Carlo integration like the following, `total_area = np.sum(points['area'])` """ # compile criteria from sympy if needed if criteria is not None : if isinstance ( criteria , sympy . Basic ): criteria = _sympy_criteria_to_criteria ( criteria ) elif isinstance ( criteria , Callable ): pass else : raise TypeError ( "criteria type is not supported: " + str ( type ( criteria )) ) # use internal bounds if not given if bounds is None : bounds = self . bounds elif isinstance ( bounds , dict ): bounds = Bounds ( bounds ) # use internal parameterization if not given if parameterization is None : parameterization = self . parameterization elif isinstance ( parameterization , dict ): parameterization = Parameterization ( parameterization ) # continually sample until reached desired number of points invar = {} params = {} total_tried = 0 nr_try = 0 while True : # sample invar and params local_invar = bounds . sample ( nr_points , parameterization , quasirandom ) local_params = parameterization . sample ( nr_points , quasirandom ) # evaluate SDF function on points local_invar . update ( self . sdf ( local_invar , local_params , compute_sdf_derivatives = compute_sdf_derivatives , ) ) # remove points outside of domain criteria_index = np . greater ( local_invar [ "sdf" ], 0 ) if criteria is not None : criteria_index = np . logical_and ( criteria_index , criteria ( local_invar , local_params ) ) for key in local_invar . keys (): local_invar [ key ] = local_invar [ key ][ criteria_index [:, 0 ], :] for key in local_params . keys (): local_params [ key ] = local_params [ key ][ criteria_index [:, 0 ], :] # add sampled points to list for key in local_invar . keys (): if key not in invar . keys (): # TODO this can be condensed invar [ key ] = local_invar [ key ] else : invar [ key ] = np . concatenate ([ invar [ key ], local_invar [ key ]], axis = 0 ) for key in local_params . keys (): if key not in params . keys (): # TODO this can be condensed params [ key ] = local_params [ key ] else : params [ key ] = np . concatenate ( [ params [ key ], local_params [ key ]], axis = 0 ) # check if finished total_sampled = next ( iter ( invar . values ())) . shape [ 0 ] total_tried += nr_points nr_try += 1 if total_sampled >= nr_points : for key , value in invar . items (): invar [ key ] = value [: nr_points ] for key , value in params . items (): params [ key ] = value [: nr_points ] break # report error if could not sample if nr_try > 100 and total_sampled < 1 : raise RuntimeError ( "Could not sample interior of geometry. Check to make sure non-zero volume" ) # compute area value for monte carlo integration volume = ( total_sampled / total_tried ) * bounds . volume ( parameterization ) invar [ "area" ] = np . full_like ( next ( iter ( invar . values ())), volume / nr_points ) # add params to invar invar . update ( params ) return invar @staticmethod def _convert_criteria(criteria): return criteria def __add__(self, other): def _add_sdf(sdf_1, sdf_2, dims): def add_sdf(invar, params, compute_sdf_derivatives=False): computed_sdf_1 = sdf_1(invar, params, compute_sdf_derivatives) computed_sdf_2 = sdf_2(invar, params, compute_sdf_derivatives) computed_sdf = {} computed_sdf["sdf"] = np.maximum( computed_sdf_1["sdf"], computed_sdf_2["sdf"] ) if compute_sdf_derivatives: for d in dims: computed_sdf["sdf" + diff_str + d] = np.where( computed_sdf_1["sdf"] > computed_sdf_2["sdf"], computed_sdf_1["sdf" + diff_str + d], computed_sdf_2["sdf" + diff_str + d], ) return computed_sdf return add_sdf new_sdf = _add_sdf(self.sdf, other.sdf, self.dims) new_parameterization = self.parameterization.union(other.parameterization) new_bounds = self.bounds.union(other.bounds) return Geometry( self.curves + other.curves, new_sdf, len(self.dims), new_bounds, new_parameterization, interior_epsilon=self.interior_epsilon, ) def __sub__(self, other): def _sub_sdf(sdf_1, sdf_2, dims): def sub_sdf(invar, params, compute_sdf_derivatives=False): computed_sdf_1 = sdf_1(invar, params, compute_sdf_derivatives) computed_sdf_2 = sdf_2(invar, params, compute_sdf_derivatives) computed_sdf = {} computed_sdf["sdf"] = np.minimum( computed_sdf_1["sdf"], -computed_sdf_2["sdf"] ) if compute_sdf_derivatives: for d in dims: computed_sdf["sdf" + diff_str + d] = np.where( computed_sdf_1["sdf"] < -computed_sdf_2["sdf"], computed_sdf_1["sdf" + diff_str + d], -computed_sdf_2["sdf" + diff_str + d], ) return computed_sdf return sub_sdf new_sdf = _sub_sdf(self.sdf, other.sdf, self.dims) new_parameterization = self.parameterization.union(other.parameterization) new_bounds = self.bounds.union(other.bounds) new_curves = self.curves + [c.invert_normal() for c in other.curves] return Geometry( new_curves, new_sdf, len(self.dims), new_bounds, new_parameterization, interior_epsilon=self.interior_epsilon, ) def __invert__(self): def _invert_sdf(sdf, dims): def invert_sdf(invar, params, compute_sdf_derivatives=False): computed_sdf = sdf(invar, params, compute_sdf_derivatives) computed_sdf["sdf"] = -computed_sdf["sdf"] if compute_sdf_derivatives: for d in dims: computed_sdf["sdf" + diff_str + d] = -computed_sdf[ "sdf" + diff_str + d ] return computed_sdf return invert_sdf new_sdf = _invert_sdf(self.sdf, self.dims) new_parameterization = self.parameterization.copy() new_bounds = self.bounds.copy() new_curves = [c.invert_normal() for c in self.curves] return Geometry( new_curves, new_sdf, len(self.dims), new_bounds, new_parameterization, interior_epsilon=self.interior_epsilon, ) def __and__(self, other): def _and_sdf(sdf_1, sdf_2, dims): def and_sdf(invar, params, compute_sdf_derivatives=False): computed_sdf_1 = sdf_1(invar, params, compute_sdf_derivatives) computed_sdf_2 = sdf_2(invar, params, compute_sdf_derivatives) computed_sdf = {} computed_sdf["sdf"] = np.minimum( computed_sdf_1["sdf"], computed_sdf_2["sdf"] ) if compute_sdf_derivatives: for d in dims: computed_sdf["sdf" + diff_str + d] = np.where( computed_sdf_1["sdf"] < computed_sdf_2["sdf"], computed_sdf_1["sdf" + diff_str + d], computed_sdf_2["sdf" + diff_str + d], ) return computed_sdf return and_sdf new_sdf = _and_sdf(self.sdf, other.sdf, self.dims) new_parameterization = self.parameterization.union(other.parameterization) new_bounds = self.bounds.union(other.bounds) new_curves = self.curves + other.curves return Geometry( new_curves, new_sdf, len(self.dims), new_bounds, new_parameterization, interior_epsilon=self.interior_epsilon, )