PhysicsNeMo Utils#

The PhysicsNeMo Utils module provides a comprehensive set of utilities that support various aspects of scientific computing, machine learning, and physics simulations. These utilities range from optimization helpers and distributed computing tools to specialized functions for weather and climate modeling, and geometry processing. The module is designed to simplify common tasks while maintaining high performance and scalability.

Weather and Climate Utils#

Specialized utilities for weather and climate modeling, including calculations for solar radiation and atmospheric parameters. These utilities are used extensively in weather prediction models.

physicsnemo.utils.insolation.insolation(
dates,
lat,
lon,
scale=1.0,
daily=False,
enforce_2d=False,
clip_zero=True,
)[source]#

Calculate the approximate solar insolation for given dates.

For an example reference, see: https://brian-rose.github.io/ClimateLaboratoryBook/courseware/insolation/

Parameters:
  • dates (np.ndarray) – 1d array of datetime or Timestamp values

  • lat (np.ndarray) – 1d or 2d array of latitudes

  • lon (np.ndarray) – 1d or 2d array of longitudes (0-360deg). If 2d, must match the shape of lat.

  • scale (float, optional) – scaling factor (solar constant)

  • daily (bool, optional) – if True, return the daily max solar radiation (lat and day of year dependent only)

  • enforce_2d (bool, optional) – if True and lat/lon are 1-d arrays, turns them into 2d meshes.

  • clip_zero (bool, optional) – if True, set values below 0 to 0

Returns:

Insolation values with shape (dates, lat, lon).

Return type:

np.ndarray
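A minimal usage sketch (assumed setup: NumPy and pandas available; the shapes follow the parameter descriptions above):

import numpy as np
import pandas as pd
from physicsnemo.utils.insolation import insolation

# Hourly timestamps for one day (1d array of datetimes)
dates = pd.date_range("2024-06-21", periods=24, freq="h").to_pydatetime()
lat = np.linspace(-90.0, 90.0, 91)   # 1d latitudes
lon = np.linspace(0.0, 358.0, 180)   # 1d longitudes in 0-360 deg

# With enforce_2d=True, the 1d lat/lon are turned into 2d meshes, so the
# result is expected to have shape (dates, lat, lon) = (24, 91, 180)
sol = insolation(dates, lat, lon, enforce_2d=True)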

Checkpointing#

Checkpoint utilities for saving and loading training state.

Provides save_checkpoint() and load_checkpoint() for persisting and restoring model weights, optimizer/scheduler/scaler state, and arbitrary metadata. Supports local filesystems and remote stores via fsspec.

When models are wrapped with FSDP or use DTensor/ShardTensor parameters, save_checkpoint() and load_checkpoint() automatically use PyTorch’s distributed checkpoint state-dict APIs to gather and scatter model and optimizer state. In this distributed mode all ranks must call the functions (the collective operations inside the DCP helpers require it), while only rank 0 performs actual file I/O.
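For example, in a distributed run the call pattern is sketched below (FSDP setup omitted; fsdp_model and optimizer are assumed to exist on every rank):

from physicsnemo.utils.checkpoint import save_checkpoint, load_checkpoint

# Every rank executes this call; state is gathered collectively
# and only rank 0 writes the checkpoint files.
save_checkpoint("checkpoints", models=fsdp_model, optimizer=optimizer, epoch=epoch)

# Likewise, every rank calls load_checkpoint; rank 0 reads the files
# and state is scattered back to all ranks.
epoch = load_checkpoint("checkpoints", models=fsdp_model, optimizer=optimizer)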

physicsnemo.utils.checkpoint.get_checkpoint_dir(
base_dir: Path | str,
model_name: str,
) → str[source]#

Build a model-specific checkpoint directory path.

Returns "{base_dir}/checkpoints_{model_name}", handling both local paths and msc:// URIs.

Parameters:
  • base_dir (Path | str) – Root directory under which the checkpoint subdirectory is placed.

  • model_name (str) – Model name used as the directory suffix.

Returns:

Full path to the checkpoint directory.

Return type:

str
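For example (the model name "fno" is illustrative; the output follows the documented return format):

>>> from physicsnemo.utils.checkpoint import get_checkpoint_dir
>>> get_checkpoint_dir("outputs", "fno")
'outputs/checkpoints_fno'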

physicsnemo.utils.checkpoint.load_checkpoint(
path: Path | str,
models: Module | list[Module] | None = None,
optimizer: Optimizer | None = None,
scheduler: LRScheduler | None = None,
scaler: GradScaler | None = None,
epoch: int | None = None,
metadata_dict: dict[str, Any] | None = None,
device: str | device = 'cpu',
optimizer_model: Module | None = None,
) → int[source]#

Load a training checkpoint saved by save_checkpoint().

Scans path for checkpoint files and restores state dictionaries into the provided training objects. Objects that are None are silently skipped.

When any model is FSDP-wrapped or contains DTensor/ShardTensor parameters the function enters distributed mode: all ranks must call it, rank 0 reads files from disk, and model/optimizer state is scattered to all ranks via DCP helpers.

Parameters:
  • path (Path | str) – Directory containing checkpoint files (local path or fsspec URI). If the directory does not exist, the load is skipped and 0 is returned.

  • models (torch.nn.Module | list[torch.nn.Module] | None, optional) – Model(s) whose state_dict should be restored. DDP and torch.compile wrappers are stripped automatically.

  • optimizer (torch.optim.Optimizer | None, optional) – Optimizer whose state_dict should be restored.

  • scheduler (LRScheduler | None, optional) – Learning-rate scheduler whose state_dict should be restored.

  • scaler (GradScaler | None, optional) – AMP gradient scaler whose state_dict should be restored.

  • epoch (int | None, optional) – Specific checkpoint index to load. When None, the checkpoint with the largest index (most recent) is loaded.

  • metadata_dict (dict[str, Any] | None, optional) – If a dict is provided, it is updated in-place with any metadata that was persisted by save_checkpoint().

  • device (str | torch.device, optional) – Device onto which tensors are mapped during loading. By default "cpu".

  • optimizer_model (torch.nn.Module | None, optional) – The model whose parameters the optimizer is tracking. Required by the DCP set_optimizer_state_dict helper when distributed mode is active. When None, the first model in models is used. Ignored when not in distributed mode.

Returns:

The epoch stored in the checkpoint. Returns 0 when:

  • The checkpoint directory does not exist.

  • No training-state file is found inside the directory.

  • The training-state file does not contain an "epoch" key.

Return type:

int

Examples

Save and then restore a model, optimizer, and scheduler from a checkpoint:

>>> import tempfile, torch
>>> from physicsnemo.utils.checkpoint import save_checkpoint, load_checkpoint
>>> from physicsnemo.models.mlp import FullyConnected
>>> model = FullyConnected(in_features=32, out_features=64)
>>> optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)
>>> scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=10)
>>> with tempfile.TemporaryDirectory() as tmpdir:
...     save_checkpoint(tmpdir, models=model, optimizer=optimizer,
...                     scheduler=scheduler, epoch=1)
...     epoch = load_checkpoint(tmpdir, models=model, optimizer=optimizer,
...                             scheduler=scheduler)
...     epoch
1

Load a specific epoch and retrieve saved metadata:

>>> with tempfile.TemporaryDirectory() as tmpdir:
...     save_checkpoint(tmpdir, models=model, optimizer=optimizer, epoch=1,
...                     metadata={"loss": 0.42, "experiment": "run_01"})
...     save_checkpoint(tmpdir, models=model, optimizer=optimizer, epoch=2,
...                     metadata={"loss": 0.31, "experiment": "run_01"})
...     meta = {}
...     epoch = load_checkpoint(tmpdir, models=model, optimizer=optimizer,
...                             epoch=1, metadata_dict=meta)
...     epoch
1
>>> meta["loss"]
0.42

physicsnemo.utils.checkpoint.load_model_weights(
model: Module,
weights_path: str,
device: str | device = 'cpu',
) → None[source]#

Load model weights from a single checkpoint file.

Loads a .mdlus (or .pt) file directly into model, handling FSDP and DTensor/ShardTensor distribution automatically. Unlike load_checkpoint() (which expects a checkpoint directory with numbered files), this function accepts a path to a single file.

When the model is FSDP-wrapped or has DTensor parameters this is a collective operation — all ranks must call it. Rank 0 reads the file and state is scattered via DCP helpers.

Parameters:
  • model (torch.nn.Module) – The model to load weights into. May be FSDP-wrapped, contain DTensor/ShardTensor parameters, or be a plain module.

  • weights_path (str) – Path to a .mdlus or .pt checkpoint file (local path or fsspec URI).

  • device (str | torch.device, optional) – Device for torch.load() map_location. By default "cpu".
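A minimal sketch (the weights file path is illustrative and assumed to have been produced by an earlier save_checkpoint() call):

from physicsnemo.models.mlp import FullyConnected
from physicsnemo.utils.checkpoint import load_model_weights

model = FullyConnected(in_features=32, out_features=64)
# Load a single .mdlus weights file directly into the model
load_model_weights(model, "outputs/checkpoints_fno/FullyConnected.0.1.mdlus", device="cpu")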

physicsnemo.utils.checkpoint.save_checkpoint(
path: Path | str,
models: Module | list[Module] | None = None,
optimizer: Optimizer | None = None,
scheduler: LRScheduler | None = None,
scaler: GradScaler | None = None,
epoch: int | None = None,
metadata: dict[str, Any] | None = None,
optimizer_model: Module | None = None,
) → None[source]#

Save a training checkpoint to disk (or a remote store).

Up to two categories of files are created inside path:

  • Model weights (when models is provided) - one file per model: {class_name}{id}.{mp_rank}.{epoch}.{ext} where ext is .mdlus for Module instances or .pt for plain PyTorch models. When several models share a class name, a numeric id is appended ("MyModel0", "MyModel1").

  • Training state (when any of optimizer / scheduler / scaler is provided, or _StaticCapture scalers exist): checkpoint.{mp_rank}.{epoch}.pt containing their combined state_dict entries, plus epoch and metadata.

When any model is FSDP-wrapped or contains DTensor/ShardTensor parameters the function enters distributed mode: all ranks must call it, state is gathered via DCP collective helpers, and only rank 0 writes files.

Use load_checkpoint() to restore from these files. To instantiate and load a model in one step (without pre-constructing it), use from_checkpoint().

Parameters:
  • path (Path | str) – Directory in which to store checkpoint files. Created automatically for local paths if it does not exist.

  • models (torch.nn.Module | list[torch.nn.Module] | None, optional) – Model(s) whose weights should be saved.

  • optimizer (torch.optim.Optimizer | None, optional) – Optimizer whose state_dict should be saved.

  • scheduler (LRScheduler | None, optional) – Learning-rate scheduler whose state_dict should be saved.

  • scaler (GradScaler | None, optional) – AMP gradient scaler whose state_dict should be saved. If None but a _StaticCapture scaler exists, that scaler’s state is saved instead.

  • epoch (int | None, optional) – Epoch index to embed in the filename and the checkpoint dict. When None, the next available index is used.

  • metadata (dict[str, Any] | None, optional) – Arbitrary key-value pairs persisted alongside the training state (e.g. best validation loss, MLflow run ID).

  • optimizer_model (torch.nn.Module | None, optional) – The model whose parameters the optimizer is tracking so that parameter unsharding of optimizer state can be performed correctly. Only required when multiple models are provided, and at least one of them is a distributed model (FSDP/ShardTensor). When None, the first model in models is used. Ignored when not in distributed mode.

Examples

Save a model together with optimizer and scheduler state:

>>> import tempfile, os, torch
>>> from physicsnemo.utils.checkpoint import save_checkpoint
>>> from physicsnemo.models.mlp import FullyConnected
>>> model = FullyConnected(in_features=32, out_features=64)
>>> optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)
>>> scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=10)
>>> with tempfile.TemporaryDirectory() as tmpdir:
...     save_checkpoint(tmpdir, models=model, optimizer=optimizer,
...                     scheduler=scheduler, epoch=1)
...     sorted(f for f in os.listdir(tmpdir))
['FullyConnected.0.1.mdlus', 'checkpoint.0.1.pt']

Save at multiple epochs with additional metadata:

>>> with tempfile.TemporaryDirectory() as tmpdir:
...     save_checkpoint(tmpdir, models=model, optimizer=optimizer, epoch=1,
...                     metadata={"loss": 0.42, "experiment": "run_01"})
...     save_checkpoint(tmpdir, models=model, optimizer=optimizer, epoch=2,
...                     metadata={"loss": 0.31, "experiment": "run_01"})
...     sorted(f for f in os.listdir(tmpdir))
['FullyConnected.0.1.mdlus', 'FullyConnected.0.2.mdlus', 'checkpoint.0.1.pt', 'checkpoint.0.2.pt']

Profiling Utils#

Utilities for profiling the performance of a model.

Optimization Utils#

The optimization utilities provide tools for capturing and managing training states, gradients, and optimization processes. These are particularly useful when implementing custom training loops or specialized optimization strategies.

class physicsnemo.utils.capture.StaticCaptureEvaluateNoGrad(*args, **kwargs)[source]#

Bases: _StaticCapture

A performance optimization decorator for PyTorch no-grad evaluation.

This class should be initialized as a decorator on a function that runs the forward pass of the model without gradient calculations. This is the recommended method to use for inference and validation.

Parameters:
  • model (physicsnemo.core.Module) – PhysicsNeMo Model

  • logger (Optional[Logger], optional) – PhysicsNeMo Launch Logger, by default None

  • use_graphs (bool, optional) – Toggle CUDA graphs if supported by model, by default True

  • use_amp (bool, optional) – Toggle AMP if supported by model, by default True

  • cuda_graph_warmup (int, optional) – Number of warmup steps for cuda graphs, by default 11

  • amp_type (Union[float16, bfloat16], optional) – Auto casting type for AMP, by default torch.float16

  • label (Optional[str], optional) – Static capture checkpoint label, by default None

Raises:

ValueError – If the model provided is not a physicsnemo.core.Module, i.e. has no metadata.

Example

>>> import torch
>>> import physicsnemo
>>> from physicsnemo.utils.capture import StaticCaptureEvaluateNoGrad
>>> # Create model
>>> model = physicsnemo.models.mlp.FullyConnected(2, 64, 2)
>>> input = torch.rand(8, 2)
>>> # Create evaluate function with optimization wrapper
>>> @StaticCaptureEvaluateNoGrad(model=model)
... def eval_step(model, invar):
...     predvar = model(invar)
...     return predvar
...
>>> output = eval_step(model, input)
>>> output.size()
torch.Size([8, 2])

Note

Capturing multiple CUDA graphs in a single program can lead to invalid CUDA memory access errors on some systems. Prioritize capturing training graphs when this occurs.

class physicsnemo.utils.capture.StaticCaptureTraining(*args, **kwargs)[source]#

Bases: _StaticCapture

A performance optimization decorator for PyTorch training functions.

This class should be initialized as a decorator on a function that computes the forward pass of the neural network and loss function. The user should only call the defined training step function. This will apply optimizations including AMP and CUDA Graphs.

Parameters:
  • model (physicsnemo.core.Module) – PhysicsNeMo Model

  • optim (torch.optim.Optimizer) – Optimizer

  • logger (Optional[Logger], optional) – PhysicsNeMo Launch Logger, by default None

  • use_graphs (bool, optional) – Toggle CUDA graphs if supported by model, by default True

  • use_amp (bool, optional) – Toggle AMP if supported by model, by default True

  • cuda_graph_warmup (int, optional) – Number of warmup steps for cuda graphs, by default 11

  • amp_type (Union[float16, bfloat16], optional) – Auto casting type for AMP, by default torch.float16

  • gradient_clip_norm (Optional[float], optional) – Threshold for gradient clipping

  • label (Optional[str], optional) – Static capture checkpoint label, by default None

Raises:

ValueError – If the model provided is not a physicsnemo.core.Module, i.e. has no metadata.

Example

>>> import torch
>>> import physicsnemo
>>> from physicsnemo.utils.capture import StaticCaptureTraining
>>> # Create model
>>> model = physicsnemo.models.mlp.FullyConnected(2, 64, 2)
>>> input = torch.rand(8, 2)
>>> output = torch.rand(8, 2)
>>> # Create optimizer
>>> optim = torch.optim.Adam(model.parameters(), lr=0.001)
>>> # Create training step function with optimization wrapper
>>> @StaticCaptureTraining(model=model, optim=optim)
... def training_step(model, invar, outvar):
...     predvar = model(invar)
...     loss = torch.sum(torch.pow(predvar - outvar, 2))
...     return loss
...
>>> # Sample training loop
>>> for i in range(3):
...     loss = training_step(model, input, output)
...

Note

When AMP is used with a gradient scaler, static captures must be included in training checkpoints via their state_dict(). By default, this requires static captures to be instantiated in the same order as when they were checkpointed. The label parameter can be used to relax or circumvent this ordering requirement.
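A sketch of using the label parameter so a capture can be restored regardless of instantiation order (the label string is illustrative):

>>> @StaticCaptureTraining(model=model, optim=optim, label="main_train")
... def training_step(model, invar, outvar):
...     loss = torch.sum(torch.pow(model(invar) - outvar, 2))
...     return loss
...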

Note

Capturing multiple CUDA graphs in a single program can lead to invalid CUDA memory access errors on some systems. Prioritize capturing training graphs when this occurs.

PhysicsNeMo Logging#

The PhysicsNeMo Logging module provides a comprehensive and flexible logging system for machine learning experiments and physics simulations. It offers multiple logging backends including console output, MLflow, and Weights & Biases (W&B), allowing users to track metrics, artifacts, and experiment parameters across different platforms. The module is designed to work seamlessly in both single-process and distributed training environments.

Key Features:

  • Unified logging interface across different backends

  • Support for distributed training environments

  • Automatic metric aggregation and synchronization

  • Flexible configuration and customization options

  • Integration with popular experiment tracking platforms

Consider the following example usage:

from physicsnemo.utils.logging import LaunchLogger

# Initialize the logging singleton with the MLflow backend
LaunchLogger.initialize(use_mlflow=True)

# Training loop
for epoch in range(num_epochs):

    # Training logger
    with LaunchLogger(
        "train", epoch=epoch, num_mini_batch=len(training_datapipe), epoch_alert_freq=1
    ) as logger:
        for batch in training_datapipe:
            # Training loop
            ...  # training code
            logger.log_minibatch({"train_loss": training_loss})

    # Validation logger
    with LaunchLogger(
        "val", epoch=epoch, num_mini_batch=len(validation_datapipe), epoch_alert_freq=1
    ) as logger:
        for batch in validation_datapipe:
            # Validation loop
            ...  # validation code
            logger.log_minibatch({"val_loss": validation_loss})

        learning_rate = ...  # get the learning rate from the optimizer
        logger.log_epoch({"learning_rate": learning_rate})  # log once at the end of the epoch

This example shows how to use the LaunchLogger to log metrics during training and validation. The logging singleton is initialized once with the MLflow backend, and a separate LaunchLogger context is created for training and for validation in each epoch. Use the .log_minibatch method to accumulate per-batch metrics, and the .log_epoch method to log values once per epoch, such as the learning rate.

For a more detailed example, refer to the Logging and Checkpointing recipe.

Launch Logger#

The LaunchLogger serves as the primary interface for logging in PhysicsNeMo. It provides a unified API that works consistently across different logging backends and training environments. The logger automatically handles metric aggregation in distributed settings and ensures proper synchronization across processes.

class physicsnemo.utils.logging.launch.LaunchLogger(name_space, *args, **kwargs)[source]#

Bases: object

PhysicsNeMo Launch logger

An abstracted logger class that takes care of several fundamental logging functions. This class should first be initialized and then used via a context manager, which automatically computes epoch metrics. This is the standard logger for PhysicsNeMo examples.

Parameters:
  • name_space (str) – Namespace of logger to use. This will define the logger's title in the console and the W&B group in which the metric is plotted

  • epoch (int, optional) – Current epoch, by default 1

  • num_mini_batch (Union[int, None], optional) – Number of mini-batches used to calculate the epoch's progress, by default None

  • profile (bool, optional) – Profile code using nvtx markers, by default False

  • mini_batch_log_freq (int, optional) – Frequency to log mini-batch losses, by default 100

  • epoch_alert_freq (Union[int, None], optional) – Epoch frequency to send training alert, by default None

Example

>>> from physicsnemo.utils.logging import LaunchLogger
>>> LaunchLogger.initialize()
>>> epochs = 3
>>> for i in range(epochs):
...   with LaunchLogger("Train", epoch=i) as log:
...     # Log 3 mini-batches manually
...     log.log_minibatch({"loss": 1.0})
...     log.log_minibatch({"loss": 2.0})
...     log.log_minibatch({"loss": 3.0})

static initialize(
use_wandb: bool = False,
use_mlflow: bool = False,
)[source]#

Initialize logging singleton

Parameters:
  • use_wandb (bool, optional) – Use WandB logging, by default False

  • use_mlflow (bool, optional) – Use MLFlow logging, by default False

log_epoch(losses: Dict[str, float])[source]#

Logs metrics for a single epoch

Parameters:

losses (Dict[str, float]) – Dictionary of metrics/loss values to log

log_figure(
figure,
artifact_file: str = 'artifact',
plot_dir: str = './',
log_to_file: bool = False,
)[source]#

Logs figures on the root process to W&B or MLflow. If neither backend is enabled, the figure is stored to file instead.

Parameters:
  • figure (Figure) – matplotlib or plotly figure to plot

  • artifact_file (str, optional) – File name. Caution: overwrites existing files of the same name.

  • plot_dir (str, optional) – output directory for plot

  • log_to_file (bool, optional) – If True, store the figure to file in addition to logging it to MLflow/W&B
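A sketch of logging a matplotlib figure from within a logging context (the figure contents and file name are illustrative):

import matplotlib.pyplot as plt
from physicsnemo.utils.logging import LaunchLogger

LaunchLogger.initialize()
fig, ax = plt.subplots()
ax.plot([0, 1], [0, 1])

with LaunchLogger("val", epoch=0) as log:
    # With no backend enabled, the figure is written to file in plot_dir
    log.log_figure(fig, artifact_file="prediction", log_to_file=True)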

log_minibatch(losses: Dict[str, float])[source]#

Logs metrics for a single mini-batch

This function should be called every mini-batch iteration. It accumulates loss values over a datapipe; at the end of an epoch, the average of these losses across mini-batches is calculated.

Parameters:

losses (Dict[str, float]) – Dictionary of metrics/loss values to log

classmethod toggle_mlflow(value: bool)[source]#

Toggle MLFlow logging

Parameters:

value (bool) – Use MLFlow logging

classmethod toggle_wandb(value: bool)[source]#

Toggle WandB logging

Parameters:

value (bool) – Use WandB logging

Console Logger#

A simple but powerful console-based logger that provides formatted output to the terminal. It’s particularly useful during development and debugging, offering clear visibility into training progress and metrics.

class physicsnemo.utils.logging.console.PythonLogger(name: str = 'launch')[source]#

Bases: object

Simple console logger for deep learning training. This is a work in progress.

error(message: str)[source]#

Log error

file_logging(file_name: str = 'launch.log')[source]#

Log to file

info(message: str)[source]#

Log info

log(message: str)[source]#

Log message

success(message: str)[source]#

Log success

warning(message: str)[source]#

Log warning

class physicsnemo.utils.logging.console.RankZeroLoggingWrapper(obj, dist)[source]#

Bases: object

Wrapper class to only log from rank 0 process in distributed training.
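A minimal sketch combining both classes (it assumes a DistributedManager from physicsnemo.distributed supplies the rank information):

from physicsnemo.distributed import DistributedManager
from physicsnemo.utils.logging.console import PythonLogger, RankZeroLoggingWrapper

DistributedManager.initialize()
dist = DistributedManager()

logger = PythonLogger("train")
logger.file_logging("train.log")  # also write messages to a log file
rank_zero_logger = RankZeroLoggingWrapper(logger, dist)
rank_zero_logger.info("This message is printed by rank 0 only")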

MLflow Logger#

Integration with MLflow for experiment tracking and model management. This utility enables systematic tracking of experiments, including metrics, parameters, artifacts, and model versions. It’s particularly useful for teams that need to maintain reproducibility and compare different experiments. Users should initialize the MLflow backend before using the LaunchLogger.

Example usage:

from physicsnemo.utils.logging.mlflow import initialize_mlflow
from physicsnemo.utils.logging import LaunchLogger

# Initialize MLflow
initialize_mlflow(
    experiment_name="weather_prediction",
    user_name="physicsnemo_user",
    mode="offline",
)

# Create logger with MLflow backend
logger = LaunchLogger.initialize(use_mlflow=True)

Weights and Biases Logger#

Integration with Weights & Biases (W&B) for experiment tracking and visualization. This utility provides rich visualization capabilities and easy experiment comparison, making it ideal for projects that require detailed analysis of training runs and model performance. You must initialize the W&B backend before using the LaunchLogger.

Weights and Biases Routines and Utilities

Example usage:

from physicsnemo.utils.logging.wandb import initialize_wandb
from physicsnemo.utils.logging import LaunchLogger

# Initialize W&B
initialize_wandb(
    project="physics_simulation",
    entity="my_team"
)

# Create logger with W&B backend
logger = LaunchLogger.initialize(use_wandb=True)

Logging Utils#

Utility functions and helpers for logging operations.

physicsnemo.utils.logging.utils.create_ddp_group_tag(group_name: str = None) → str[source]#

Creates a common group tag for logging

Note: this does not currently work with multi-node runs; there appears to be a bug in PyTorch when a distributed utility is used before DDP initialization.

Parameters:

group_name (str, optional) – Optional group name prefix. If None will use "DDP_Group_", by default None

Returns:

Group tag

Return type:

str
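A sketch of a typical use, grouping W&B runs from the same DDP job (the project name is illustrative, and the exact tag format is determined by the implementation):

import wandb
from physicsnemo.utils.logging.utils import create_ddp_group_tag

group_tag = create_ddp_group_tag("experiment_1")  # same tag on every rank
wandb.init(project="physics_simulation", group=group_tag)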