PhysicsNeMo Utils#

The PhysicsNeMo Utils module provides a comprehensive set of utilities that support various aspects of scientific computing, machine learning, and physics simulations. These utilities range from optimization helpers and distributed computing tools to specialized functions for weather and climate modeling, and geometry processing. The module is designed to simplify common tasks while maintaining high performance and scalability.

Weather and Climate Utils#

Specialized utilities for weather and climate modeling, including calculations for solar radiation and atmospheric parameters. These utilities are used extensively in weather prediction models.

physicsnemo.utils.insolation.insolation(
dates,
lat,
lon,
scale=1.0,
daily=False,
enforce_2d=False,
clip_zero=True,
)[source]#

Calculate the approximate solar insolation for given dates.

For an example reference, see: https://brian-rose.github.io/ClimateLaboratoryBook/courseware/insolation/

Parameters:
  • dates (np.ndarray) – 1d array of datetime or Timestamp values

  • lat (np.ndarray) – 1d or 2d array of latitudes

  • lon (np.ndarray) – 1d or 2d array of longitudes (0-360deg). If 2d, must match the shape of lat.

  • scale (float, optional) – scaling factor (solar constant)

  • daily (bool, optional) – if True, return the daily max solar radiation (lat and day of year dependent only)

  • enforce_2d (bool, optional) – if True and lat/lon are 1-d arrays, turns them into 2d meshes.

  • clip_zero (bool, optional) – if True, set values below 0 to 0

Returns:

insolation (date, lat, lon)

Return type:

np.ndarray
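To illustrate the shapes and parameters described above, here is a minimal NumPy sketch of a cosine-of-zenith insolation approximation. It is not the PhysicsNeMo implementation: the function name `approx_insolation`, the simple declination formula, and the 365-day year are assumptions made only for this example. It covers only 1d `lat`/`lon` inputs and mirrors the documented `scale` and `clip_zero` parameters and the (date, lat, lon) output layout.

```python
import numpy as np


def approx_insolation(dates, lat, lon, scale=1.0, clip_zero=True):
    """Hypothetical simplified stand-in, NOT physicsnemo.utils.insolation."""
    dates = np.asarray(dates, dtype="datetime64[s]")
    lat = np.deg2rad(np.atleast_1d(np.asarray(lat, dtype=float)))
    lon = np.deg2rad(np.atleast_1d(np.asarray(lon, dtype=float)))

    # Fractional day of year (1-based) and UTC hour of day for each date
    year_start = dates.astype("datetime64[Y]").astype("datetime64[s]")
    day_start = dates.astype("datetime64[D]").astype("datetime64[s]")
    day_of_year = (dates - year_start) / np.timedelta64(1, "D") + 1.0
    utc_hour = (dates - day_start) / np.timedelta64(1, "h")

    # Assumed declination approximation: about -23.44 deg near the December solstice
    decl = -np.deg2rad(23.44) * np.cos(2.0 * np.pi * (day_of_year + 10.0) / 365.0)

    # Build a (lat, lon) mesh, then broadcast against the date axis
    lat2d, lon2d = np.meshgrid(lat, lon, indexing="ij")
    hour_angle = np.deg2rad(15.0 * (utc_hour - 12.0))[:, None, None] + lon2d[None]

    cos_zenith = (
        np.sin(lat2d)[None] * np.sin(decl)[:, None, None]
        + np.cos(lat2d)[None] * np.cos(decl)[:, None, None] * np.cos(hour_angle)
    )
    out = scale * cos_zenith
    # Negative values correspond to the sun being below the horizon
    return np.clip(out, 0.0, None) if clip_zero else out


# Noon and midnight UTC near the March equinox, at the equator on the prime meridian
dates = np.array(["2021-03-20T12:00", "2021-03-20T00:00"], dtype="datetime64[s]")
values = approx_insolation(dates, lat=[0.0], lon=[0.0])
print(values.shape)  # (2, 1, 1)
```

With `clip_zero=True` the midnight entry is clipped to zero, while the noon entry is close to `scale`, since the sun is almost directly overhead at that point.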

Checkpointing#

physicsnemo.utils.checkpoint.get_checkpoint_dir(base_dir: str, model_name: str) str[source]#

Get a checkpoint directory based on a given base directory and model name

Parameters:
  • base_dir (str) – Path to the base directory where checkpoints are stored

  • model_name (str, optional) – Name of the model which is generating the checkpoint

Returns:

Checkpoint directory

Return type:

str

physicsnemo.utils.checkpoint.load_checkpoint(
path: str,
models: Module | List[Module] | None = None,
optimizer: optimizer | None = None,
scheduler: scheduler | None = None,
scaler: scaler | None = None,
epoch: int | None = None,
metadata_dict: Dict[str, Any] | None = {},
device: str | device = 'cpu',
) int[source]#

Checkpoint loading utility

This loader is designed to be used with the save checkpoint utility in PhysicsNeMo Launch. Given a path, this method will try to find a checkpoint and load state dictionaries into the provided training objects.

Parameters:
  • path (str) – Path to training checkpoint

  • models (Union[torch.nn.Module, List[torch.nn.Module], None], optional) – A single or list of PyTorch models, by default None

  • optimizer (Union[optimizer, None], optional) – Optimizer, by default None

  • scheduler (Union[scheduler, None], optional) – Learning rate scheduler, by default None

  • scaler (Union[scaler, None], optional) – AMP grad scaler, by default None

  • epoch (Union[int, None], optional) – Epoch checkpoint to load. If none is provided this will attempt to load the checkpoint with the largest index, by default None

  • metadata_dict (Optional[Dict[str, Any]], optional) – Dictionary to store metadata from the checkpoint, by default {}

  • device (Union[str, torch.device], optional) – Target device, by default “cpu”

Returns:

Loaded epoch

Return type:

int

Examples

Save and then restore a model, optimizer, and scheduler from a checkpoint:

>>> import tempfile, torch
>>> from physicsnemo.utils.checkpoint import save_checkpoint, load_checkpoint
>>> from physicsnemo.models.mlp import FullyConnected
>>> model = FullyConnected(in_features=32, out_features=64)
>>> optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)
>>> scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=10)
>>> with tempfile.TemporaryDirectory() as tmpdir:
...     save_checkpoint(tmpdir, models=model, optimizer=optimizer,
...                     scheduler=scheduler, epoch=1)
...     epoch = load_checkpoint(tmpdir, models=model, optimizer=optimizer,
...                             scheduler=scheduler)
...     epoch
1

Load a specific epoch and retrieve saved metadata:

>>> with tempfile.TemporaryDirectory() as tmpdir:
...     save_checkpoint(tmpdir, models=model, optimizer=optimizer, epoch=1,
...                     metadata={"loss": 0.42, "experiment": "run_01"})
...     save_checkpoint(tmpdir, models=model, optimizer=optimizer, epoch=2,
...                     metadata={"loss": 0.31, "experiment": "run_01"})
...     meta = {}
...     epoch = load_checkpoint(tmpdir, models=model, optimizer=optimizer,
...                             epoch=1, metadata_dict=meta)
...     epoch
1
>>> meta["loss"]
0.42

physicsnemo.utils.checkpoint.save_checkpoint(
path: str,
models: Module | List[Module] | None = None,
optimizer: optimizer | None = None,
scheduler: scheduler | None = None,
scaler: scaler | None = None,
epoch: int | None = None,
metadata: Dict[str, Any] | None = None,
) None[source]#

Training checkpoint saving utility.

This function saves training checkpoints to the provided path. Multiple files may be created depending on what is being saved:

  • Model checkpoints (when models are provided): “{model_name}{model_id}.{model_parallel_rank}.{epoch}.{ext}” where ext is “mdlus” for instances of Module or “pt” for PyTorch models.

  • Training state (when optimizer/scheduler/scaler are provided): “checkpoint.{model_parallel_rank}.{epoch}.pt”

For both PhysicsNeMo and PyTorch models, the {model_name} is always derived from the model’s class name model.__class__.__name__. If multiple models share the same {model_name}, they are indexed by {model_id} (e.g., “MyModel0”, “MyModel1”).

The function load_checkpoint() can be used to restore from these files with models that are already instantiated. To load only the model checkpoint (even when the models are not already instantiated), use the method from_checkpoint() to instantiate and load the model from the checkpoint.

Parameters:
  • path (str) – Path to save the training checkpoint

  • models (Union[torch.nn.Module, List[torch.nn.Module], None], optional) – A single or list of PyTorch models, by default None

  • optimizer (Union[optimizer, None], optional) – Optimizer, by default None

  • scheduler (Union[scheduler, None], optional) – Learning rate scheduler, by default None

  • scaler (Union[scaler, None], optional) – AMP grad scaler. If none is provided, will attempt to save one from a static capture, by default None

  • epoch (Union[int, None], optional) – Epoch index to save the checkpoint under. If None, the checkpoint is saved at the next valid index, by default None

  • metadata (Optional[Dict[str, Any]], optional) – Additional metadata to save, by default None

Examples

Save a model together with optimizer and scheduler state:

>>> import tempfile, os, torch
>>> from physicsnemo.utils.checkpoint import save_checkpoint
>>> from physicsnemo.models.mlp import FullyConnected
>>> model = FullyConnected(in_features=32, out_features=64)
>>> optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)
>>> scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=10)
>>> with tempfile.TemporaryDirectory() as tmpdir:
...     save_checkpoint(tmpdir, models=model, optimizer=optimizer,
...                     scheduler=scheduler, epoch=1)
...     sorted(f for f in os.listdir(tmpdir))
['FullyConnected.0.1.mdlus', 'checkpoint.0.1.pt']

Save at multiple epochs with additional metadata:

>>> with tempfile.TemporaryDirectory() as tmpdir:
...     save_checkpoint(tmpdir, models=model, optimizer=optimizer, epoch=1,
...                     metadata={"loss": 0.42, "experiment": "run_01"})
...     save_checkpoint(tmpdir, models=model, optimizer=optimizer, epoch=2,
...                     metadata={"loss": 0.31, "experiment": "run_01"})
...     sorted(f for f in os.listdir(tmpdir))
['FullyConnected.0.1.mdlus', 'FullyConnected.0.2.mdlus', 'checkpoint.0.1.pt', 'checkpoint.0.2.pt']
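The file naming scheme described for save_checkpoint, and the "largest index" lookup described for load_checkpoint, can be sketched in plain Python. The helpers `checkpoint_file_names` and `latest_epoch` are hypothetical and written only for illustration; they are not part of PhysicsNeMo. The sketch assumes that the `{model_id}` suffix is omitted when a class name is unique, which is inferred from the example output above.

```python
import re
from collections import Counter


def checkpoint_file_names(class_names, epoch, model_parallel_rank=0, ext="mdlus"):
    """Hypothetical helper: build model checkpoint names from class names."""
    counts = Counter(class_names)
    seen = Counter()
    names = []
    for name in class_names:
        if counts[name] > 1:
            # Models sharing a class name are indexed: MyModel0, MyModel1, ...
            model_id = str(seen[name])
            seen[name] += 1
        else:
            # Assumption: a unique class name gets no index suffix
            model_id = ""
        names.append(f"{name}{model_id}.{model_parallel_rank}.{epoch}.{ext}")
    return names


def latest_epoch(file_names, model_parallel_rank=0):
    """Hypothetical helper: largest epoch among training-state files, or None."""
    pattern = re.compile(r"^checkpoint\.(\d+)\.(\d+)\.pt$")
    epochs = []
    for file_name in file_names:
        match = pattern.match(file_name)
        if match and int(match.group(1)) == model_parallel_rank:
            epochs.append(int(match.group(2)))
    return max(epochs) if epochs else None


single = checkpoint_file_names(["FullyConnected"], epoch=1)
shared = checkpoint_file_names(["MyModel", "MyModel"], epoch=3)
newest = latest_epoch(
    ["FullyConnected.0.1.mdlus", "checkpoint.0.1.pt", "checkpoint.0.2.pt"]
)
print(single)  # ['FullyConnected.0.1.mdlus']
print(shared)  # ['MyModel0.0.3.mdlus', 'MyModel1.0.3.mdlus']
print(newest)  # 2
```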

Profiling Utils#

Utilities for profiling the performance of a model.

Optimization Utils#

The optimization utilities provide tools for capturing and managing training states, gradients, and optimization processes. These are particularly useful when implementing custom training loops or specialized optimization strategies.

class physicsnemo.utils.capture.StaticCaptureEvaluateNoGrad(*args, **kwargs)[source]#

Bases: _StaticCapture

A performance optimization decorator for PyTorch no grad evaluation.

This class should be initialized as a decorator on a function that runs a forward pass of the model without gradient calculations. This is the recommended method to use for inference and validation.

Parameters:
  • model (physicsnemo.core.Module) – PhysicsNeMo Model

  • logger (Optional[Logger], optional) – PhysicsNeMo Launch Logger, by default None

  • use_graphs (bool, optional) – Toggle CUDA graphs if supported by model, by default True

  • use_amp (bool, optional) – Toggle AMP if supported by model, by default True

  • cuda_graph_warmup (int, optional) – Number of warmup steps for cuda graphs, by default 11

  • amp_type (Union[float16, bfloat16], optional) – Auto casting type for AMP, by default torch.float16

  • label (Optional[str], optional) – Static capture checkpoint label, by default None

Raises:

ValueError – If the model provided is not a physicsnemo.core.Module, i.e., it has no metadata.

Example

>>> # Create model
>>> import torch
>>> from physicsnemo.models.mlp import FullyConnected
>>> from physicsnemo.utils.capture import StaticCaptureEvaluateNoGrad
>>> model = FullyConnected(2, 64, 2)
>>> input = torch.rand(8, 2)
>>> # Create evaluate function with optimization wrapper
>>> @StaticCaptureEvaluateNoGrad(model=model)
... def eval_step(model, invar):
...     predvar = model(invar)
...     return predvar
...
>>> output = eval_step(model, input)
>>> output.size()
torch.Size([8, 2])

Note

Capturing multiple cuda graphs in a single program can lead to potential invalid CUDA memory access errors on some systems. Prioritize capturing training graphs when this occurs.

class physicsnemo.utils.capture.StaticCaptureTraining(*args, **kwargs)[source]#

Bases: _StaticCapture

A performance optimization decorator for PyTorch training functions.

This class should be initialized as a decorator on a function that computes the forward pass of the neural network and the loss function. The user should only call the defined training step function. This will apply optimizations including AMP and CUDA graphs.

Parameters:
  • model (physicsnemo.core.Module) – PhysicsNeMo Model

  • optim (torch.optim) – Optimizer

  • logger (Optional[Logger], optional) – PhysicsNeMo Launch Logger, by default None

  • use_graphs (bool, optional) – Toggle CUDA graphs if supported by model, by default True

  • use_amp (bool, optional) – Toggle AMP if supported by model, by default True

  • cuda_graph_warmup (int, optional) – Number of warmup steps for cuda graphs, by default 11

  • amp_type (Union[float16, bfloat16], optional) – Auto casting type for AMP, by default torch.float16

  • gradient_clip_norm (Optional[float], optional) – Threshold for gradient clipping

  • label (Optional[str], optional) – Static capture checkpoint label, by default None

Raises:

ValueError – If the model provided is not a physicsnemo.core.Module, i.e., it has no metadata.

Example

>>> # Create model
>>> import torch
>>> from physicsnemo.models.mlp import FullyConnected
>>> from physicsnemo.utils.capture import StaticCaptureTraining
>>> model = FullyConnected(2, 64, 2)
>>> input = torch.rand(8, 2)
>>> output = torch.rand(8, 2)
>>> # Create optimizer
>>> optim = torch.optim.Adam(model.parameters(), lr=0.001)
>>> # Create training step function with optimization wrapper
>>> @StaticCaptureTraining(model=model, optim=optim)
... def training_step(model, invar, outvar):
...     predvar = model(invar)
...     loss = torch.sum(torch.pow(predvar - outvar, 2))
...     return loss
...
>>> # Sample training loop
>>> for i in range(3):
...     loss = training_step(model, input, output)
...

Note

Static captures must be checkpointed when training using the state_dict() if AMP is being used with gradient scaler. By default, this requires static captures to be instantiated in the same order as when they were checkpointed. The label parameter can be used to relax/circumvent this ordering requirement.

Note

Capturing multiple cuda graphs in a single program can lead to potential invalid CUDA memory access errors on some systems. Prioritize capturing training graphs when this occurs.

PhysicsNeMo Logging#

The PhysicsNeMo Logging module provides a comprehensive and flexible logging system for machine learning experiments and physics simulations. It offers multiple logging backends including console output, MLflow, and Weights & Biases (W&B), allowing users to track metrics, artifacts, and experiment parameters across different platforms. The module is designed to work seamlessly in both single-process and distributed training environments.

Key Features:
  • Unified logging interface across different backends

  • Support for distributed training environments

  • Automatic metric aggregation and synchronization

  • Flexible configuration and customization options

  • Integration with popular experiment tracking platforms

Consider the following example usage:

from physicsnemo.utils.logging import LaunchLogger

# Initialize the logging singleton with the MLflow backend enabled
LaunchLogger.initialize(use_mlflow=True)

# Training loop
for epoch in range(num_epochs):

    # Training logger
    with LaunchLogger(
        "train", epoch = epoch, num_mini_batch = len(training_datapipe), epoch_alert_freq = 1
    ) as logger:
        for batch in training_datapipe:
            # Training loop
            ... # training code
            logger.log_minibatch({"train_loss": training_loss})

    # Validation logger
    with LaunchLogger(
        "val", epoch = epoch, num_mini_batch = len(validation_datapipe), epoch_alert_freq = 1
    ) as logger:
        for batch in validation_datapipe:
            # Validation loop
            ... # validation code
            logger.log_minibatch({"val_loss": validation_loss})

        # Log the learning rate once per epoch, while the logger context is still open
        learning_rate = ...  # get the current learning rate from the optimizer
        logger.log_epoch({"learning_rate": learning_rate})

This example shows how to use the LaunchLogger to log metrics during training and validation. The LaunchLogger is first initialized with the MLflow backend. Within each epoch, separate loggers are created for training and validation. Use the .log_minibatch method to log metrics during training and validation, and the .log_epoch method to log the learning rate at the end of the epoch.

For a more detailed example, refer to the Logging and Checkpointing recipe.

Launch Logger#

The LaunchLogger serves as the primary interface for logging in PhysicsNeMo. It provides a unified API that works consistently across different logging backends and training environments. The logger automatically handles metric aggregation in distributed settings and ensures proper synchronization across processes.

class physicsnemo.utils.logging.launch.LaunchLogger(name_space, *args, **kwargs)[source]#

Bases: object

PhysicsNeMo Launch logger

An abstracted logger class that takes care of several fundamental logging functions. This class should first be initialized and then used via a context manager. This will auto compute epoch metrics. This is the standard logger for PhysicsNeMo examples.

Parameters:
  • name_space (str) – Namespace of the logger to use. This defines the logger's title in the console and the wandb group in which the metric is plotted

  • epoch (int, optional) – Current epoch, by default 1

  • num_mini_batch (Union[int, None], optional) – Number of mini-batches used to calculate the epoch's progress, by default None

  • profile (bool, optional) – Profile code using nvtx markers, by default False

  • mini_batch_log_freq (int, optional) – Frequency to log mini-batch losses, by default 100

  • epoch_alert_freq (Union[int, None], optional) – Epoch frequency to send training alert, by default None

Example

>>> from physicsnemo.utils.logging import LaunchLogger
>>> LaunchLogger.initialize()
>>> epochs = 3
>>> for i in range(epochs):
...   with LaunchLogger("Train", epoch=i) as log:
...     # Log 3 mini-batches manually
...     log.log_minibatch({"loss": 1.0})
...     log.log_minibatch({"loss": 2.0})
...     log.log_minibatch({"loss": 3.0})

static initialize(
use_wandb: bool = False,
use_mlflow: bool = False,
)[source]#

Initialize logging singleton

Parameters:
  • use_wandb (bool, optional) – Use WandB logging, by default False

  • use_mlflow (bool, optional) – Use MLFlow logging, by default False

log_epoch(losses: Dict[str, float])[source]#

Logs metrics for a single epoch

Parameters:

losses (Dict[str, float]) – Dictionary of metrics/loss values to log

log_figure(
figure,
artifact_file: str = 'artifact',
plot_dir: str = './',
log_to_file: bool = False,
)[source]#

Logs figures on the root process to wandb or mlflow. The figure is stored to file if neither is selected.

Parameters:
  • figure (Figure) – matplotlib or plotly figure to plot

  • artifact_file (str, optional) – File name. CAUTION overrides old files of same name

  • plot_dir (str, optional) – output directory for plot

  • log_to_file (bool, optional) – set to true in case figure shall be stored to file in addition to logging it to mlflow/wandb

log_minibatch(losses: Dict[str, float])[source]#

Logs metrics for a single mini-batch

This function should be called every mini-batch iteration. It will accumulate loss values over a datapipe. At the end of an epoch, the average of these losses over all mini-batches is calculated.

Parameters:

losses (Dict[str, float]) – Dictionary of metrics/loss values to log
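The accumulate-then-average behavior described for log_minibatch can be illustrated with a small stand-in context manager. The class `EpochAverager` below is hypothetical and is not LaunchLogger; it only shows how per-mini-batch values might be summed and averaged when the context exits, under the assumption that each metric is averaged over the number of times it was logged.

```python
from collections import defaultdict


class EpochAverager:
    """Hypothetical stand-in illustrating mini-batch accumulation (not LaunchLogger)."""

    def __init__(self):
        self._sums = defaultdict(float)
        self._counts = defaultdict(int)
        self.epoch_metrics = {}

    def __enter__(self):
        return self

    def log_minibatch(self, losses):
        # Accumulate each metric; nothing is averaged until the epoch ends
        for key, value in losses.items():
            self._sums[key] += float(value)
            self._counts[key] += 1

    def __exit__(self, exc_type, exc_value, traceback):
        # On exit, compute the per-metric mean over all logged mini-batches
        self.epoch_metrics = {
            key: self._sums[key] / self._counts[key] for key in self._sums
        }
        return False  # do not suppress exceptions


with EpochAverager() as log:
    log.log_minibatch({"loss": 1.0})
    log.log_minibatch({"loss": 2.0})
    log.log_minibatch({"loss": 3.0})

print(log.epoch_metrics)  # {'loss': 2.0}
```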

classmethod toggle_mlflow(value: bool)[source]#

Toggle MLFlow logging

Parameters:

value (bool) – Use MLFlow logging

classmethod toggle_wandb(value: bool)[source]#

Toggle WandB logging

Parameters:

value (bool) – Use WandB logging

Console Logger#

A simple but powerful console-based logger that provides formatted output to the terminal. It’s particularly useful during development and debugging, offering clear visibility into training progress and metrics.

class physicsnemo.utils.logging.console.PythonLogger(name: str = 'launch')[source]#

Bases: object

Simple console logger for DL training. This is a work in progress (WIP).

error(message: str)[source]#

Log error

file_logging(file_name: str = 'launch.log')[source]#

Log to file

info(message: str)[source]#

Log info

log(message: str)[source]#

Log message

success(message: str)[source]#

Log success

warning(message: str)[source]#

Log warning

class physicsnemo.utils.logging.console.RankZeroLoggingWrapper(obj, dist)[source]#

Bases: object

Wrapper class to only log from rank 0 process in distributed training.
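A rank-zero wrapper of this kind can be sketched as a thin proxy that forwards method calls only on rank 0 and turns them into no-ops elsewhere. The classes `RankZeroOnly` and `ListLogger` below are hypothetical and written for illustration; the sketch assumes only that the `dist` object exposes a `rank` attribute, which is not confirmed by the text above.

```python
from types import SimpleNamespace


class RankZeroOnly:
    """Hypothetical proxy: forward calls on rank 0, ignore them on other ranks."""

    def __init__(self, obj, dist):
        self._obj = obj
        self._dist = dist  # assumption: exposes an integer `rank` attribute

    def __getattr__(self, name):
        # Only reached for attributes not found on the proxy itself
        attr = getattr(self._obj, name)
        if callable(attr) and self._dist.rank != 0:
            return lambda *args, **kwargs: None
        return attr


class ListLogger:
    """Tiny logger used for the demonstration; records messages in a list."""

    def __init__(self):
        self.messages = []

    def info(self, message):
        self.messages.append(message)


sink = ListLogger()
RankZeroOnly(sink, SimpleNamespace(rank=0)).info("from rank 0")
RankZeroOnly(sink, SimpleNamespace(rank=1)).info("from rank 1")
print(sink.messages)  # ['from rank 0']
```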

MLflow Logger#

Integration with MLflow for experiment tracking and model management. This utility enables systematic tracking of experiments, including metrics, parameters, artifacts, and model versions. It’s particularly useful for teams that need to maintain reproducibility and compare different experiments. Users should initialize the MLflow backend before using the LaunchLogger.

Example usage:

from physicsnemo.utils.logging.mlflow import initialize_mlflow
from physicsnemo.utils.logging import LaunchLogger

# Initialize MLflow
initialize_mlflow(
    experiment_name="weather_prediction",
    user_name="physicsnemo_user",
    mode="offline",
)

# Enable the MLflow backend for LaunchLogger
LaunchLogger.initialize(use_mlflow=True)

Weights and Biases Logger#

Integration with Weights & Biases (W&B) for experiment tracking and visualization. This utility provides rich visualization capabilities and easy experiment comparison, making it ideal for projects that require detailed analysis of training runs and model performance. You must initialize the W&B backend before using the LaunchLogger.

Weights and Biases Routines and Utilities

Example usage:

from physicsnemo.utils.logging.wandb import initialize_wandb
from physicsnemo.utils.logging import LaunchLogger

# Initialize W&B
initialize_wandb(
    project="physics_simulation",
    entity="my_team"
)

# Enable the W&B backend for LaunchLogger
LaunchLogger.initialize(use_wandb=True)

Logging Utils#

Utility functions and helpers for logging operations.

physicsnemo.utils.logging.utils.create_ddp_group_tag(group_name: str = None) str[source]#

Creates a common group tag for logging

This currently does not work with multi-node runs. There appears to be a bug in PyTorch when a distributed utility is used before DDP.

Parameters:

group_name (str, optional) – Optional group name prefix. If None, "DDP_Group_" is used, by default None

Returns:

Group tag

Return type:

str