PhysicsNeMo Utils#
The PhysicsNeMo Utils module provides a comprehensive set of utilities that support various aspects of scientific computing, machine learning, and physics simulations. These utilities range from optimization helpers and distributed computing tools to specialized functions for weather and climate modeling, and geometry processing. The module is designed to simplify common tasks while maintaining high performance and scalability.
Weather and Climate Utils#
Specialized utilities for weather and climate modeling, including calculations for solar radiation and atmospheric parameters. These utilities are used extensively in weather prediction models.
- physicsnemo.utils.insolation.insolation(dates, lat, lon, scale=1.0, daily=False, enforce_2d=False, clip_zero=True)
Calculate the approximate solar insolation for given dates.
For an example reference, see: https://brian-rose.github.io/ClimateLaboratoryBook/courseware/insolation/
- Parameters:
dates (np.ndarray) – 1d array of datetime or Timestamp values
lat (np.ndarray) – 1d or 2d array of latitudes
lon (np.ndarray) – 1d or 2d array of longitudes (0-360deg). If 2d, must match the shape of lat.
scale (float, optional) – scaling factor (solar constant)
daily (bool, optional) – if True, return the daily max solar radiation (lat and day of year dependent only)
enforce_2d (bool, optional) – if True and lat/lon are 1-d arrays, turns them into 2d meshes.
clip_zero (bool, optional) – if True, set values below 0 to 0
- Returns:
insolation with dimensions (date, lat, lon)
- Return type:
np.ndarray
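The parameters above can be illustrated with a minimal pure-Python sketch for a single point and time. It uses the textbook relation cos(zenith) = sin(lat) sin(decl) + cos(lat) cos(decl) cos(hour angle), a common cosine approximation for the solar declination, and an hour angle that ignores the equation of time. The function name `approx_insolation` is hypothetical, and this is not the library's implementation; it only shows how `scale` and `clip_zero` plausibly act on the result.

```python
import math
from datetime import datetime


def approx_insolation(date, lat_deg, lon_deg, scale=1.0, clip_zero=True):
    """Rough instantaneous insolation at one point (illustrative only)."""
    day_of_year = date.timetuple().tm_yday
    # Approximate solar declination in radians (simple cosine fit).
    decl = math.radians(-23.44) * math.cos(2.0 * math.pi * (day_of_year + 10) / 365.0)
    # Hour angle from UTC time and longitude (0-360 deg), ignoring the equation of time.
    utc_hours = date.hour + date.minute / 60.0 + date.second / 3600.0
    hour_angle = math.radians(15.0 * (utc_hours - 12.0) + lon_deg)
    lat = math.radians(lat_deg)
    cos_zenith = (
        math.sin(lat) * math.sin(decl)
        + math.cos(lat) * math.cos(decl) * math.cos(hour_angle)
    )
    value = scale * cos_zenith
    # Below the horizon the cosine is negative; clip_zero maps that to 0.
    if clip_zero:
        value = max(value, 0.0)
    return value


# Near the March equinox, on the equator at the prime meridian.
noon = approx_insolation(datetime(2023, 3, 22, 12, 0), 0.0, 0.0)
midnight = approx_insolation(datetime(2023, 3, 22, 0, 0), 0.0, 0.0)
midnight_raw = approx_insolation(datetime(2023, 3, 22, 0, 0), 0.0, 0.0, clip_zero=False)
```

With `clip_zero=True` the night-time value is exactly 0, while the unclipped value is negative. Passing the solar constant as `scale` would turn the normalized value into a flux.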
Checkpointing#
- physicsnemo.utils.checkpoint.get_checkpoint_dir(base_dir: str, model_name: str) → str[source]#
Get a checkpoint directory based on a given base directory and model name
- Parameters:
base_dir (str) – Path to the base directory where checkpoints are stored
model_name (str, optional) – Name of the model which is generating the checkpoint
- Returns:
Checkpoint directory
- Return type:
str
- physicsnemo.utils.checkpoint.load_checkpoint(path: str, models: Module | List[Module] | None = None, optimizer: optimizer | None = None, scheduler: scheduler | None = None, scaler: scaler | None = None, epoch: int | None = None, metadata_dict: Dict[str, Any] | None = {}, device: str | device = 'cpu') → int
Checkpoint loading utility
This loader is designed to be used with the save checkpoint utility in PhysicsNeMo Launch. Given a path, this method will try to find a checkpoint and load state dictionaries into the provided training objects.
- Parameters:
path (str) – Path to training checkpoint
models (Union[torch.nn.Module, List[torch.nn.Module], None], optional) – A single or list of PyTorch models, by default None
optimizer (Union[optimizer, None], optional) – Optimizer, by default None
scheduler (Union[scheduler, None], optional) – Learning rate scheduler, by default None
scaler (Union[scaler, None], optional) – AMP grad scaler, by default None
epoch (Union[int, None], optional) – Epoch checkpoint to load. If None, this will attempt to load the checkpoint with the largest index, by default None
metadata_dict (Optional[Dict[str, Any]], optional) – Dictionary to store metadata from the checkpoint, by default {}
device (Union[str, torch.device], optional) – Target device, by default “cpu”
- Returns:
Loaded epoch
- Return type:
int
Examples
Save and then restore a model, optimizer, and scheduler from a checkpoint:
>>> import tempfile, torch
>>> from physicsnemo.utils.checkpoint import save_checkpoint, load_checkpoint
>>> from physicsnemo.models.mlp import FullyConnected
>>> model = FullyConnected(in_features=32, out_features=64)
>>> optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)
>>> scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=10)
>>> with tempfile.TemporaryDirectory() as tmpdir:
...     save_checkpoint(tmpdir, models=model, optimizer=optimizer,
...                     scheduler=scheduler, epoch=1)
...     epoch = load_checkpoint(tmpdir, models=model, optimizer=optimizer,
...                             scheduler=scheduler)
...     epoch
1
Load a specific epoch and retrieve saved metadata:
>>> with tempfile.TemporaryDirectory() as tmpdir:
...     save_checkpoint(tmpdir, models=model, optimizer=optimizer, epoch=1,
...                     metadata={"loss": 0.42, "experiment": "run_01"})
...     save_checkpoint(tmpdir, models=model, optimizer=optimizer, epoch=2,
...                     metadata={"loss": 0.31, "experiment": "run_01"})
...     meta = {}
...     epoch = load_checkpoint(tmpdir, models=model, optimizer=optimizer,
...                             epoch=1, metadata_dict=meta)
...     epoch
1
>>> meta["loss"]
0.42
- physicsnemo.utils.checkpoint.save_checkpoint(path: str, models: Module | List[Module] | None = None, optimizer: optimizer | None = None, scheduler: scheduler | None = None, scaler: scaler | None = None, epoch: int | None = None, metadata: Dict[str, Any] | None = None)
Training checkpoint saving utility.
This function saves training checkpoints to the provided path. Multiple files may be created depending on what is being saved:
- Model checkpoints (when models are provided): “{model_name}{model_id}.{model_parallel_rank}.{epoch}.{ext}” where ext is “.mdlus” for instances of Module or “.pt” for PyTorch models.
- Training state (when optimizer/scheduler/scaler are provided): “checkpoint.{model_parallel_rank}.{epoch}.pt”
For both PhysicsNeMo and PyTorch models, the {model_name} is always derived from the model’s class name, model.__class__.__name__. If multiple models share the same {model_name}, they are indexed by {model_id} (e.g., “MyModel0”, “MyModel1”).
The function load_checkpoint() can be used to restore from these files with models that are already instantiated. To load only the model checkpoint (even when the models are not already instantiated), use the method from_checkpoint() to instantiate and load the model from the checkpoint.
- Parameters:
path (str) – Path to save the training checkpoint
models (Union[torch.nn.Module, List[torch.nn.Module], None], optional) – A single or list of PyTorch models, by default None
optimizer (Union[optimizer, None], optional) – Optimizer, by default None
scheduler (Union[scheduler, None], optional) – Learning rate scheduler, by default None
scaler (Union[scaler, None], optional) – AMP grad scaler. If none is provided, this will attempt to save the one used in static capture, by default None
epoch (Union[int, None], optional) – Epoch index to save the checkpoint under. If None, this will save the checkpoint at the next valid index, by default None
metadata (Optional[Dict[str, Any]], optional) – Additional metadata to save, by default None
Examples
Save a model together with optimizer and scheduler state:
>>> import tempfile, os, torch
>>> from physicsnemo.utils.checkpoint import save_checkpoint
>>> from physicsnemo.models.mlp import FullyConnected
>>> model = FullyConnected(in_features=32, out_features=64)
>>> optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)
>>> scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=10)
>>> with tempfile.TemporaryDirectory() as tmpdir:
...     save_checkpoint(tmpdir, models=model, optimizer=optimizer,
...                     scheduler=scheduler, epoch=1)
...     sorted(f for f in os.listdir(tmpdir))
['FullyConnected.0.1.mdlus', 'checkpoint.0.1.pt']
Save at multiple epochs with additional metadata:
>>> with tempfile.TemporaryDirectory() as tmpdir:
...     save_checkpoint(tmpdir, models=model, optimizer=optimizer, epoch=1,
...                     metadata={"loss": 0.42, "experiment": "run_01"})
...     save_checkpoint(tmpdir, models=model, optimizer=optimizer, epoch=2,
...                     metadata={"loss": 0.31, "experiment": "run_01"})
...     sorted(f for f in os.listdir(tmpdir))
['FullyConnected.0.1.mdlus', 'FullyConnected.0.2.mdlus', 'checkpoint.0.1.pt', 'checkpoint.0.2.pt']
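The training-state naming scheme “checkpoint.{model_parallel_rank}.{epoch}.pt” makes it straightforward to see which epochs exist in a directory and which one has the largest index. The following is a minimal sketch of that idea using only the file names from the example output above. The helpers `saved_epochs` and `latest_epoch` are hypothetical and are not part of PhysicsNeMo; how load_checkpoint() actually resolves the latest checkpoint may differ.

```python
import re

# Matches the training-state file names described above.
CKPT_PATTERN = re.compile(r"^checkpoint\.(?P<rank>\d+)\.(?P<epoch>\d+)\.pt$")


def saved_epochs(file_names, rank=0):
    """Return the sorted epoch indices found for one model-parallel rank."""
    epochs = []
    for name in file_names:
        match = CKPT_PATTERN.match(name)
        if match and int(match.group("rank")) == rank:
            epochs.append(int(match.group("epoch")))
    return sorted(epochs)


def latest_epoch(file_names, rank=0):
    """Return the largest saved epoch index, or None if nothing matches."""
    epochs = saved_epochs(file_names, rank)
    return epochs[-1] if epochs else None


files = [
    "FullyConnected.0.1.mdlus",
    "FullyConnected.0.2.mdlus",
    "checkpoint.0.1.pt",
    "checkpoint.0.2.pt",
]
epochs_found = saved_epochs(files)
newest = latest_epoch(files)
```

In practice the list of names would come from something like `os.listdir(path)` on the checkpoint directory.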
Profiling Utils#
Utilities for profiling the performance of a model.
Optimization Utils#
The optimization utilities provide tools for capturing and managing training states, gradients, and optimization processes. These are particularly useful when implementing custom training loops or specialized optimization strategies.
- class physicsnemo.utils.capture.StaticCaptureEvaluateNoGrad(*args, **kwargs)[source]#
Bases: _StaticCapture
A performance optimization decorator for PyTorch no-grad evaluation.
This class should be initialized as a decorator on a function that runs the forward pass of the model and does not require gradient calculations. This is the recommended method to use for inference and validation methods.
- Parameters:
model (physicsnemo.core.Module) – PhysicsNeMo Model
logger (Optional[Logger], optional) – PhysicsNeMo Launch Logger, by default None
use_graphs (bool, optional) – Toggle CUDA graphs if supported by model, by default True
use_amp (bool, optional) – Toggle AMP if supported by model, by default True
cuda_graph_warmup (int, optional) – Number of warmup steps for cuda graphs, by default 11
amp_type (Union[float16, bfloat16], optional) – Auto casting type for AMP, by default torch.float16
label (Optional[str], optional) – Static capture checkpoint label, by default None
- Raises:
ValueError – If the model provided is not a physicsnemo.core.Module, i.e., it has no metadata.
Example
>>> # Create model
>>> import torch
>>> import physicsnemo
>>> from physicsnemo.utils.capture import StaticCaptureEvaluateNoGrad
>>> model = physicsnemo.models.mlp.FullyConnected(2, 64, 2)
>>> input = torch.rand(8, 2)
>>> # Create evaluate function with optimization wrapper
>>> @StaticCaptureEvaluateNoGrad(model=model)
... def eval_step(model, invar):
...     predvar = model(invar)
...     return predvar
...
>>> output = eval_step(model, input)
>>> output.size()
torch.Size([8, 2])
Note
Capturing multiple cuda graphs in a single program can lead to potential invalid CUDA memory access errors on some systems. Prioritize capturing training graphs when this occurs.
- class physicsnemo.utils.capture.StaticCaptureTraining(*args, **kwargs)[source]#
Bases: _StaticCapture
A performance optimization decorator for PyTorch training functions.
This class should be initialized as a decorator on a function that computes the forward pass of the neural network and loss function. The user should only call the defined training step function. This will apply optimizations including AMP and CUDA graphs.
- Parameters:
model (physicsnemo.core.Module) – PhysicsNeMo Model
optim (torch.optim) – Optimizer
logger (Optional[Logger], optional) – PhysicsNeMo Launch Logger, by default None
use_graphs (bool, optional) – Toggle CUDA graphs if supported by model, by default True
use_amp (bool, optional) – Toggle AMP if supported by model, by default True
cuda_graph_warmup (int, optional) – Number of warmup steps for cuda graphs, by default 11
amp_type (Union[float16, bfloat16], optional) – Auto casting type for AMP, by default torch.float16
gradient_clip_norm (Optional[float], optional) – Threshold for gradient clipping
label (Optional[str], optional) – Static capture checkpoint label, by default None
- Raises:
ValueError – If the model provided is not a physicsnemo.core.Module, i.e., it has no metadata.
Example
>>> # Create model
>>> import torch
>>> import physicsnemo
>>> from physicsnemo.utils.capture import StaticCaptureTraining
>>> model = physicsnemo.models.mlp.FullyConnected(2, 64, 2)
>>> input = torch.rand(8, 2)
>>> output = torch.rand(8, 2)
>>> # Create optimizer
>>> optim = torch.optim.Adam(model.parameters(), lr=0.001)
>>> # Create training step function with optimization wrapper
>>> @StaticCaptureTraining(model=model, optim=optim)
... def training_step(model, invar, outvar):
...     predvar = model(invar)
...     loss = torch.sum(torch.pow(predvar - outvar, 2))
...     return loss
...
>>> # Sample training loop
>>> for i in range(3):
...     loss = training_step(model, input, output)
...
Note
Static captures must be checkpointed during training using state_dict() if AMP is being used with a gradient scaler. By default, this requires static captures to be instantiated in the same order as when they were checkpointed. The label parameter can be used to relax or circumvent this ordering requirement.
Note
Capturing multiple cuda graphs in a single program can lead to potential invalid CUDA memory access errors on some systems. Prioritize capturing training graphs when this occurs.
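Both static capture classes are used as decorators that are constructed with arguments and then applied to a step function. The pattern can be illustrated with a toy class-based decorator in plain Python. The class `WarmupThenFastPath` and its attributes are hypothetical; it does not touch CUDA, AMP, or PyTorch, and it only mimics the idea of running a number of warmup calls (compare cuda_graph_warmup) before switching to a different execution path. It is a conceptual sketch, not how PhysicsNeMo implements static capture.

```python
import functools


class WarmupThenFastPath:
    """Toy class-based decorator: label the first calls as warmup."""

    def __init__(self, warmup_steps=11):
        self.warmup_steps = warmup_steps
        self.calls = 0
        self.mode_history = []

    def __call__(self, fn):
        @functools.wraps(fn)
        def wrapper(*args, **kwargs):
            self.calls += 1
            # A real implementation would run eagerly during warmup and
            # replay a captured graph afterwards; here we only record it.
            mode = "warmup" if self.calls <= self.warmup_steps else "captured"
            self.mode_history.append(mode)
            return fn(*args, **kwargs)

        return wrapper


capture = WarmupThenFastPath(warmup_steps=2)


@capture
def training_step(prediction, target):
    # Stand-in for a forward pass plus loss computation.
    return (prediction - target) ** 2


losses = [training_step(3.0, 1.0) for _ in range(4)]
```

The caller only ever invokes `training_step`, which mirrors the guidance above that the user should call just the decorated step function.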
PhysicsNeMo Logging#
The PhysicsNeMo Logging module provides a comprehensive and flexible logging system for machine learning experiments and physics simulations. It offers multiple logging backends including console output, MLflow, and Weights & Biases (W&B), allowing users to track metrics, artifacts, and experiment parameters across different platforms. The module is designed to work seamlessly in both single-process and distributed training environments.
Key Features:
- Unified logging interface across different backends
- Support for distributed training environments
- Automatic metric aggregation and synchronization
- Flexible configuration and customization options
- Integration with popular experiment tracking platforms
Consider the following example usage:
from physicsnemo.utils.logging import LaunchLogger
# Initialize the logger
logger = LaunchLogger.initialize(use_mlflow=True)
# Training loop
for epoch in range(num_epochs):
    # Training logger
    with LaunchLogger(
        "train", epoch=epoch, num_mini_batch=len(training_datapipe), epoch_alert_freq=1
    ) as logger:
        for batch in training_datapipe:
            # Training loop
            ...  # training code
            logger.log_minibatch({"train_loss": training_loss})
    # Validation logger
    with LaunchLogger(
        "val", epoch=epoch, num_mini_batch=len(validation_datapipe), epoch_alert_freq=1
    ) as logger:
        for batch in validation_datapipe:
            # Validation loop
            ...  # validation code
            logger.log_minibatch({"val_loss": validation_loss})
        learning_rate = ...  # get the learning rate at the end of the epoch from the optimizer
        logger.log_epoch({"learning_rate": learning_rate})  # log the learning rate at the end of the epoch
This example shows how to use the LaunchLogger to log metrics during training and validation. The LaunchLogger is initialized with the MLflow backend, and in each epoch a separate logger is created for training and for validation. Use the .log_minibatch method to log metrics during training and validation, and the .log_epoch method to log the learning rate at the end of the epoch.
For a more detailed example, refer to the Logging and Checkpointing recipe.
Launch Logger#
The LaunchLogger serves as the primary interface for logging in PhysicsNeMo. It provides a unified API that works consistently across different logging backends and training environments. The logger automatically handles metric aggregation in distributed settings and ensures proper synchronization across processes.
- class physicsnemo.utils.logging.launch.LaunchLogger(name_space, *args, **kwargs)[source]#
Bases: object
PhysicsNeMo Launch logger
An abstracted logger class that takes care of several fundamental logging functions. This class should first be initialized and then used via a context manager. This will auto compute epoch metrics. This is the standard logger for PhysicsNeMo examples.
- Parameters:
name_space (str) – Namespace of logger to use. This defines the logger’s title in the console and the wandb group in which the metric is plotted
epoch (int, optional) – Current epoch, by default 1
num_mini_batch (Union[int, None], optional) – Number of mini-batches used to calculate the epoch’s progress, by default None
profile (bool, optional) – Profile code using nvtx markers, by default False
mini_batch_log_freq (int, optional) – Frequency to log mini-batch losses, by default 100
epoch_alert_freq (Union[int, None], optional) – Epoch frequency to send training alert, by default None
Example
>>> from physicsnemo.utils.logging import LaunchLogger
>>> LaunchLogger.initialize()
>>> epochs = 3
>>> for i in range(epochs):
...     with LaunchLogger("Train", epoch=i) as log:
...         # Log 3 mini-batches manually
...         log.log_minibatch({"loss": 1.0})
...         log.log_minibatch({"loss": 2.0})
...         log.log_minibatch({"loss": 3.0})
- static initialize(use_wandb: bool = False, use_mlflow: bool = False)
Initialize logging singleton
- Parameters:
use_wandb (bool, optional) – Use WandB logging, by default False
use_mlflow (bool, optional) – Use MLFlow logging, by default False
- log_epoch(losses: Dict[str, float])[source]#
Logs metrics for a single epoch
- Parameters:
losses (Dict[str, float]) – Dictionary of metrics/loss values to log
- log_figure(figure, artifact_file: str = 'artifact', plot_dir: str = './', log_to_file: bool = False)
Logs figures on the root process to wandb or mlflow. The figure is stored to file if neither is selected.
- Parameters:
figure (Figure) – matplotlib or plotly figure to plot
artifact_file (str, optional) – File name. CAUTION: overwrites existing files of the same name
plot_dir (str, optional) – Output directory for the plot
log_to_file (bool, optional) – Set to True to store the figure to file in addition to logging it to mlflow/wandb
- log_minibatch(losses: Dict[str, float])[source]#
Logs metrics for a single mini-batch
This function should be called every mini-batch iteration. It accumulates loss values over a datapipe. At the end of an epoch, the average of these losses over all mini-batches is calculated.
- Parameters:
losses (Dict[str, float]) – Dictionary of metrics/loss values to log
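The accumulate-then-average behaviour described for log_minibatch can be sketched with a small context manager in plain Python. The class `EpochAverager` and its `epoch_metrics` attribute are hypothetical stand-ins used only for illustration; the real LaunchLogger also handles console output, distributed aggregation, and the MLflow/W&B backends, none of which is modelled here.

```python
from collections import defaultdict


class EpochAverager:
    """Toy logger: accumulate mini-batch metrics, average them on exit."""

    def __init__(self, name_space, epoch=1):
        self.name_space = name_space
        self.epoch = epoch
        self._totals = defaultdict(float)
        self._counts = defaultdict(int)
        self.epoch_metrics = {}

    def __enter__(self):
        return self

    def log_minibatch(self, losses):
        # Accumulate each metric so it can be averaged at the end of the epoch.
        for key, value in losses.items():
            self._totals[key] += float(value)
            self._counts[key] += 1

    def log_epoch(self, losses):
        # Epoch-level values are recorded as-is, without averaging.
        self.epoch_metrics.update(losses)

    def __exit__(self, exc_type, exc, tb):
        # On leaving the context, compute the per-epoch mean of each metric.
        for key, total in self._totals.items():
            self.epoch_metrics[key] = total / self._counts[key]
        return False


with EpochAverager("Train", epoch=0) as log:
    log.log_minibatch({"loss": 1.0})
    log.log_minibatch({"loss": 2.0})
    log.log_minibatch({"loss": 3.0})
    log.log_epoch({"learning_rate": 0.001})

metrics = log.epoch_metrics
```

Using a context manager keeps the averaging step tied to the end of the epoch, so the calling code never has to trigger it explicitly.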
Console Logger#
A simple but powerful console-based logger that provides formatted output to the terminal. It’s particularly useful during development and debugging, offering clear visibility into training progress and metrics.
MLflow Logger#
Integration with MLflow for experiment tracking and model management. This utility enables systematic tracking of experiments, including metrics, parameters, artifacts, and model versions. It’s particularly useful for teams that need to maintain reproducibility and compare different experiments. Users should initialize the MLflow backend before using the LaunchLogger.
Example usage:
from physicsnemo.utils.logging.mlflow import initialize_mlflow
from physicsnemo.utils.logging import LaunchLogger
# Initialize MLflow
initialize_mlflow(
    experiment_name="weather_prediction",
    user_name="physicsnemo_user",
    mode="offline",
)
# Create logger with MLflow backend
logger = LaunchLogger.initialize(use_mlflow=True)
Weights and Biases Logger#
Integration with Weights & Biases (W&B) for experiment tracking and visualization. This utility provides rich visualization capabilities and easy experiment comparison, making it ideal for projects that require detailed analysis of training runs and model performance. You must initialize the W&B backend before using the LaunchLogger.
Weights and Biases Routines and Utilities
Example usage:
from physicsnemo.utils.logging.wandb import initialize_wandb
from physicsnemo.utils.logging import LaunchLogger
# Initialize W&B
initialize_wandb(
    project="physics_simulation",
    entity="my_team",
)
# Create logger with W&B backend
logger = LaunchLogger.initialize(use_wandb=True)
Logging Utils#
Utility functions and helpers for logging operations.
- physicsnemo.utils.logging.utils.create_ddp_group_tag(group_name: str = None) → str[source]#
Creates a common group tag for logging
This currently does not work in multi-node settings. There appears to be a bug in PyTorch when a distributed utility is used before DDP.
- Parameters:
group_name (str, optional) – Optional group name prefix. If None, "DDP_Group_" is used, by default None
- Returns:
Group tag
- Return type:
str