Training Recipe#
In this tutorial, you will learn how to use utilities from PhysicsNeMo to set up a model training pipeline. After completing the initial setup, you will explore optimizing the training loop and running it in a distributed fashion. You will finish the tutorial with an inference workflow that demonstrates how to use PhysicsNeMo models for inference.
Basic Training Workflow#
Let’s get started. For the purposes of this tutorial, we will focus more on the PhysicsNeMo utilities and not the correctness of the problem definition or the results. A typical training workflow requires data, a trainable model, and an optimizer to update the model parameters.
Using Built-in Models#
In this example, you will explore different ways to interact with models in PhysicsNeMo. PhysicsNeMo provides a library of models suitable for Physics-ML applications that you can use directly in your training workflows. In this tutorial, you will see how to use a model in PhysicsNeMo to set up data-driven training. Using the models from PhysicsNeMo enables you to access various other PhysicsNeMo features like optimization and quality-of-life functionalities such as checkpointing and model entrypoints.
Later, you will also see how to customize these models in PhysicsNeMo.
In this example, you will use the Fourier Neural Operator (FNO) model from PhysicsNeMo. To demonstrate the training using this model, you need some dataset to train the model. To allow for fast prototyping of models, PhysicsNeMo provides a set of benchmark datasets that you can use out of the box without the need to set up data-loading pipelines. In this example, you will use one such datapipe called Darcy2D, which is a 2D Darcy problem with a random permeability field. For more information on the FNO model or 2D-Darcy problem, refer to Li et al., 2020.
Let’s start by importing a few utilities and packages.
import torch
import physicsnemo
from physicsnemo.datapipes.benchmarks.darcy import Darcy2D
from physicsnemo.metrics.general.mse import mse
from physicsnemo.models.fno.fno import FNO
In this example, you want to develop a mapping between the permeability and its subsequent pressure field for a given forcing function. Refer to PhysicsNeMo Datapipes for additional details.
Then, a training loop for this example can be written as follows:
normaliser = { # Dictionary with mean and std of the permeability and darcy fields
"permeability": (1.25, 0.75),
"darcy": (4.52e-2, 2.79e-2),
}
dataloader = Darcy2D(
resolution=256, batch_size=64, nr_permeability_freq=5, normaliser=normaliser
)
model = FNO(
in_channels=1,
out_channels=1,
decoder_layers=1,
decoder_layer_size=32,
dimension=2,
latent_channels=32,
num_fno_layers=4,
num_fno_modes=12,
padding=5,
).to("cuda")
optimizer = torch.optim.Adam(model.parameters(), lr=0.01)
scheduler = torch.optim.lr_scheduler.LambdaLR(
optimizer, lr_lambda=lambda step: 0.85**step
)
# run for 20 iterations
dataloader = iter(dataloader)
for i in range(20):
batch = next(dataloader)
truth = batch["darcy"]
pred = model(batch["permeability"])
loss = mse(pred, truth)
loss.backward()
optimizer.step()
scheduler.step()
print(f"Iteration: {i}. Loss: {loss.detach().cpu().numpy()}")
That’s it! This shows how to use a model from PhysicsNeMo. Most of the models in PhysicsNeMo are highly configurable, allowing you to use them out-of-the-box for different applications. Refer to PhysicsNeMo Models for a more complete list of available models.
Using Custom Models in PhysicsNeMo#
PhysicsNeMo provides many pre-built optimized models. However, there might be times when the shipped models might not serve your application. In such cases, you can easily write your own models and have them interact with the other PhysicsNeMo utilities and features. PhysicsNeMo uses PyTorch in the backend, and most PhysicsNeMo models are, at the core, PyTorch models. In this section, you will see how to go from a typical PyTorch model to a PhysicsNeMo model.
Let’s get started with the same application of the Darcy problem. Let’s write a simple UNet to solve the problem. A PyTorch model for a UNet can be written as shown below:
import torch.nn as nn
import physicsnemo
from physicsnemo.datapipes.benchmarks.darcy import Darcy2D
from physicsnemo.metrics.general.mse import mse
class UNet(nn.Module):
def __init__(self, in_channels=1, out_channels=1):
super(UNet, self).__init__()
self.enc1 = self.conv_block(in_channels, 64)
self.enc2 = self.conv_block(64, 128)
self.dec1 = self.upconv_block(128, 64)
self.dec2 = self.upconv_block(64, 32)
self.final = nn.Conv2d(32, out_channels, kernel_size=1)
def conv_block(self, in_channels, out_channels):
return nn.Sequential(
nn.Conv2d(in_channels, out_channels, 3, padding=1),
nn.ReLU(inplace=True),
nn.MaxPool2d(2),
)
def upconv_block(self, in_channels, out_channels):
return nn.Sequential(
nn.ConvTranspose2d(in_channels, out_channels, 2, stride=2),
nn.Conv2d(out_channels, out_channels, 3, padding=1),
nn.ReLU(inplace=True),
)
def forward(self, x):
x = self.enc1(x)
x = self.enc2(x)
x = self.dec1(x)
x = self.dec2(x)
return self.final(x)
Now, let’s convert this to a PhysicsNeMo model. PhysicsNeMo provides a Module
class that is
designed to be a drop-in replacement for torch.nn.Module
. Along with that, you
need to also pass a MetaData
that captures the optimizations and other features
supported by the model. Using the Module
subclass allows you to use these optimizations,
and other features like checkpointing, from PhysicsNeMo.
Thus, converting a PyTorch model to a PhysicsNeMo model is very simple. For the above model, the diff would look something like below:
- import torch.nn as nn
+ from dataclasses import dataclass
+ from physicsnemo.models.meta import ModelMetaData
+ from physicsnemo.models.module import Module
- class UNet(nn.Module):
+ @dataclass
+ class MetaData(ModelMetaData):
+ name: str = "UNet"
+ # Optimization
+ jit: bool = False
+ cuda_graphs: bool = True
+ amp_cpu: bool = True
+ amp_gpu: bool = True
+
+ class UNet(Module):
def __init__(self, in_channels=1, out_channels=1):
- super(UNet, self).__init__()
+ super(UNet, self).__init__(meta=MetaData())
self.enc1 = self.conv_block(in_channels, 64)
self.enc2 = self.conv_block(64, 128)
With a few changes like this, you can convert a PyTorch model to a PhysicsNeMo model!
Note
The optimizations are not automatically applied. You are responsible for writing the model with the optimizations supported. However, if the model supports the optimization and the same is captured in the MetaData, then the downstream features will work out-of-the-box.
Note
For utilizing the checkpointing functionality of PhysicsNeMo, the model instantiation arguments must be JSON serializable.
You can also use a PhysicsNeMo model as a standard PyTorch model as they are interoperable.
Let’s say you don’t want to make changes to the code, but you have a PyTorch model
already. You can convert it to a PhysicsNeMo model by using the physicsnemo.Module.from_torch
method. This is described in detail in Converting PyTorch Models to PhysicsNeMo Models.
from dataclasses import dataclass
import torch.nn as nn
from physicsnemo.models.meta import ModelMetaData
from physicsnemo.models.module import Module
@dataclass
class MdlsUNetMetaData(ModelMetaData):
name: str = "MdlsUNet"
# Optimization
jit: bool = False
cuda_graphs: bool = True
amp_cpu: bool = True
amp_gpu: bool = True
MdlsUNet = Module.from_torch(UNet, meta=MdlsUNetMetaData)
And just like that, you can use your existing PyTorch model as a PhysicsNeMo model.
A very similar process can be followed to convert a PhysicsNeMo model to a PhysicsNeMo Sym model
so that you can use the constraints and other definitions from the PhysicsNeMo Sym repository.
Here, you will use the Arch
class from PhysicsNeMo Sym that provides utilities and methods
to go from tensor data to a dict format which PhysicsNeMo Sym uses.
from typing import Dict, Optional
from physicsnemo.sym.key import Key
from physicsnemo.sym.models.arch import Arch
class MdlsSymUNet(Arch):
def __init__(
self,
input_keys=[Key("a")],
output_keys=[Key("b")],
in_channels=1,
out_channels=1,
):
super(MdlsSymUNet, self).__init__(
input_keys=input_keys, output_keys=output_keys
)
self.mdls_model = MdlsUNet(in_channels, out_channels) # MdlsUNet defined above
def forward(self, dict_tensor: Dict[str, torch.Tensor]):
x = self.concat_input(
dict_tensor,
self.input_key_dict,
detach_dict=None,
dim=1,
)
out = self.mdls_model(x)
return self.split_output(out, self.output_key_dict, dim=1)
Optimized Training Workflow#
Once you have a model defined in the PhysicsNeMo style, you can use the optimizations
like AMP, CUDA Graphs, and JIT using the physicsnemo.utils.StaticCaptureTraining
decorator.
This decorator will capture the training step function and optimize it for the specified
optimizations.
Note
The StaticCaptureTraining
decorator is still under development and may be
refactored in the future.
import time
from physicsnemo.utils import StaticCaptureTraining
normaliser = {
"permeability": (1.25, 0.75),
"darcy": (4.52e-2, 2.79e-2),
}
dataloader = Darcy2D(
resolution=256, batch_size=8, nr_permeability_freq=5, normaliser=normaliser
)
model = MdlsUNet().to("cuda")
optimizer = torch.optim.Adam(model.parameters(), lr=0.01)
scheduler = torch.optim.lr_scheduler.LambdaLR(
optimizer, lr_lambda=lambda step: 0.85**step
)
# Create training step function with optimization wrapper
# StaticCaptureTraining calls `backward` on the loss and
# `optimizer.step()` so you don't have to do that
# explicitly.
@StaticCaptureTraining(
model=model,
optim=optimizer,
cuda_graph_warmup=11,
)
def training_step(invar, outvar):
predvar = model(invar)
loss = mse(predvar, outvar)
return loss
# run for 20 iterations
dataloader = iter(dataloader)
for i in range(20):
batch = next(dataloader)
truth = batch["darcy"]
input = batch["permeability"]
loss = training_step(input, truth)
scheduler.step()
Distributed Training Workflow#
PhysicsNeMo has several distributed utilities to simplify the implementation of parallel training and make inference scripts easier by providing a unified way to configure and query parameters associated with the distributed environment.
In this example, you will see how to convert your existing workflow to use data parallelism. For a deep-dive on PhysicsNeMo distributed utilities, refer to PhysicsNeMo Distributed.
def main():
# Initialize the DistributedManager. This will automatically
# detect the number of processes the job was launched with and
# set those configuration parameters appropriately.
DistributedManager.initialize()
# Get instance of the DistributedManager
dist = DistributedManager()
normaliser = {
"permeability": (1.25, 0.75),
"darcy": (4.52e-2, 2.79e-2),
}
dataloader = Darcy2D(
resolution=256, batch_size=64, nr_permeability_freq=5, normaliser=normaliser
)
model = FNO(
in_channels=1,
out_channels=1,
decoder_layers=1,
decoder_layer_size=32,
dimension=2,
latent_channels=32,
num_fno_layers=4,
num_fno_modes=12,
padding=5,
).to(dist.device)
# Set up DistributedDataParallel if using more than a single process.
if dist.distributed:
ddps = torch.cuda.Stream()
with torch.cuda.stream(ddps):
model = DistributedDataParallel(
model,
device_ids=[
dist.local_rank
], # Set the device_id to be the local rank of this process on this node
output_device=dist.device,
broadcast_buffers=dist.broadcast_buffers,
find_unused_parameters=dist.find_unused_parameters,
)
torch.cuda.current_stream().wait_stream(ddps)
optimizer = torch.optim.Adam(model.parameters(), lr=0.01)
scheduler = torch.optim.lr_scheduler.LambdaLR(
optimizer, lr_lambda=lambda step: 0.85**step
)
# Create training step function with optimization wrapper
# StaticCaptureTraining calls `backward` on the loss and
# `optimizer.step()` so you don't have to do that
# explicitly.
@StaticCaptureTraining(
model=model,
optim=optimizer,
cuda_graph_warmup=11,
)
def training_step(invar, outvar):
predvar = model(invar)
loss = mse(predvar, outvar)
return loss
# run for 20 iterations
dataloader = iter(dataloader)
for i in range(20):
batch = next(dataloader)
truth = batch["darcy"]
input = batch["permeability"]
loss = training_step(input, truth)
scheduler.step()
if __name__ == "__main__":
main()
Running Inference on Trained Models#
Running inference on trained models is simple! This is shown by the code below.
model = FNO(
in_channels=1,
out_channels=1,
decoder_layers=1,
decoder_layer_size=32,
dimension=2,
latent_channels=32,
num_fno_layers=4,
num_fno_modes=12,
padding=5,
).to("cuda")
# Save the checkpoint. For demo, we will just save untrained checkpoint,
# but in typical workflows is saved after model training.
model.save("untrained_checkpoint.mdlus")
# Inference code
# The parameters to instantitate the model will be loaded from the checkpoint
model_inf = physicsnemo.Module.from_checkpoint("untrained_checkpoint.mdlus").to("cuda")
# put the model in evaluation mode
model_inf.eval()
# run inference
with torch.inference_mode():
input = torch.ones(8, 1, 256, 256).to("cuda")
output = model_inf(input)
print(output.shape)
The static capture and distributed utilities can also be used during inference for speeding up the inference workflow, but that is out of the scope for this tutorial.