nemo_rl.utils.logger#

Module Contents#

Classes#

WandbConfig

TensorboardConfig

GPUMonitoringConfig

LoggerConfig

LoggerInterface

Abstract base class for logger backends.

TensorboardLogger

Tensorboard logger backend.

WandbLogger

Weights & Biases logger backend.

GpuMetricSnapshot

RayGpuMonitorLogger

Monitor GPU utilization across a Ray cluster and log metrics to a parent logger.

Logger

Main logger class that delegates to multiple backend loggers.

Functions#

flatten_dict

Flatten a nested dictionary.

configure_rich_logging

Configure rich logging for more visually appealing log output.

print_message_log_samples

Visualization for message logs and rewards using a more visual approach with emoji indicators and horizontal layout.

get_next_experiment_dir

Create a new experiment directory with an incremented ID.

Data#

API#

nemo_rl.utils.logger._rich_logging_configured#

False

class nemo_rl.utils.logger.WandbConfig[source]#

Bases: typing.TypedDict

project: str#

None

name: str#

None

class nemo_rl.utils.logger.TensorboardConfig[source]#

Bases: typing.TypedDict

log_dir: str#

None

class nemo_rl.utils.logger.GPUMonitoringConfig[source]#

Bases: typing.TypedDict

collection_interval: int | float#

None

flush_interval: int | float#

None

class nemo_rl.utils.logger.LoggerConfig[source]#

Bases: typing.TypedDict

log_dir: str#

None

wandb_enabled: bool#

None

tensorboard_enabled: bool#

None

wandb: nemo_rl.utils.logger.WandbConfig#

None

tensorboard: nemo_rl.utils.logger.TensorboardConfig#

None

monitor_gpus: bool#

None

gpu_monitoring: nemo_rl.utils.logger.GPUMonitoringConfig#

None

class nemo_rl.utils.logger.LoggerInterface[source]#

Bases: abc.ABC

Abstract base class for logger backends.

abstractmethod log_metrics(
metrics: Dict[str, Any],
step: int,
prefix: Optional[str] = '',
step_metric: Optional[str] = None,
) None[source]#

Log a dictionary of metrics.

abstractmethod log_hyperparams(params: Dict[str, Any]) None[source]#

Log dictionary of hyperparameters.

class nemo_rl.utils.logger.TensorboardLogger(
cfg: nemo_rl.utils.logger.TensorboardConfig,
log_dir: Optional[str] = None,
)[source]#

Bases: nemo_rl.utils.logger.LoggerInterface

Tensorboard logger backend.

Initialization

log_metrics(
metrics: Dict[str, Any],
step: int,
prefix: Optional[str] = '',
step_metric: Optional[str] = None,
) None[source]#

Log metrics to Tensorboard.

Parameters:
  • metrics – Dict of metrics to log

  • step – Global step value

  • prefix – Optional prefix for metric names

  • step_metric – Optional step metric name (ignored in TensorBoard)

log_hyperparams(params: Dict[str, Any]) None[source]#

Log hyperparameters to Tensorboard.

Parameters:

params – Dictionary of hyperparameters to log

class nemo_rl.utils.logger.WandbLogger(
cfg: nemo_rl.utils.logger.WandbConfig,
log_dir: Optional[str] = None,
)[source]#

Bases: nemo_rl.utils.logger.LoggerInterface

Weights & Biases logger backend.

Initialization

define_metric(
name: str,
step_metric: Optional[str] = None,
) None[source]#

Define a metric with custom step metric.

