Training Recipe#
In this tutorial, you will learn how to use PhysicsNeMo to set up a model training pipeline. You will go through a basic training workflow, using built-in and custom models, then add logging (console, MLflow, or Weights & Biases) and checkpointing. Finally, you will see how to run training in a distributed fashion and how to run inference with trained models.
Basic Training Workflow#
Let’s get started. This tutorial focuses on the PhysicsNeMo utilities rather than on the correctness of the problem definition or the results. A typical training workflow requires data, a trainable model, and an optimizer to update the model parameters.
Using Built-in Models#
In this example, you will explore different ways to interact with models in PhysicsNeMo. PhysicsNeMo provides a library of models suitable for Physics-ML applications that you can use directly in your training workflows. In this tutorial, you will see how to use a model in PhysicsNeMo to set up data-driven training. Using the models from PhysicsNeMo enables you to access various other PhysicsNeMo features like optimization and quality-of-life functionalities such as checkpointing and model entrypoints.
Later, you will also see how to customize these models in PhysicsNeMo.
In this example, you will use the Fourier Neural Operator (FNO) model from PhysicsNeMo.
To demonstrate training with this model, you need a dataset.
To allow for fast prototyping of models, PhysicsNeMo provides a set of benchmark datasets
that you can use out of the box without the need to set up data-loading pipelines.
In this example, you will use one such datapipe called
Darcy2D, which is a 2D Darcy
problem with a random permeability field. For more information on the FNO model
or 2D-Darcy problem, refer to Li et al., 2020.
Let’s start by importing a few utilities and packages.
import torch
import physicsnemo
from physicsnemo.datapipes.benchmarks.darcy import Darcy2D
from physicsnemo.metrics.general.mse import mse
from physicsnemo.models.fno.fno import FNO
In this example, you want to learn a mapping from the permeability field to the resulting pressure field for a given forcing function. Refer to PhysicsNeMo DataPipes for additional details.
Then, a training loop for this example can be written as follows:
# Dictionary with mean and std of the permeability and darcy fields
normaliser = {
    "permeability": (1.25, 0.75),
    "darcy": (4.52e-2, 2.79e-2),
}
dataloader = Darcy2D(
    resolution=256, batch_size=64, nr_permeability_freq=5, normaliser=normaliser
)
model = FNO(
    in_channels=1,
    out_channels=1,
    decoder_layers=1,
    decoder_layer_size=32,
    dimension=2,
    latent_channels=32,
    num_fno_layers=4,
    num_fno_modes=12,
    padding=5,
).to("cuda")
optimizer = torch.optim.Adam(model.parameters(), lr=0.01)
scheduler = torch.optim.lr_scheduler.LambdaLR(
    optimizer, lr_lambda=lambda step: 0.85**step
)
# run for 20 iterations
dataloader = iter(dataloader)
for i in range(20):
    batch = next(dataloader)
    truth = batch["darcy"]
    optimizer.zero_grad()  # clear gradients left over from the previous iteration
    pred = model(batch["permeability"])
    loss = mse(pred, truth)
    loss.backward()
    optimizer.step()
    scheduler.step()
    print(f"Iteration: {i}. Loss: {loss.detach().cpu().numpy()}")
That’s it! This shows how to use a model from PhysicsNeMo. Most of the models in PhysicsNeMo are highly configurable, allowing you to use them out-of-the-box for different applications. Refer to PhysicsNeMo Models for a more complete list of available models.
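The `normaliser` dictionary in the example pairs each field with a `(mean, std)` tuple. The following is a minimal sketch of what such statistics are typically used for, assuming standard-score normalisation; the exact transform applied inside `Darcy2D` is not shown in this tutorial, and the helper names `normalise` and `denormalise` are hypothetical.

```python
# Sketch only: assumes z-score normalisation, (x - mean) / std.
normaliser = {
    "permeability": (1.25, 0.75),
    "darcy": (4.52e-2, 2.79e-2),
}


def normalise(value: float, field: str) -> float:
    """Map a physical value to normalised units (hypothetical helper)."""
    mean, std = normaliser[field]
    return (value - mean) / std


def denormalise(value: float, field: str) -> float:
    """Map a normalised value back to physical units (hypothetical helper)."""
    mean, std = normaliser[field]
    return value * std + mean


raw = 2.0
scaled = normalise(raw, "permeability")
print(scaled)  # 1.0
print(denormalise(scaled, "permeability"))  # 2.0
```

Under this assumption, model predictions are in normalised units and would need the inverse transform before being compared with physical pressure values.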
Using Custom Models in PhysicsNeMo#
PhysicsNeMo provides many pre-built optimized models (see the Model Zoo). However, there might be times when the shipped models don’t serve your application. In such cases, you can write your own models and still leverage PhysicsNeMo features such as checkpointing and the model registry.
Converting a PyTorch model to a PhysicsNeMo model is straightforward: inherit from
Module instead of torch.nn.Module. If you
already have an existing PyTorch model and don’t want to modify its source, you can
convert it using from_torch(). For a detailed
walkthrough with examples, refer to the Modules documentation (in particular
Converting PyTorch Models to PhysicsNeMo Models).
Note
To use the checkpointing functionality of PhysicsNeMo, the model instantiation arguments must be JSON serializable.
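One way to catch a non-serializable constructor argument early is to try serializing the arguments yourself. The sketch below uses only the standard-library `json` module; the helper `is_json_serializable` and the `activation` argument are hypothetical illustrations, not part of PhysicsNeMo, and this is not necessarily how PhysicsNeMo validates arguments internally.

```python
import json


def is_json_serializable(kwargs: dict) -> bool:
    """Return True if every constructor argument survives json.dumps."""
    try:
        json.dumps(kwargs)
        return True
    except (TypeError, ValueError):
        return False


# Plain numbers, strings, lists, and dicts are fine.
ok_args = {"in_channels": 1, "out_channels": 1, "activation": "gelu"}
# A function object (or any arbitrary class instance) is not.
bad_args = {"in_channels": 1, "activation": len}

print(is_json_serializable(ok_args))   # True
print(is_json_serializable(bad_args))  # False
```

A common workaround is to pass such options as strings or numbers and resolve them to objects inside the model's `__init__`.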
PhysicsNeMo models are fully interoperable with standard PyTorch models. A similar
process can also be followed to convert a PhysicsNeMo model to a PhysicsNeMo Sym model
using the Arch class from PhysicsNeMo Sym, which provides utilities to go from
tensor data to the dict format that PhysicsNeMo Sym uses.
Distributed Training Workflow#
PhysicsNeMo has several distributed utilities that simplify parallel training and inference scripts by providing a unified way to configure and query parameters associated with the distributed environment.
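To make "querying the distributed environment" concrete, here is a minimal sketch that reads the `RANK`, `WORLD_SIZE`, and `LOCAL_RANK` variables that `torchrun` exports to each process. It illustrates the idea only and is not PhysicsNeMo's implementation; `DistEnv` and `read_dist_env` are hypothetical names.

```python
import os
from dataclasses import dataclass


@dataclass
class DistEnv:
    """Hypothetical container for the per-process launch parameters."""

    rank: int
    world_size: int
    local_rank: int

    @property
    def distributed(self) -> bool:
        # More than one process means the job is running in parallel.
        return self.world_size > 1


def read_dist_env(environ=os.environ) -> DistEnv:
    """Read torchrun-style variables, falling back to single-process defaults."""
    return DistEnv(
        rank=int(environ.get("RANK", 0)),
        world_size=int(environ.get("WORLD_SIZE", 1)),
        local_rank=int(environ.get("LOCAL_RANK", 0)),
    )


print(read_dist_env({}))
print(read_dist_env({"RANK": "3", "WORLD_SIZE": "4", "LOCAL_RANK": "1"}))
```

A script written against such an object runs unchanged whether it is launched as a single process or as one of many.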
In this example, you will see how to convert your existing workflow to use data parallelism. For a deep-dive on PhysicsNeMo distributed utilities, refer to PhysicsNeMo Distributed.
import torch
from torch.nn.parallel import DistributedDataParallel
from physicsnemo.datapipes.benchmarks.darcy import Darcy2D
from physicsnemo.distributed import DistributedManager
from physicsnemo.metrics.general.mse import mse
from physicsnemo.models.fno.fno import FNO
def main():
    # Initialize the DistributedManager. This will automatically
    # detect the number of processes the job was launched with and
    # set those configuration parameters appropriately.
    DistributedManager.initialize()
    # Get instance of the DistributedManager
    dist = DistributedManager()
    normaliser = {
        "permeability": (1.25, 0.75),
        "darcy": (4.52e-2, 2.79e-2),
    }
    dataloader = Darcy2D(
        resolution=256, batch_size=64, nr_permeability_freq=5, normaliser=normaliser
    )
    model = FNO(
        in_channels=1,
        out_channels=1,
        decoder_layers=1,
        decoder_layer_size=32,
        dimension=2,
        latent_channels=32,
        num_fno_layers=4,
        num_fno_modes=12,
        padding=5,
    ).to(dist.device)
    # Set up DistributedDataParallel if using more than a single process.
    if dist.distributed:
        ddps = torch.cuda.Stream()
        with torch.cuda.stream(ddps):
            model = DistributedDataParallel(
                model,
                # Set the device_id to be the local rank of this process on this node
                device_ids=[dist.local_rank],
                output_device=dist.device,
                broadcast_buffers=dist.broadcast_buffers,
                find_unused_parameters=dist.find_unused_parameters,
            )
        torch.cuda.current_stream().wait_stream(ddps)
    optimizer = torch.optim.Adam(model.parameters(), lr=0.01)
    scheduler = torch.optim.lr_scheduler.LambdaLR(
        optimizer, lr_lambda=lambda step: 0.85**step
    )
    # run for 20 iterations
    dataloader = iter(dataloader)
    for i in range(20):
        batch = next(dataloader)
        truth = batch["darcy"]
        optimizer.zero_grad()  # clear gradients left over from the previous iteration
        pred = model(batch["permeability"])
        loss = mse(pred, truth)
        loss.backward()
        optimizer.step()
        scheduler.step()
if __name__ == "__main__":
    main()
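Conceptually, data parallelism has each process compute gradients on its own share of the data and then averages those gradients across processes, which is what `DistributedDataParallel` does during the backward pass. The pure-Python sketch below illustrates why this matches single-process training when the shards are equally sized; the toy linear model and all names in it are hypothetical and unrelated to the FNO example.

```python
def mse_grad(w: float, xs: list[float], ys: list[float]) -> float:
    """Gradient of mean((w * x - y) ** 2) with respect to w."""
    return sum(2.0 * (w * x - y) * x for x, y in zip(xs, ys)) / len(xs)


w = 0.5
xs = [1.0, 2.0, 3.0, 4.0]
ys = [2.0, 4.0, 6.0, 8.0]

# Single-process gradient over the full batch.
full = mse_grad(w, xs, ys)

# Two simulated "ranks", each holding an equal-sized shard of the batch.
world_size = 2
shard = len(xs) // world_size
local = [
    mse_grad(w, xs[r * shard:(r + 1) * shard], ys[r * shard:(r + 1) * shard])
    for r in range(world_size)
]

# All-reduce with averaging: every rank ends up with the same gradient.
averaged = sum(local) / world_size

print(full, averaged)  # -22.5 -22.5
```

Because every process applies the same averaged gradient, the model replicas stay in sync without any explicit parameter exchange in the training loop.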
Running Inference on Trained Models#
Running inference on trained models is straightforward. The example below uses
save() to persist a model and
from_checkpoint() to reload it for
inference. For more details on saving and loading models, refer to
Saving and Loading PhysicsNeMo Models. For loading training-loop
checkpoints (including optimizer and scheduler state), see
Loading Checkpoints During Inference below.
model = FNO(
    in_channels=1,
    out_channels=1,
    decoder_layers=1,
    decoder_layer_size=32,
    dimension=2,
    latent_channels=32,
    num_fno_layers=4,
    num_fno_modes=12,
    padding=5,
).to("cuda")
# Save the checkpoint. For this demo, we save an untrained model;
# in a typical workflow the checkpoint is saved after model training.
model.save("untrained_checkpoint.mdlus")
# Inference code
# The parameters to instantiate the model will be loaded from the checkpoint
model_inf = physicsnemo.Module.from_checkpoint("untrained_checkpoint.mdlus").to("cuda")
# put the model in evaluation mode
model_inf.eval()
# run inference
with torch.inference_mode():
    x = torch.ones(8, 1, 256, 256).to("cuda")
    output = model_inf(x)
    print(output.shape)
The distributed utilities can also be used to speed up inference, but that is outside the scope of this tutorial.
Logging and Checkpointing#
Logging and checkpointing are important components of the model training workflow. They allow you to keep a record of the model hyperparameters and its performance during training.
In this section, you will explore some of the utilities from PhysicsNeMo to simplify this important aspect of model training.
Logging in PhysicsNeMo#
PhysicsNeMo provides utilities to standardize the logs of different training runs. With these logging utilities, you can choose between plain console logging and more advanced ML experiment trackers such as MLflow and Weights & Biases. You can always implement these loggers yourself, but in this example, you will use the PhysicsNeMo utilities, which simplify this process and provide a standardized output format. Let’s get started.
Console Logging#
The example below shows a setup using console logging.
import logging
import torch
import physicsnemo
from physicsnemo.datapipes.benchmarks.darcy import Darcy2D
from physicsnemo.distributed import DistributedManager
from physicsnemo.metrics.general.mse import mse
from physicsnemo.models.fno.fno import FNO
from physicsnemo.utils.logging import LaunchLogger, PythonLogger
DistributedManager.initialize()
dist = DistributedManager()
# --- Setup: data, model, optimizer ---
normaliser = {
    "permeability": (1.25, 0.75),
    "darcy": (4.52e-2, 2.79e-2),
}
dataloader = Darcy2D(
    resolution=256, batch_size=64, nr_permeability_freq=5, normaliser=normaliser
)
model = FNO(
    in_channels=1,
    out_channels=1,
    decoder_layers=1,
    decoder_layer_size=32,
    dimension=2,
    latent_channels=32,
    num_fno_layers=4,
    num_fno_modes=12,
    padding=5,
).to(dist.device)
optimizer = torch.optim.Adam(model.parameters(), lr=0.01)
scheduler = torch.optim.lr_scheduler.LambdaLR(
    optimizer, lr_lambda=lambda step: 0.85**step
)
# --- Logging: console only ---
logger = PythonLogger("main")
LaunchLogger.initialize()
# Standalone scripts (no Hydra): configure logging so INFO is visible on console.
logging.basicConfig(level=logging.INFO, format="%(message)s")
logger.info("Starting Training!")
# --- Training loop ---
num_epochs = 10
iters_per_epoch = 2
dataloader = iter(dataloader)
for i in range(num_epochs):
    with LaunchLogger("train", epoch=i) as launchlog:
        for _ in range(iters_per_epoch):
            batch = next(dataloader)
            truth = batch["darcy"]
            optimizer.zero_grad()  # clear gradients left over from the previous iteration
            pred = model(batch["permeability"])
            loss = mse(pred, truth)
            loss.backward()
            optimizer.step()
            scheduler.step()
            launchlog.log_minibatch({"Loss": loss.detach().cpu().numpy()})
        launchlog.log_epoch({"Learning Rate": optimizer.param_groups[0]["lr"]})
logger.info("Finished Training!")
The logger output can be seen below.
Starting Training!
Epoch 0 Metrics: Learning Rate = 7.225e-03, Loss = 8.120e-01
Epoch Execution Time: 7.027e+00s, Time/Iter: 3.514e+03ms
...
Epoch 9 Metrics: Learning Rate = 3.876e-04, Loss = 4.091e-01
Epoch Execution Time: 6.627e+00s, Time/Iter: 3.314e+03ms
Finished Training!
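The learning rates in this log follow directly from the scheduler settings. `LambdaLR` multiplies the base learning rate (0.01) by `lr_lambda(step)`, where `step` counts the calls to `scheduler.step()`, and each epoch here makes two such calls. The short check below reproduces the two logged values; `lr_after` is a hypothetical helper written for this illustration.

```python
base_lr = 0.01
iters_per_epoch = 2


def lr_after(steps: int) -> float:
    """Learning rate after `steps` scheduler steps with lr_lambda = 0.85**step."""
    return base_lr * 0.85**steps


for epoch in (0, 9):
    steps_done = (epoch + 1) * iters_per_epoch
    print(f"Epoch {epoch}: Learning Rate = {lr_after(steps_done):.3e}")
# Epoch 0: Learning Rate = 7.225e-03
# Epoch 9: Learning Rate = 3.876e-04
```

Because the scheduler is stepped once per iteration rather than once per epoch, the decay is applied twice within every epoch of this example.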
MLflow Logging#
The example below shows a setup using MLflow logging. The only difference from
the previous example is that you will use the
initialize_mlflow() function to
initialize the MLflow client and also set use_mlflow=True when initializing
the LaunchLogger.
import logging
import torch
import physicsnemo
from physicsnemo.datapipes.benchmarks.darcy import Darcy2D
from physicsnemo.distributed import DistributedManager
from physicsnemo.metrics.general.mse import mse
from physicsnemo.models.fno.fno import FNO
from physicsnemo.utils.logging import LaunchLogger, PythonLogger
from physicsnemo.utils.logging.mlflow import initialize_mlflow
DistributedManager.initialize()
dist = DistributedManager()
# --- Setup: data, model, optimizer ---
normaliser = {
    "permeability": (1.25, 0.75),
    "darcy": (4.52e-2, 2.79e-2),
}
dataloader = Darcy2D(
    resolution=256, batch_size=64, nr_permeability_freq=5, normaliser=normaliser
)
model = FNO(
    in_channels=1,
    out_channels=1,
    decoder_layers=1,
    decoder_layer_size=32,
    dimension=2,
    latent_channels=32,
    num_fno_layers=4,
    num_fno_modes=12,
    padding=5,
).to(dist.device)
optimizer = torch.optim.Adam(model.parameters(), lr=0.01)
scheduler = torch.optim.lr_scheduler.LambdaLR(
    optimizer, lr_lambda=lambda step: 0.85**step
)
# --- Logging: MLflow + console ---
logger = PythonLogger("main")
initialize_mlflow(
    experiment_name="PhysicsNeMo Tutorials",
    experiment_desc="Simple PhysicsNeMo Tutorials",
    run_name="PhysicsNeMo MLFlow Tutorial",
    run_desc="PhysicsNeMo Tutorial Training",
    user_name="PhysicsNeMo User",
    mode="offline",
)
LaunchLogger.initialize(use_mlflow=True)
# Standalone scripts (no Hydra): configure logging so INFO is visible on console.
logging.basicConfig(level=logging.INFO, format="%(message)s")
logger.info("Starting Training!")
# --- Training loop ---
num_epochs = 10
iters_per_epoch = 2
dataloader = iter(dataloader)
for i in range(num_epochs):
    with LaunchLogger("train", epoch=i) as launchlog:
        for _ in range(iters_per_epoch):
            batch = next(dataloader)
            truth = batch["darcy"]
            optimizer.zero_grad()  # clear gradients left over from the previous iteration
            pred = model(batch["permeability"])
            loss = mse(pred, truth)
            loss.backward()
            optimizer.step()
            scheduler.step()
            launchlog.log_minibatch({"Loss": loss.detach().cpu().numpy()})
        launchlog.log_epoch({"Learning Rate": optimizer.param_groups[0]["lr"]})
logger.info("Finished Training!")
During the run, you will notice a directory named mlruns_0 created which stores
the MLflow logs. To visualize the logs interactively, run the following:
mlflow ui --backend-store-uri mlruns_0/
And then navigate to localhost:5000 in your favorite browser.
Warning
Currently, the MLflow logger logs the output of each process separately, so multi-process runs create multiple directories. This is a known issue and will be fixed in future releases.
Weights & Biases Logging#
The example below shows a setup using Weights & Biases logging. The only
difference from the previous example is that you will use the
initialize_wandb() function
to initialize the Weights & Biases logger and also set use_wandb=True when
initializing the LaunchLogger.
import logging
import torch
import physicsnemo
from physicsnemo.datapipes.benchmarks.darcy import Darcy2D
from physicsnemo.distributed import DistributedManager
from physicsnemo.metrics.general.mse import mse
from physicsnemo.models.fno.fno import FNO
from physicsnemo.utils.logging import LaunchLogger, PythonLogger
from physicsnemo.utils.logging.wandb import initialize_wandb
DistributedManager.initialize()
dist = DistributedManager()
# --- Setup: data, model, optimizer ---
normaliser = {
    "permeability": (1.25, 0.75),
    "darcy": (4.52e-2, 2.79e-2),
}
dataloader = Darcy2D(
    resolution=256, batch_size=64, nr_permeability_freq=5, normaliser=normaliser
)
model = FNO(
    in_channels=1,
    out_channels=1,
    decoder_layers=1,
    decoder_layer_size=32,
    dimension=2,
    latent_channels=32,
    num_fno_layers=4,
    num_fno_modes=12,
    padding=5,
).to(dist.device)
optimizer = torch.optim.Adam(model.parameters(), lr=0.01)
scheduler = torch.optim.lr_scheduler.LambdaLR(
    optimizer, lr_lambda=lambda step: 0.85**step
)
# --- Logging: WandB + console ---
logger = PythonLogger("main")
initialize_wandb(
    project="PhysicsNeMo Tutorials",
    name="Simple PhysicsNeMo Tutorials",
    entity="PhysicsNeMo WandB Tutorial",
    mode="offline",
)
LaunchLogger.initialize(use_wandb=True)
# Standalone scripts (no Hydra): configure logging so INFO is visible on console.
logging.basicConfig(level=logging.INFO, format="%(message)s")
logger.info("Starting Training!")
# --- Training loop ---
num_epochs = 10
iters_per_epoch = 2
dataloader = iter(dataloader)
for i in range(num_epochs):
    with LaunchLogger("train", epoch=i) as launchlog:
        for _ in range(iters_per_epoch):
            batch = next(dataloader)
            truth = batch["darcy"]
            optimizer.zero_grad()  # clear gradients left over from the previous iteration
            pred = model(batch["permeability"])
            loss = mse(pred, truth)
            loss.backward()
            optimizer.step()
            scheduler.step()
            launchlog.log_minibatch({"Loss": loss.detach().cpu().numpy()})
        launchlog.log_epoch({"Learning Rate": optimizer.param_groups[0]["lr"]})
logger.info("Finished Training!")
During the run, you will notice a directory named wandb created which stores
the W&B logs.
The logger output can also be seen below.
wandb: Tracking run with wandb version 0.25.0
wandb: W&B syncing is set to `offline` in this directory. Run `wandb online` or set WANDB_MODE=online to enable cloud syncing.
...
Starting Training!
Epoch 0 Metrics: Learning Rate = 7.225e-03, Loss = 8.186e-01
...
Finished Training!
To visualize the logs interactively, simply follow the instructions printed in the outputs.
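In all three logging examples, `log_minibatch` is called once per iteration while the epoch line reports a single `Loss` value. The sketch below shows the accumulate-then-report pattern behind such a context manager, assuming the epoch value is the mean of the minibatch values. `EpochAverager` is a hypothetical stand-in written for illustration; it is not the PhysicsNeMo `LaunchLogger` and omits timing, distributed reduction, and tracker backends.

```python
class EpochAverager:
    """Hypothetical accumulate-then-report logger; not the PhysicsNeMo class."""

    def __init__(self, name: str, epoch: int):
        self.name = name
        self.epoch = epoch
        self.sums: dict[str, float] = {}
        self.count = 0
        self.epoch_metrics: dict[str, float] = {}

    def __enter__(self):
        return self

    def log_minibatch(self, metrics: dict[str, float]) -> None:
        # Accumulate running sums; called once per training iteration.
        self.count += 1
        for key, value in metrics.items():
            self.sums[key] = self.sums.get(key, 0.0) + float(value)

    def log_epoch(self, metrics: dict[str, float]) -> None:
        # Values recorded once per epoch are reported as-is.
        self.epoch_metrics.update(metrics)

    def summary(self) -> dict[str, float]:
        # Minibatch metrics are reduced to their mean over the epoch.
        means = {k: v / max(self.count, 1) for k, v in self.sums.items()}
        return {**self.epoch_metrics, **means}

    def __exit__(self, exc_type, exc, tb):
        line = ", ".join(f"{k} = {v:.3e}" for k, v in self.summary().items())
        print(f"Epoch {self.epoch} Metrics: {line}")
        return False  # do not swallow exceptions raised inside the block


with EpochAverager("train", epoch=0) as log:
    for loss in (1.0, 0.5):
        log.log_minibatch({"Loss": loss})
    log.log_epoch({"Learning Rate": 7.225e-03})
# Epoch 0 Metrics: Learning Rate = 7.225e-03, Loss = 7.500e-01
```

Reporting from `__exit__` is what lets the training loop stay free of explicit "end of epoch" bookkeeping.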
Checkpointing in PhysicsNeMo#
PhysicsNeMo provides utilities to save and load checkpoints of the model, optimizer, scheduler, and scaler during training and inference. As with logging, you can write your own implementation, but in this example you will use the PhysicsNeMo utilities and see some of their benefits.
Loading and Saving Checkpoints During Training#
The example below shows how you can save and load a checkpoint during training, so that
model training can be resumed from the last saved checkpoint. It demonstrates the use of
load_checkpoint() and save_checkpoint().
import logging
import torch
import physicsnemo
from physicsnemo.datapipes.benchmarks.darcy import Darcy2D
from physicsnemo.utils import load_checkpoint, save_checkpoint
from physicsnemo.metrics.general.mse import mse
from physicsnemo.models.fno.fno import FNO
normaliser = {
    "permeability": (1.25, 0.75),
    "darcy": (4.52e-2, 2.79e-2),
}
dataloader = Darcy2D(
    resolution=256, batch_size=64, nr_permeability_freq=5, normaliser=normaliser
)
model = FNO(
    in_channels=1,
    out_channels=1,
    decoder_layers=1,
    decoder_layer_size=32,
    dimension=2,
    latent_channels=32,
    num_fno_layers=4,
    num_fno_modes=12,
    padding=5,
).to("cuda")
optimizer = torch.optim.Adam(model.parameters(), lr=0.01)
scheduler = torch.optim.lr_scheduler.LambdaLR(
    optimizer, lr_lambda=lambda step: 0.85**step
)
# Configure logging so checkpoint load/save messages (from physicsnemo.utils.checkpoint) are visible.
logging.basicConfig(level=logging.INFO, format="%(message)s")
# Use `load_checkpoint` to restore the model, optimizer, and scheduler state
# from the checkpoint directory, if a checkpoint exists. It returns the epoch
# stored in the loaded checkpoint.
loaded_epoch = load_checkpoint(
    "./checkpoints",
    models=model,
    optimizer=optimizer,
    scheduler=scheduler,
    device="cuda",
)
# Train up to epoch 20, with 5 iterations per epoch,
# starting from the loaded epoch.
dataloader = iter(dataloader)
for i in range(max(1, loaded_epoch), 21):
    # this would be iterations through different batches
    for _ in range(5):
        batch = next(dataloader)
        true = batch["darcy"]
        optimizer.zero_grad()  # clear gradients left over from the previous iteration
        pred = model(batch["permeability"])
        loss = mse(pred, true)
        loss.backward()
        optimizer.step()
        scheduler.step()
    # save checkpoint every 5th epoch
    if i % 5 == 0:
        save_checkpoint(
            "./checkpoints",
            models=model,
            optimizer=optimizer,
            scheduler=scheduler,
            epoch=i,
        )
The script configures logging so that checkpoint load/save messages from PhysicsNeMo are visible when run standalone. When run (e.g. from a partially trained model), you should see output similar to the following.
>>> python test_scripts/test_basic_checkpointing.py
...
Loaded model state dictionary .../checkpoints/FNO.0.15.mdlus to device cuda
Loaded checkpoint file .../checkpoints/checkpoint.0.15.pt to device cuda
Loaded optimizer state dictionary
Loaded scheduler state dictionary
Saved model state dictionary: .../checkpoints/FNO.0.15.mdlus
Saved training checkpoint: .../checkpoints/checkpoint.0.15.pt
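The resume behaviour follows from the loop bounds `range(max(1, loaded_epoch), 21)` and the `i % 5 == 0` save test. The sketch below works through that arithmetic in plain Python, assuming `load_checkpoint` returns 0 when no checkpoint is found (which the `max(1, ...)` guard suggests); `epochs_to_run` and `saved_epochs` are hypothetical helpers for this illustration only.

```python
def epochs_to_run(loaded_epoch: int, last_epoch: int = 20) -> list[int]:
    """Epoch indices visited by range(max(1, loaded_epoch), last_epoch + 1)."""
    return list(range(max(1, loaded_epoch), last_epoch + 1))


def saved_epochs(epochs: list[int], every: int = 5) -> list[int]:
    """Epochs at which the `i % every == 0` test triggers a save."""
    return [i for i in epochs if i % every == 0]


fresh = epochs_to_run(0)     # assumed return value when no checkpoint exists
resumed = epochs_to_run(15)  # checkpoint from epoch 15 found

print(fresh[0], fresh[-1], saved_epochs(fresh))        # 1 20 [5, 10, 15, 20]
print(resumed[0], resumed[-1], saved_epochs(resumed))  # 15 20 [15, 20]
```

Note that a resumed run starts at the loaded epoch itself, so epoch 15 is trained again and its checkpoint is rewritten, which matches the sample output where the epoch-15 files are both loaded and saved. Starting the range at `loaded_epoch + 1` would skip that repeat if it is not wanted.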
Loading Checkpoints During Inference#
For loading the checkpoint in inference, the process is straightforward, and you can refer to the samples provided in Running Inference on Trained Models and Saving and Loading PhysicsNeMo Models.