core.transformer.moe.moe_logging

MoE metrics tracking and logging.

Collects per-layer MoE metrics during forward passes, synchronizes them across distributed ranks, and writes scalar summaries to TensorBoard / W&B.

Usage:

tracker = get_moe_metrics_tracker()

# In router forward pass:
tracker.record("load_balancing_loss", loss, layer_number=1, num_layers=32,
               reduce_group=tp_cp_group)

# At end of training step:
log_str = tracker.report(
    loss_scale=1 / num_microbatches,
    iteration=step,
    writer=tb_writer,
    num_layers=32,
)

Module Contents

Classes

MetricEntry

Per-layer metric with distributed reduction configuration.

MoEMetricsTracker

Tracker for MoE layer-wise metrics.

Functions

get_moe_metrics_tracker

Return the global MoE metrics tracker, creating it lazily if needed.

set_moe_metrics_tracker

Set the global MoE metrics tracker.

destroy_moe_metrics_tracker

Reset the global MoE metrics tracker to None.

Data

_MOE_METRICS_TRACKER

Global MoE metrics tracker instance; None until a tracker is created or set.

API

class core.transformer.moe.moe_logging.MetricEntry

Per-layer metric with distributed reduction configuration.

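values: torch.Tensor

Per-layer accumulated values; the tensor has one slot per layer (its size is num_layers).

reduce_group: Optional[torch.distributed.ProcessGroup] = None

Process group for sum-reduction (e.g. tp_cp_group).

avg_group: Optional[torch.distributed.ProcessGroup] = None

Process group for average-reduction.

needs_dp_avg: bool = True

Whether to average across DP ranks after the other reductions.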
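A minimal sketch of how an entry with these fields could be declared as a dataclass, assuming values is a zero-initialized tensor with one slot per layer. The helper make_entry is hypothetical and exists only to show how such an entry might be built; it is not part of the module.

```python
from __future__ import annotations  # keep annotations unevaluated

from dataclasses import dataclass
from typing import Optional

import torch
import torch.distributed as dist


@dataclass
class MetricEntry:
    """Sketch: per-layer metric plus how it should be reduced across ranks."""

    values: torch.Tensor  # one accumulator slot per layer (size num_layers)
    reduce_group: Optional[dist.ProcessGroup] = None  # sum-reduction group
    avg_group: Optional[dist.ProcessGroup] = None  # average-reduction group
    needs_dp_avg: bool = True  # average over data-parallel ranks last


def make_entry(num_layers: int, device: str = "cpu") -> MetricEntry:
    # Hypothetical helper: a zero-filled accumulator sized to the layer count.
    return MetricEntry(values=torch.zeros(num_layers, device=device))


entry = make_entry(num_layers=4)
print(entry.values.shape, entry.needs_dp_avg)  # torch.Size([4]) True
```

Putting the required values field first is what a dataclass demands: fields without defaults must precede fields with defaults.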

core.transformer.moe.moe_logging._MOE_METRICS_TRACKER: Optional[core.transformer.moe.moe_logging.MoEMetricsTracker] = None

core.transformer.moe.moe_logging.get_moe_metrics_tracker() → MoEMetricsTracker

Return the global MoE metrics tracker, creating it lazily if needed.

core.transformer.moe.moe_logging.set_moe_metrics_tracker(tracker: MoEMetricsTracker) → None

Set the global MoE metrics tracker.

core.transformer.moe.moe_logging.destroy_moe_metrics_tracker() → None

Reset the global MoE metrics tracker to None.

class core.transformer.moe.moe_logging.MoEMetricsTracker

Tracker for MoE layer-wise metrics.

Lifecycle: call record() with per-layer values during the forward pass, call report() once at the end of the step (it syncs, aggregates, logs, and clears), then repeat.

Example:

tracker = get_moe_metrics_tracker()
tracker.record("load_balancing_loss", loss, layer_number=1, num_layers=32)
log_str = tracker.report(loss_scale=1/8, iteration=100, writer=tb_writer, num_layers=32)

property metrics: Dict[str, core.transformer.moe.moe_logging.MetricEntry]

Read-only access to the underlying metric entries.

record(
    name: str,
    value: torch.Tensor,
    layer_number: int,
    num_layers: int,
    reduce_group: Optional[torch.distributed.ProcessGroup] = None,
    avg_group: Optional[torch.distributed.ProcessGroup] = None,
    needs_dp_avg: bool = True,
) → None

Accumulate a metric value for a specific layer.

Called during the router forward pass. Lazily creates the metric entry on first call for each metric name.

Parameters:
  • name – Metric name (e.g. "load_balancing_loss").

  • value – Scalar tensor to accumulate (will be detached).

  • layer_number – 1-based layer index.

  • num_layers – Total number of layers (determines tensor size).

  • reduce_group – Process group for sum-reduction (e.g. tp_cp_group).

  • avg_group – Process group for average-reduction.

  • needs_dp_avg – Whether to average across DP ranks after other reductions.
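The accumulation step of record() can be sketched as follows, assuming each metric is a zero-initialized tensor of length num_layers and that the 1-based layer_number maps to index layer_number - 1. Reduction-group bookkeeping is omitted, and SimpleTracker is a hypothetical name, not the module's class.

```python
from typing import Dict

import torch


class SimpleTracker:
    """Hypothetical stand-in that keeps only the per-layer value tensors."""

    def __init__(self) -> None:
        self.values: Dict[str, torch.Tensor] = {}

    def record(self, name: str, value: torch.Tensor, layer_number: int, num_layers: int) -> None:
        # Lazily create a zero tensor with one slot per layer on first use.
        if name not in self.values:
            self.values[name] = torch.zeros(num_layers, device=value.device)
        # Detach so the stored value keeps no reference to the autograd graph;
        # layer_number is 1-based while tensor indices are 0-based.
        self.values[name][layer_number - 1] += value.detach()


tracker = SimpleTracker()
loss = torch.tensor(0.5, requires_grad=True)
tracker.record("load_balancing_loss", loss, layer_number=1, num_layers=4)
tracker.record("load_balancing_loss", loss, layer_number=1, num_layers=4)  # second microbatch
print(tracker.values["load_balancing_loss"])  # tensor([1., 0., 0., 0.])
```

Because values are summed across microbatches, the loss_scale passed to report() (typically 1 / num_microbatches) is what turns the sum back into an average.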

report(
    loss_scale: float,
    iteration: int,
    writer=None,
    wandb_writer=None,
    per_layer_logging: bool = False,
    force_initialize: bool = False,
    track_names: Optional[Union[str, List[str]]] = None,
    num_layers: Optional[int] = None,
    moe_layer_freq: Optional[Union[int, List[int]]] = None,
    mtp_num_layers: Optional[int] = None,
    total_loss_dict: Optional[dict[str, torch.Tensor]] = None,
    percentiles: Optional[Dict[str, List[float]]] = None,
    pg_collection: Optional[megatron.core.process_groups_config.ProcessGroupCollection] = None,
) → str

Sync metrics across ranks, aggregate, log, and clear.

This is the main entry point, called once per training step. It pairs with record(): you record individual data points during the forward pass, then report the summary at the end of the step.

Parameters:
  • loss_scale – Scale factor for averaging across microbatches (usually 1 / num_microbatches).

  • iteration – Current training iteration.

  • writer – TensorBoard SummaryWriter (optional).

  • wandb_writer – Weights & Biases run object (optional).

  • per_layer_logging – Whether to also write per-layer values.

  • force_initialize – If True, pre-create metric entries for track_names that don’t exist yet. Required for PP ranks without MoE layers whose tensor sizes must match ranks that do have MoE layers.

  • track_names – Metric name(s) to report. None reports all.

  • num_layers – Total number of transformer layers (required when force_initialize is True).

  • moe_layer_freq – MoE layer frequency (an int) or a binary per-layer pattern list (1 for an MoE layer, 0 for a dense layer).

  • mtp_num_layers – Extra layers from Multi-Token Prediction.

  • total_loss_dict – Megatron training-loop accumulator. Metrics ending with "loss" are accumulated here and excluded from the returned console log string.

  • percentiles – Per-metric percentiles to compute, e.g. {"load_imbalance": [0.5, 0.95]}.

  • pg_collection – Custom process-group collection for reduction.

Returns:

Formatted log string for console output.
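The total_loss_dict behavior described for report() can be sketched as a routing step. This assumes the scalars are already aggregated, that loss values are summed into the dictionary, and (as a further assumption) that loss metrics stay in the console output when no dictionary is passed. route_scalars is a hypothetical helper, not part of the module.

```python
from typing import Dict, Optional, Union

import torch


def route_scalars(
    scalars: Dict[str, Union[float, torch.Tensor]],
    total_loss_dict: Optional[Dict[str, torch.Tensor]] = None,
) -> Dict[str, Union[float, torch.Tensor]]:
    """Hypothetical helper: return only the metrics meant for the console."""
    console: Dict[str, Union[float, torch.Tensor]] = {}
    for name, value in scalars.items():
        if total_loss_dict is not None and name.endswith("loss"):
            # Accumulate into the training-loop dict instead of printing here.
            prev = total_loss_dict.get(name, torch.tensor(0.0))
            total_loss_dict[name] = prev + value
        else:
            console[name] = value
    return console


totals: Dict[str, torch.Tensor] = {}
console = route_scalars({"load_balancing_loss": 0.25, "load_imbalance": 1.5}, totals)
```

Routing losses through total_loss_dict lets the training loop print them alongside the main LM loss instead of duplicating them in the MoE log line.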

clear() → None

Zero out all metric values (entries are kept for reuse).

ensure_initialized(
    name: str,
    num_layers: int,
    device: Optional[Union[str, torch.device, int]] = None,
) → None

Pre-create a metric entry if it does not already exist.

This is needed for pipeline-parallel (PP) ranks that have no MoE layers: their tensor size must match that of ranks that do, otherwise the all_reduce across the PP group hangs.

Parameters:
  • name – Metric name.

  • num_layers – Tensor size (should include MTP layers).

  • device – Device for the zero tensor. Defaults to the current CUDA device.
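A sketch of the pre-creation step, assuming metrics live in a plain dict of tensors. The CPU fallback when CUDA is unavailable is an addition for this sketch only; the documented default is the current CUDA device. The standalone function form is hypothetical.

```python
from typing import Dict, Optional, Union

import torch


def ensure_initialized(
    values: Dict[str, torch.Tensor],
    name: str,
    num_layers: int,
    device: Optional[Union[str, torch.device, int]] = None,
) -> None:
    """Sketch: create a zero tensor for `name` only if it is missing."""
    if name in values:
        return  # keep already accumulated values untouched
    if device is None:
        # Documented default is the current CUDA device; fall back to CPU here.
        device = torch.cuda.current_device() if torch.cuda.is_available() else "cpu"
    values[name] = torch.zeros(num_layers, device=device)


# A PP rank without MoE layers still allocates a tensor of the same size
# (here 32 transformer layers + 1 MTP layer) so the collectives line up.
store: Dict[str, torch.Tensor] = {}
ensure_initialized(store, "load_balancing_loss", num_layers=33)
```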

_resolve_names(
    track_names: Optional[Union[str, List[str]]],
) → List[str]

Normalize the track_names argument to a list of strings.
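A minimal sketch of the normalization, assuming None expands to every metric recorded so far (consistent with report() treating track_names=None as "report all"). The available parameter is hypothetical; the real method presumably reads the names from the tracker's own state.

```python
from typing import Iterable, List, Optional, Union


def resolve_names(
    track_names: Optional[Union[str, List[str]]],
    available: Iterable[str],
) -> List[str]:
    """Sketch: None -> every known metric, str -> [str], list -> copy of list."""
    if track_names is None:
        return list(available)
    if isinstance(track_names, str):
        return [track_names]  # a bare string is one name, not a list of characters
    return list(track_names)
```

The explicit str check matters because a string is itself iterable; list("abc") would yield ["a", "b", "c"].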

_sync_metrics(
    metric_names: List[str],
    pg_collection: Optional[megatron.core.process_groups_config.ProcessGroupCollection] = None,
) → None

All-reduce metrics across distributed ranks.

Reduction order: PP collect → reduce_group sum → avg_group avg → DP avg.
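The documented reduction order can be sketched with torch.distributed collectives. This sketch assumes "PP collect" is a SUM all-reduce over the pipeline group (each stage fills only its own layers' slots, which is why ranks without MoE layers need zero tensors of matching size) and implements averaging as SUM followed by division. The function sync_values and its pp_group and dp_group parameters are hypothetical; the real method takes its groups from pg_collection or its defaults.

```python
import torch
import torch.distributed as dist


def _all_reduce_sum(t: torch.Tensor, group) -> None:
    dist.all_reduce(t, op=dist.ReduceOp.SUM, group=group)


def sync_values(
    values: torch.Tensor,
    pp_group=None,
    reduce_group=None,
    avg_group=None,
    dp_group=None,
    needs_dp_avg: bool = True,
) -> torch.Tensor:
    """Sketch of the order: PP collect -> reduce_group sum -> avg_group avg -> DP avg.

    A group of None skips that step, so the function also runs without
    torch.distributed being initialized.
    """
    if pp_group is not None:
        # Stages hold zeros for layers they do not own, so a SUM gathers all layers.
        _all_reduce_sum(values, pp_group)
    if reduce_group is not None:
        _all_reduce_sum(values, reduce_group)
    if avg_group is not None:
        # SUM then divide, since ReduceOp.AVG is not supported by every backend.
        _all_reduce_sum(values, avg_group)
        values /= dist.get_world_size(avg_group)
    if needs_dp_avg and dp_group is not None:
        _all_reduce_sum(values, dp_group)
        values /= dist.get_world_size(dp_group)
    return values
```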

static _count_moe_layers(
    num_layers: Optional[int],
    moe_layer_freq: Optional[Union[int, List[int]]],
    mtp_num_layers: Optional[int],
) → int

Compute the effective number of MoE layers from configuration.
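A sketch of the layer count, under several stated assumptions: no frequency means every layer is MoE, an integer N marks layer i (0-based) as MoE when i % N == 0, a list is a 1/0 per-layer pattern, and each MTP layer adds one MoE layer. The documented signature accepts num_layers=None, but how the real method handles that is not stated, so this sketch simply raises. count_moe_layers is a hypothetical name.

```python
from typing import List, Optional, Union


def count_moe_layers(
    num_layers: Optional[int],
    moe_layer_freq: Optional[Union[int, List[int]]],
    mtp_num_layers: Optional[int],
) -> int:
    """Sketch: count MoE layers from a frequency or a 1/0 per-layer pattern."""
    if num_layers is None:
        raise ValueError("num_layers is required in this sketch")
    if moe_layer_freq is None:
        count = num_layers  # assume every layer is an MoE layer
    elif isinstance(moe_layer_freq, int):
        # Assumed convention: layer i (0-based) is MoE when i % freq == 0.
        count = sum(1 for i in range(num_layers) if i % moe_layer_freq == 0)
    else:
        count = sum(moe_layer_freq)  # 1 marks an MoE layer, 0 a dense layer
    if mtp_num_layers:
        count += mtp_num_layers  # assume each MTP layer also contains an MoE block
    return count
```

This count is the divisor for the layer mean, so getting it wrong would silently bias every reported average.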

_aggregate(
    loss_scale: float,
    num_moe_layers: int,
    metric_names: List[str],
    percentiles: Optional[Dict[str, List[float]]] = None,
) → Dict[str, Union[float, torch.Tensor]]

Aggregate per-layer values into scalar summaries.

Always computes the mean across MoE layers. If percentiles specifies quantiles for a metric, those are computed over non-zero layer values and added as "{name}_p{pct}" keys.
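A sketch of the aggregation, assuming loss_scale is applied before both the mean and the percentiles, the mean divides by num_moe_layers (dense layers hold zeros), and percentile keys use the quantile times 100 (so 0.95 becomes "_p95"). The exact key format and the function name aggregate are assumptions.

```python
from typing import Dict, List, Optional

import torch


def aggregate(
    values: Dict[str, torch.Tensor],
    loss_scale: float,
    num_moe_layers: int,
    percentiles: Optional[Dict[str, List[float]]] = None,
) -> Dict[str, float]:
    """Sketch: layer mean plus optional percentiles over non-zero layers."""
    out: Dict[str, float] = {}
    for name, per_layer in values.items():
        scaled = per_layer.float() * loss_scale  # average over microbatches
        # Divide by the MoE-layer count, not the tensor length, since dense slots are zero.
        out[name] = (scaled.sum() / num_moe_layers).item()
        qs = (percentiles or {}).get(name, [])
        nonzero = scaled[scaled != 0]
        if qs and nonzero.numel() > 0:
            for q in qs:
                key = f"{name}_p{round(q * 100)}"  # assumed key format
                out[key] = torch.quantile(nonzero, q).item()
    return out


stats = aggregate(
    {"load_imbalance": torch.tensor([2.0, 0.0, 4.0, 0.0])},
    loss_scale=0.5,
    num_moe_layers=2,
    percentiles={"load_imbalance": [0.5]},
)
```

Restricting percentiles to non-zero values keeps dense layers (or layers owned by other PP stages before syncing) from dragging the low quantiles toward zero.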

_log_scalars(
    scalars: Dict[str, Union[float, torch.Tensor]],
    iteration: int,
    writer,
    wandb_writer,
) → None

Write scalar metrics to TensorBoard and/or W&B.
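A sketch of scalar logging, assuming writer behaves like torch.utils.tensorboard.SummaryWriter (add_scalar(tag, value, global_step)) and wandb_writer like a W&B run (log(data, step=...)). Metric names are used as tags unchanged here; the real method may prefix or rename them. log_scalars is a hypothetical name.

```python
from typing import Dict, Union

import torch


def log_scalars(
    scalars: Dict[str, Union[float, torch.Tensor]],
    iteration: int,
    writer=None,
    wandb_writer=None,
) -> None:
    """Sketch: send each aggregated scalar to TensorBoard and/or W&B."""
    plain = {k: float(v) for k, v in scalars.items()}  # tensors -> Python floats
    if writer is not None:
        for name, value in plain.items():
            writer.add_scalar(name, value, iteration)
    if wandb_writer is not None:
        # One log call per step keeps all metrics on the same W&B step.
        wandb_writer.log(plain, step=iteration)
```

Either writer may be None, so ranks that do not own a writer can call the function unconditionally.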

_log_per_layer(
    loss_scale: float,
    metric_names: List[str],
    iteration: int,
    writer,
    wandb_writer,
    percentiles: Optional[Dict[str, List[float]]] = None,
) → None

Write per-layer metric values to TensorBoard and/or W&B.
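A sketch of per-layer logging, assuming the same writer interfaces as for scalar logging and a hypothetical tag scheme "<metric>/layer_<n>" with 1-based layer numbers. It logs every slot, including zeros for dense layers, and omits percentiles; the real method may handle both differently. log_per_layer is a hypothetical name.

```python
from typing import Dict

import torch


def log_per_layer(
    values: Dict[str, torch.Tensor],
    loss_scale: float,
    iteration: int,
    writer=None,
    wandb_writer=None,
) -> None:
    """Sketch: write one scalar per (metric, layer) pair."""
    for name, per_layer in values.items():
        scaled = per_layer.float() * loss_scale
        # Hypothetical tag scheme with 1-based layer numbers.
        tagged = {f"{name}/layer_{i + 1}": v.item() for i, v in enumerate(scaled)}
        if writer is not None:
            for tag, value in tagged.items():
                writer.add_scalar(tag, value, iteration)
        if wandb_writer is not None:
            wandb_writer.log(tagged, step=iteration)
```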

static _format(
    scalars: Dict[str, Union[float, torch.Tensor]],
) → str

Format aggregated metrics as a console log string.
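A minimal sketch of console formatting. The " | " separator, the "name: value" layout, and the four-decimal precision are assumptions for illustration, not the module's actual output format; format_scalars is a hypothetical name.

```python
from typing import Dict, Union

import torch


def format_scalars(scalars: Dict[str, Union[float, torch.Tensor]]) -> str:
    """Sketch: render aggregated metrics as 'name: value' pairs on one line."""
    # float() accepts both Python floats and single-element tensors.
    return " | ".join(f"{name}: {float(value):.4f}" for name, value in scalars.items())
```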