Core v0.4.0

Source code for modulus.datapipes.benchmarks.kelvin_helmholtz

# Copyright (c) 2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# See the License for the specific language governing permissions and
# limitations under the License.

import sys
from dataclasses import dataclass
from typing import Dict, Tuple, Union

import numpy as np
import torch
import warp as wp

from ..datapipe import Datapipe
from ..meta import DatapipeMetaData
from .kernels.finite_volume import (
from .kernels.initialization import init_uniform_random_2d

Tensor = torch.Tensor
# TODO unsure if better to remove this

[docs]@dataclass class MetaData(DatapipeMetaData): name: str = "KelvinHelmholtz2D" # Optimization auto_device: bool = True cuda_graphs: bool = True # Parallel ddp_sharding: bool = False
[docs]class KelvinHelmholtz2D(Datapipe): """Kelvin-Helmholtz instability benchmark problem datapipe. This datapipe continuously generates samples with random initial conditions. All samples are generated on the fly and is meant to be a benchmark problem for testing data driven models. Initial conditions are given in the form of small perturbations. The solution is obtained using a GPU enabled Finite Volume Method. Parameters ---------- resolution : int, optional Resolution to run simulation at, by default 512 batch_size : int, optional Batch size of simulations, by default 16 seq_length : int, optional Sequence length of output samples, by default 8 nr_perturbation_freq : int, optional Number of frequencies to use for generating random initial perturbations, by default 5 perturbation_range : float, optional Range to use for random perturbations. This value will be the max amplitude of the initial perturbation, by default 0.1 nr_snapshots : int, optional Number of snapshots of simulation to generate for data generation. This will control how long the simulation is run for, by default 256 iteration_per_snapshot : int, optional Number of finite volume steps to take between each snapshot. Each step size is fixed as the smallest possible value that satisfies the Courant-Friedrichs-Lewy condition, by default 32 gamma : float, optional Heat capacity ratio, by default 5.0/3.0 normaliser : Union[Dict[str, Tuple[float, float]], None], optional Dictionary with keys `density`, `velocity`, and `pressure`. The values for these keys are two floats corresponding to mean and std `(mean, std)`. device : Union[str, torch.device], optional Device for datapipe to run place data on, by default "cuda" """ def __init__( self, resolution: int = 512, batch_size: int = 16, seq_length: int = 8, nr_perturbation_freq: int = 5, perturbation_range: float = 0.1, nr_snapshots: int = 256, iteration_per_snapshot: int = 32, gamma: float = 5.0 / 3.0, normaliser: Union[Dict[str, Tuple[float, float]], None] = None, device: Union[str, torch.device] = "cuda", ): super().__init__(meta=MetaData()) # simulation params self.resolution = resolution self.batch_size = batch_size self.seq_length = seq_length self.nr_perturbation_freq = nr_perturbation_freq self.perturbation_range = perturbation_range self.nr_snapshots = nr_snapshots self.iteration_per_snapshot = iteration_per_snapshot self.gamma = gamma self.courant_fac = 0.4 # hard set self.normaliser = normaliser # check normaliser keys if self.normaliser is not None: if not {"density", "velocity", "pressure"}.issubset( set(self.normaliser.keys()) ): raise ValueError( "normaliser need to have keys `density`, `velocity` and `pressure` with mean and std" ) # Set up device for warp, warp has same naming convention as torch. if isinstance(device, torch.device): device = str(device) self.device = device # spatial dims self.dx = 1.0 / resolution self.dt = ( self.courant_fac * self.dx / (np.sqrt(self.gamma * 5.0) + 2.0) ) # hard set to smallest possible step needed self.vol = self.dx**2 self.dim = (self.batch_size, self.resolution, self.resolution) # allocate array for initial freq perturbation self.w = wp.zeros( (self.batch_size, self.nr_perturbation_freq), dtype=float, device=self.device, ) # allocate conservation quantities self.mass = wp.zeros(self.dim, dtype=float, device=self.device) = wp.zeros(self.dim, dtype=wp.vec2, device=self.device) self.e = wp.zeros(self.dim, dtype=float, device=self.device) # allocate primitive quantities self.rho = wp.zeros(self.dim, dtype=float, device=self.device) self.vel = wp.zeros(self.dim, dtype=wp.vec2, device=self.device) self.p = wp.zeros(self.dim, dtype=float, device=self.device) # allocate flux values for computation self.mass_flux_x = wp.zeros(self.dim, dtype=float, device=self.device) self.mass_flux_y = wp.zeros(self.dim, dtype=float, device=self.device) self.mom_flux_x = wp.zeros(self.dim, dtype=wp.vec2, device=self.device) self.mom_flux_y = wp.zeros(self.dim, dtype=wp.vec2, device=self.device) self.e_flux_x = wp.zeros(self.dim, dtype=float, device=self.device) self.e_flux_y = wp.zeros(self.dim, dtype=float, device=self.device) # allocate extrapolation values for computation self.rho_xl = wp.zeros(self.dim, dtype=float, device=self.device) self.rho_xr = wp.zeros(self.dim, dtype=float, device=self.device) self.rho_yl = wp.zeros(self.dim, dtype=float, device=self.device) self.rho_yr = wp.zeros(self.dim, dtype=float, device=self.device) self.vel_xl = wp.zeros(self.dim, dtype=wp.vec2, device=self.device) self.vel_xr = wp.zeros(self.dim, dtype=wp.vec2, device=self.device) self.vel_yl = wp.zeros(self.dim, dtype=wp.vec2, device=self.device) self.vel_yr = wp.zeros(self.dim, dtype=wp.vec2, device=self.device) self.p_xl = wp.zeros(self.dim, dtype=float, device=self.device) self.p_xr = wp.zeros(self.dim, dtype=float, device=self.device) self.p_yl = wp.zeros(self.dim, dtype=float, device=self.device) self.p_yr = wp.zeros(self.dim, dtype=float, device=self.device) # allocate arrays for storing results self.seq_rho = [ wp.zeros(self.dim, dtype=float, device=self.device) for _ in range(self.nr_snapshots) ] self.seq_vel = [ wp.zeros(self.dim, dtype=wp.vec2, device=self.device) for _ in range(self.nr_snapshots) ] self.seq_p = [ wp.zeros(self.dim, dtype=float, device=self.device) for _ in range(self.nr_snapshots) ] self.output_rho = None self.output_vel = None self.output_p = None
[docs] def initialize_batch(self) -> None: """Initializes arrays for new batch of simulations""" # initialize random Fourier freq seed = np.random.randint(np.iinfo(np.uint64).max, dtype=np.uint64) wp.launch( init_uniform_random_2d, dim=[self.batch_size, self.nr_perturbation_freq], inputs=[self.w, -self.perturbation_range, self.perturbation_range, seed], device=self.device, ) # initialize fields wp.launch( initialize_kelvin_helmoltz_batched_2d, dim=self.dim, inputs=[ self.rho, self.vel, self.p, self.w, 0.05 / np.sqrt(2.0), self.dim[1], self.dim[2], self.nr_perturbation_freq, ], device=self.device, ) wp.launch( euler_primitive_to_conserved_batched_2d, dim=self.dim, inputs=[ self.rho, self.vel, self.p, self.mass,, self.e, self.gamma, self.vol, self.dim[1], self.dim[2], ], device=self.device, )
[docs] def generate_batch(self) -> None: """Solve for new batch of simulations""" # initialize tensors with random coef self.initialize_batch() # run solver for s in range(self.nr_snapshots): # save arrays for wp.copy(self.seq_rho[s], self.rho) wp.copy(self.seq_vel[s], self.vel) wp.copy(self.seq_p[s], self.p) # iterations for i in range(self.iteration_per_snapshot): # compute primitives wp.launch( euler_conserved_to_primitive_batched_2d, dim=self.dim, inputs=[ self.mass,, self.e, self.rho, self.vel, self.p, self.gamma, self.vol, self.dim[1], self.dim[2], ], device=self.device, ) # compute extrapolations to faces wp.launch( euler_extrapolation_batched_2d, dim=self.dim, inputs=[ self.rho, self.vel, self.p, self.rho_xl, self.rho_xr, self.rho_yl, self.rho_yr, self.vel_xl, self.vel_xr, self.vel_yl, self.vel_yr, self.p_xl, self.p_xr, self.p_yl, self.p_yr, self.gamma, self.dx, self.dt, self.dim[1], self.dim[2], ], device=self.device, ) # compute fluxes wp.launch( euler_get_flux_batched_2d, dim=self.dim, inputs=[ self.rho_xl, self.rho_xr, self.rho_yl, self.rho_yr, self.vel_xl, self.vel_xr, self.vel_yl, self.vel_yr, self.p_xl, self.p_xr, self.p_yl, self.p_yr, self.mass_flux_x, self.mass_flux_y, self.mom_flux_x, self.mom_flux_y, self.e_flux_x, self.e_flux_y, self.gamma, self.dim[1], self.dim[2], ], device=self.device, ) # apply fluxes wp.launch( euler_apply_flux_batched_2d, dim=self.dim, inputs=[ self.mass_flux_x, self.mass_flux_y, self.mom_flux_x, self.mom_flux_y, self.e_flux_x, self.e_flux_y, self.mass,, self.e, self.dx, self.dt, self.dim[1], self.dim[2], ], device=self.device, )

def __iter__(self) -> Tuple[Tensor, Tensor, Tensor]: """ Yields ------ Iterator[Tuple[Tensor, Tensor]] Infinite iterator that returns a batch of timeseries with (density, velocity, pressure) fields of size [batch, seq_length, dim, resolution, resolution] """ # infinite generator while True: # run simulation self.generate_batch() # return all samples generated before rerunning simulation batch_ind = [ np.arange(self.nr_snapshots - self.seq_length) for _ in range(self.batch_size) ] for b_ind in batch_ind: np.random.shuffle(b_ind) for bb in range(self.nr_snapshots - self.seq_length): # run over batch to gather samples batched_seq_rho = [] batched_seq_vel = [] batched_seq_p = [] for b in range(self.batch_size): # gather seq from each batch seq_rho = [] seq_vel = [] seq_p = [] for s in range(self.seq_length): # get variables rho = wp.to_torch(self.seq_rho[batch_ind[b][bb] + s])[b] vel = wp.to_torch(self.seq_vel[batch_ind[b][bb] + s])[b] p = wp.to_torch(self.seq_p[batch_ind[b][bb] + s])[b] # add channels rho = torch.unsqueeze(rho, 0) vel = torch.permute(vel, (2, 0, 1)) p = torch.unsqueeze(p, 0) # normalize values if self.normaliser is not None: rho = ( rho - self.normaliser["density"][0] ) / self.normaliser["density"][1] vel = ( vel - self.normaliser["velocity"][0] ) / self.normaliser["velocity"][1] p = (p - self.normaliser["pressure"][0]) / self.normaliser[ "pressure" ][1] # store for producing seq seq_rho.append(rho) seq_vel.append(vel) seq_p.append(p) # concat seq batched_seq_rho.append(torch.stack(seq_rho, axis=0)) batched_seq_vel.append(torch.stack(seq_vel, axis=0)) batched_seq_p.append(torch.stack(seq_p, axis=0)) # CUDA graphs static copies if self.output_rho is None: # concat batches self.output_rho = torch.stack(batched_seq_rho, axis=0) self.output_vel = torch.stack(batched_seq_vel, axis=0) self.output_p = torch.stack(batched_seq_p, axis=0) else:, axis=0)), axis=0)), axis=0)) yield { "density": self.output_rho, "velocity": self.output_vel, "pressure": self.output_p, } def __len__(self): return sys.maxsize

© Copyright 2023, NVIDIA Modulus Team. Last updated on Jan 25, 2024.