Source code for modulus.launch.logging.launch

# Copyright (c) 2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# See the License for the specific language governing permissions and
# limitations under the License.

import re
import sys
import time
from os import getcwd, makedirs
from os.path import abspath, exists, join
from typing import Dict, Tuple, Union

import torch
import torch.cuda.profiler as profiler
import wandb

from modulus.distributed import DistributedManager, reduce_loss

from .console import PythonLogger
from .wandb import alert

[docs]class LaunchLogger(object): """Modulus Launch logger An abstracted logger class that takes care of several fundamental logging functions. This class should first be initialized and then used via a context manager. This will auto compute epoch metrics. This is the standard logger for Modulus examples. Parameters ---------- name_space : str Namespace of logger to use. This will define the loggers title in the console and the wandb group the metric is plotted epoch : int, optional Current epoch, by default 1 num_mini_batch : Union[int, None], optional Number of mini-batches used to calculate the epochs progress, by default None profile : bool, optional Profile code using nvtx markers, by default False mini_batch_log_freq : int, optional Frequency to log mini-batch losses, by default 100 epoch_alert_freq : Union[int, None], optional Epoch frequency to send training alert, by default None Example ------- >>> from modulus.launch.logging import LaunchLogger >>> LaunchLogger.initialize() >>> epochs = 3 >>> for i in range(epochs): ... with LaunchLogger("Train", epoch=i) as log: ... # Log 3 mini-batches manually ... log.log_minibatch({"loss": 1.0}) ... log.log_minibatch({"loss": 2.0}) ... log.log_minibatch({"loss": 3.0}) """ _instances = {} console_backend = True wandb_backend = False mlflow_backend = False tensorboard_backend = False enable_profiling = False mlflow_run = None mlflow_client = None def __new__(cls, name_space, *args, **kwargs): # If namespace already has an instance just return that if name_space in cls._instances: return cls._instances[name_space] # Otherwise create new singleton instance for this namespace self = super().__new__(cls) # don't pass remaining parameters to object.__new__ cls._instances[name_space] = self # Constructor set up to only be ran once by a logger self.pyLogger = PythonLogger(name_space) self.total_iteration_index = None # Distributed self.root = True if DistributedManager.is_initialized(): self.root = DistributedManager().rank == 0 # Profiler utils if torch.cuda.is_available(): self.profiler = torch.autograd.profiler.emit_nvtx( enabled=cls.enable_profiling ) self.start_event = torch.cuda.Event(enable_timing=True) self.end_event = torch.cuda.Event(enable_timing=True) else: self.profiler = None return self def __init__( self, name_space: str, epoch: int = 1, num_mini_batch: Union[int, None] = None, profile: bool = False, mini_batch_log_freq: int = 100, epoch_alert_freq: Union[int, None] = None, ): self.name_space = name_space self.mini_batch_index = 0 self.minibatch_losses = {} self.epoch_losses = {} self.mini_batch_log_freq = mini_batch_log_freq self.epoch_alert_freq = epoch_alert_freq self.epoch = epoch self.num_mini_batch = num_mini_batch self.profile = profile # Init initial iteration based on current epoch if self.total_iteration_index is None: if num_mini_batch is not None: self.total_iteration_index = (epoch - 1) * num_mini_batch else: self.total_iteration_index = 0 # Set x axis metric to epoch for this namespace if self.wandb_backend: wandb.define_metric(name_space + "/mini_batch_*", step_metric="iter") wandb.define_metric(name_space + "/*", step_metric="epoch")
[docs] def log_minibatch(self, losses: Dict[str, float]): """Logs metrics for a mini-batch epoch This function should be called every mini-batch iteration. It will accumulate loss values over a datapipe. At the end of a epoch the average of these losses from each mini-batch will get calculated. Parameters ---------- losses : Dict[str, float] Dictionary of metrics/loss values to log """ self.mini_batch_index += 1 self.total_iteration_index += 1 for name, value in losses.items(): if name not in self.minibatch_losses: self.minibatch_losses[name] = 0 self.minibatch_losses[name] += value # Log of mini-batch loss if self.mini_batch_index % self.mini_batch_log_freq == 0: # Backend Logging mini_batch_metrics = {} for name, value in losses.items(): mini_batch_metrics[f"{self.name_space}/mini_batch_{name}"] = value self._log_backends( mini_batch_metrics, step=("iter", self.total_iteration_index) ) # Console if self.root: message = "Mini-Batch Losses:" for name, value in losses.items(): message += f" {name} = {value:10.3e}," message = message[:-1] # If we have datapipe length we can get a percent complete if self.num_mini_batch: mbp = 100 * (float(self.mini_batch_index) / self.num_mini_batch) message = f"[{mbp:.02f}%] " + message self.pyLogger.log(message)
[docs] def log_epoch(self, losses: Dict[str, float]): """Logs metrics for a single epoch Parameters ---------- losses : Dict[str, float] Dictionary of metrics/loss values to log """ for name, value in losses.items(): self.epoch_losses[name] = value