Parameters:
  • name – Name of the metric or pattern (e.g. β€˜ray/*’)

  • step_metric – Optional name of the step metric to use

log_metrics(
metrics: Dict[str, Any],
step: int,
prefix: Optional[str] = '',
step_metric: Optional[str] = None,
) None[source]#

Log metrics to wandb.

Parameters:
  • metrics – Dict of metrics to log

  • step – Global step value

  • prefix – Optional prefix for metric names

  • step_metric – Optional name of a field in metrics to use as step instead of the provided step value

log_hyperparams(params: Dict[str, Any]) None[source]#

Log hyperparameters to wandb.

Parameters:

params – Dict of hyperparameters to log

class nemo_rl.utils.logger.GpuMetricSnapshot[source]#

Bases: typing.TypedDict

step: int#

None

metrics: Dict[str, Any]#

None

class nemo_rl.utils.logger.RayGpuMonitorLogger(
collection_interval: int | float,
flush_interval: int | float,
metric_prefix: str,
step_metric: str,
parent_logger: Optional[nemo_rl.utils.logger.Logger] = None,
)[source]#

Monitor GPU utilization across a Ray cluster and log metrics to a parent logger.

Initialization

Initialize the GPU monitor.

Parameters:
  • collection_interval – Interval in seconds to collect GPU metrics

  • flush_interval – Interval in seconds to flush metrics to parent logger

  • step_metric – Name of the field to use as the step metric

  • parent_logger – Logger to receive the collected metrics

start()[source]#

Start the GPU monitoring thread.

stop()[source]#

Stop the GPU monitoring thread.

_collection_loop()[source]#

Main collection loop that runs in a separate thread.

_parse_gpu_metric(
sample: prometheus_client.samples.Sample,
node_idx: int,
) Dict[str, Any][source]#

Parse a GPU metric sample into a standardized format.

Parameters:
  • sample – Prometheus metric sample

  • node_idx – Index of the node

Returns:

Dictionary with metric name and value

_parse_gpu_sku(
sample: prometheus_client.samples.Sample,
node_idx: int,
) Dict[str, str][source]#

Parse a GPU metric sample into a standardized format.

Parameters:
  • sample – Prometheus metric sample

  • node_idx – Index of the node

Returns:

Dictionary with metric name and value

_collect_gpu_sku() Dict[str, str][source]#

Collect GPU SKU from all Ray nodes.

Note: This is an internal API and users are not expected to call this.

Returns:

Dictionary of SKU types on all Ray nodes

_collect_metrics() Dict[str, Any][source]#

Collect GPU metrics from all Ray nodes.

Returns:

Dictionary of collected metrics

_collect(
metrics: bool = False,
sku: bool = False,
) Dict[str, Any][source]#

Collect GPU metrics from all Ray nodes.

Returns:

Dictionary of collected metrics

_fetch_and_parse_metrics(node_idx, metric_address, parser_fn)[source]#

Fetch metrics from a node and parse GPU metrics.

Parameters:
  • node_idx – Index of the node

  • metric_address – Address of the metrics endpoint

Returns:

Dictionary of GPU metrics

flush()[source]#

Flush collected metrics to the parent logger.

class nemo_rl.utils.logger.Logger(cfg: nemo_rl.utils.logger.LoggerConfig)[source]#

Bases: nemo_rl.utils.logger.LoggerInterface

Main logger class that delegates to multiple backend loggers.

Initialization

Initialize the logger.

Parameters:

cfg –

Config dict with the following keys:

  • wandb_enabled

  • tensorboard_enabled

  • wandb

  • tensorboard

  • monitor_gpus

  • gpu_collection_interval

  • gpu_flush_interval

log_metrics(
metrics: Dict[str, Any],
step: int,
prefix: Optional[str] = '',
step_metric: Optional[str] = None,
) None[source]#

Log metrics to all enabled backends.

Parameters:
  • metrics – Dict of metrics to log

  • step – Global step value

  • prefix – Optional prefix for metric names

  • step_metric – Optional name of a field in metrics to use as step instead of the provided step value (currently only needed for wandb)

log_hyperparams(params: Dict[str, Any]) None[source]#

Log hyperparameters to all enabled backends.

Parameters:

params – Dict of hyperparameters to log

log_batched_dict_as_jsonl(
to_log: nemo_rl.distributed.batched_data_dict.BatchedDataDict | Dict[str, Any],
filename: str,
) None[source]#

Log a list of dictionaries to a JSONL file.

Parameters:
  • to_log – BatchedDataDict to log

  • filename – Filename to log to (within the log directory)

__del__()[source]#

Clean up resources when the logger is destroyed.

nemo_rl.utils.logger.flatten_dict(
d: Dict[str, Any],
sep: str = '.',
) Dict[str, Any][source]#

Flatten a nested dictionary.

Handles nested dictionaries and lists by creating keys with separators. For lists, the index is used as part of the key.

Parameters:
  • d – Dictionary to flatten

  • sep – Separator to use between nested keys

Returns:

Flattened dictionary with compound keys

.. rubric:: Examples

>>> from nemo_rl.utils.logger import flatten_dict
>>> flatten_dict({"a": 1, "b": {"c": 2}})
{'a': 1, 'b.c': 2}

>>> flatten_dict({"a": [1, 2], "b": {"c": [3, 4]}})
{'a.0': 1, 'a.1': 2, 'b.c.0': 3, 'b.c.1': 4}

>>> flatten_dict({"a": [{"b": 1}, {"c": 2}]})
{'a.0.b': 1, 'a.1.c': 2}
nemo_rl.utils.logger.configure_rich_logging(
level: str = 'INFO',
show_time: bool = True,
show_path: bool = True,
) None[source]#

Configure rich logging for more visually appealing log output.

Parameters:
  • level – The logging level to use

  • show_time – Whether to show timestamps in logs

  • show_path – Whether to show file paths in logs

nemo_rl.utils.logger.print_message_log_samples(
message_logs: List[nemo_rl.data.interfaces.LLMMessageLogType],
rewards: List[float],
num_samples: int = 5,
step: int = 0,
) None[source]#

Visualization for message logs and rewards using a more visual approach with emoji indicators and horizontal layout.

Parameters:
  • message_logs – List of message logs to sample from

  • rewards – List of rewards corresponding to each message log

  • num_samples – Number of samples to display (default: 5)

  • step – Current training step (for display purposes)

nemo_rl.utils.logger.get_next_experiment_dir(base_log_dir)[source]#

Create a new experiment directory with an incremented ID.

Parameters:

base_log_dir (str) – The base log directory path

Returns:

Path to the new experiment directory with incremented ID

Return type:

str