def __enter__(self): self.mini_batch_index = 0 self.minibatch_losses = {} self.epoch_losses = {} # Trigger profiling if self.profile and self.profiler: self.logger.warning(f"Starting profile for epoch {self.epoch}") self.profiler.__enter__() profiler.start() # Timing stuff if torch.cuda.is_available(): self.start_event.record() else: self.start_event = time.time() if self.mlflow_backend: self.mlflow_client.update_run(, "RUNNING") return self def __exit__(self, exc_type, exc_value, exc_tb): # Abnormal exit dont log if exc_type is not None: if self.mlflow_backend: self.mlflow_client.set_terminated(, status="KILLED" ) return # Reduce mini-batch losses for name, value in self.minibatch_losses.items(): process_loss = value / self.mini_batch_index self.epoch_losses[name] = process_loss # Compute global loss if DistributedManager.is_initialized() and DistributedManager().distributed: self.epoch_losses[name] = reduce_loss(process_loss) if self.root: # Console printing # TODO: add out of total epochs progress message = f"Epoch {self.epoch} Metrics:" for name, value in self.epoch_losses.items(): message += f" {name} = {value:10.3e}," message = message[:-1] metrics = { f"{self.name_space}/{key}": value for key, value in self.epoch_losses.items() } # Exit profiling if self.profile and self.profiler: self.logger.warning("Ending profile") self.profiler.__exit__() profiler.end() # Timing stuff, TODO: histograms not line plots if torch.cuda.is_available(): self.end_event.record() torch.cuda.synchronize() # Returns milliseconds # epoch_time = self.start_event.elapsed_time(self.end_event) / 1000.0 else: end_event = time.time() epoch_time = end_event - self.start_event # Return MS for time / iter time_per_iter = 1000 * epoch_time / max([1, self.mini_batch_index]) if self.root: message = f"Epoch Execution Time: {epoch_time:10.3e}s" message += f", Time/Iter: {time_per_iter:10.3e}ms" metrics[f"{self.name_space}/Epoch Time (s)"] = epoch_time metrics[f"{self.name_space}/Time per iter (ms)"] = time_per_iter self._log_backends(metrics, step=("epoch", self.epoch)) # TODO this should be in some on delete method / clean up if self.mlflow_backend: self.mlflow_client.set_terminated(, status="FINISHED" ) # Alert if ( self.epoch_alert_freq and self.root and self.epoch % self.epoch_alert_freq == 0 ): if self.wandb_backend: # TODO: Make this a little more informative? alert( title=f"{sys.argv[0]} training progress report", text=f"Run {} is at epoch {self.epoch}.", ) def _log_backends( self, metric_dict: Dict[str, float], step: Tuple[str, int] = None, ): """Logs a dictionary of metrics to different supported backends Parameters ---------- metric_dict : Dict[str, float] Metric dictionary step : Tuple[str, int], optional Tuple containing (step name, step index), by default None print : bool, optional Print metrics, by default False """ # MLFlow Logging if self.mlflow_backend: for key, value in metric_dict.items(): # If value is None just skip if value is None: continue # Keys only allow alpha numeric, ., -, /, _ and spaces key = re.sub("[^a-zA-Z0-9\.\-\s\/\_]+", "", key) self.mlflow_client.log_metric(, key, value, step=step[1] ) # WandB Logging if self.wandb_backend: # For WandB send step in as a metric # Step argument in lod function does not work with multiple log calls at # different intervals metric_dict[step[0]] = step[1] wandb.log(metric_dict)

[docs] def log_figure( self, figure, artifact_file: str = "artifact", plot_dir: str = "./", log_to_file: bool = False, ): """Logs figures on root process to wand or mlflow. Will store it to file in case neither are selected. Parameters ---------- figure : Figure matplotlib or plotly figure to plot artifact_file : str, optional File name. CAUTION overrides old files of same name plot_dir : str, optional output directory for plot log_to_file : bool, optional set to true in case figure shall be stored to file in addition to logging it to mlflow/wandb """ dist = DistributedManager() if dist.rank != 0: return if self.wandb_backend: wandb.log({artifact_file: figure}) if self.mlflow_backend: self.mlflow_client.log_figure( figure=figure, artifact_file=artifact_file,, ) if (not self.wandb_backend) and (not self.mlflow_backend): log_to_file = True if log_to_file: plot_dir = abspath(join(getcwd(), plot_dir)) if not exists(plot_dir): makedirs(plot_dir) if not artifact_file.endswith(".png"): artifact_file += ".png" figure.savefig(join(plot_dir, artifact_file))
[docs] @classmethod def toggle_wandb(cls, value: bool): """Toggle WandB logging Parameters ---------- value : bool Use WandB logging """ cls.wandb_backend = value
[docs] @classmethod def toggle_mlflow(cls, value: bool): """Toggle MLFlow logging Parameters ---------- value : bool Use MLFlow logging """ cls.mlflow_backend = value
[docs] @staticmethod def initialize(use_wandb: bool = False, use_mlflow: bool = False): """Initialize logging singleton Parameters ---------- use_wandb : bool, optional Use WandB logging, by default False use_mlflow : bool, optional Use MLFlow logging, by default False """ if is None and use_wandb: PythonLogger().warning("WandB not initialized, turning off") use_wandb = False if use_wandb: LaunchLogger.toggle_wandb(True) wandb.define_metric("epoch") wandb.define_metric("iter") # let only root process log to mlflow if DistributedManager.is_initialized(): if DistributedManager().rank != 0: return if LaunchLogger.mlflow_run is None and use_mlflow: PythonLogger().warning("MLFlow not initialized, turning off") use_mlflow = False if use_mlflow: LaunchLogger.toggle_mlflow(True)
© Copyright 2023, NVIDIA Modulus Team. Last updated on Jan 25, 2024.