nemo_automodel.utils.dist_utils#

Module Contents#

Classes#

FirstRankPerNode

Context manager that lets rank 0 on each node process a section before the other ranks.

Functions#

get_rank_safe

Get the distributed rank safely, even if torch.distributed is not initialized.

get_world_size_safe

Get the distributed world size safely, even if torch.distributed is not initialized.

get_local_rank_preinit

Get the local rank from the environment variable, intended for use before full init.

append_to_progress_log

Append a formatted string to the progress log file (rank 0 only).

barrier_and_log

Perform a distributed barrier and then log a message on rank 0.

reduce_loss

Reduce loss across all ranks.

get_sync_ctx

Get the synchronization context for the model.

rescale_gradients

Rescale gradients across the DP group.

clip_gradients

Clip gradients across the DP group.

Data#

API#

nemo_automodel.utils.dist_utils.logger#

‘getLogger(…)’

class nemo_automodel.utils.dist_utils.FirstRankPerNode[source]#

Bases: contextlib.ContextDecorator

Context manager that lets rank 0 on each node process a section before the other ranks.

  • Lets LOCAL_RANK==0 run the protected code first on each node.

  • Inserts an extra barrier across only the node‑local rank‑0 processes.

  • Works on a single GPU (no env flags, no distributed initialisation).

Note: the scoped code is assumed not to make heavy use of torch.distributed.

__enter__()[source]#

Create or bootstrap a (distributed) process group that rank 0 enters first.

Returns:

True if the current process is node-rank-0, False otherwise.

Return type:

bool

__exit__(exc_type, exc_val, exc_tb)[source]#

Tear down the context.

  1. If the current process was the first on its node, release the waiting peer ranks by issuing a barrier.

  2. If an exception occurred, abort the entire distributed job.

  3. If this context manager created the process group, destroy it.

Parameters:
  • exc_type (Type[BaseException] | None) – Exception class if one occurred inside the with block.

  • exc_val (BaseException | None) – The raised exception instance.

  • exc_tb (TracebackType | None) – Traceback associated with the exception.

Returns:

False, so that any exception raised inside the with block is propagated to the caller (standard context-manager semantics).

Return type:

bool

_try_bootstrap_pg() → bool[source]#

Try to create a default process group from env:// environment variables.
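
A minimal usage sketch, assuming a torchrun-style multi-process launch; the cache directory and the "prepare data once per node" workload are hypothetical placeholders:

```python
import os

from nemo_automodel.utils.dist_utils import FirstRankPerNode

cache_dir = "/tmp/dataset_cache"  # hypothetical path for illustration

with FirstRankPerNode() as is_node_rank_zero:
    # The first rank on each node enters this block before its local peers;
    # the peers are released by a node-local barrier when it exits.
    if is_node_rank_zero:
        os.makedirs(cache_dir, exist_ok=True)
        # ... download or preprocess data here, once per node ...

# After the context exits, every local rank can read cache_dir.
```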

nemo_automodel.utils.dist_utils.get_rank_safe() → int[source]#

Get the distributed rank safely, even if torch.distributed is not initialized.

Returns:

The current process rank.

nemo_automodel.utils.dist_utils.get_world_size_safe() → int[source]#

Get the distributed world size safely, even if torch.distributed is not initialized.

Returns:

The total number of processes in the distributed job.

nemo_automodel.utils.dist_utils.get_local_rank_preinit() → int[source]#

Get the local rank from the environment variable, intended for use before full init.

Returns:

The local rank of the current process.
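
A short sketch of how these helpers might be used early in a training script, before (or without) full torch.distributed initialization; the device binding is illustrative:

```python
import torch

from nemo_automodel.utils.dist_utils import (
    get_local_rank_preinit,
    get_rank_safe,
    get_world_size_safe,
)

# Safe to call whether or not torch.distributed has been initialized.
rank = get_rank_safe()
world_size = get_world_size_safe()

# Bind this process to its local GPU before full distributed initialization.
if torch.cuda.is_available():
    torch.cuda.set_device(get_local_rank_preinit())

print(f"rank {rank} of {world_size}")
```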

nemo_automodel.utils.dist_utils.append_to_progress_log(
save_dir: str,
string: str,
barrier: bool = True,
) → None[source]#

Append a formatted string to the progress log file (rank 0 only).

Includes timestamp, job ID, and number of GPUs in the log entry.

Parameters:
  • save_dir – The directory where the ‘progress.txt’ file is located.

  • string – The message string to append.

  • barrier – If True, performs a distributed barrier before writing (rank 0 only).
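
A sketch of a typical call; the save directory and message are hypothetical:

```python
from nemo_automodel.utils.dist_utils import append_to_progress_log

# Appends a timestamped entry to <save_dir>/progress.txt; rank 0 performs the write.
append_to_progress_log("/results/run_01", "Finished epoch 3", barrier=True)
```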

nemo_automodel.utils.dist_utils.barrier_and_log(string: str) → None[source]#

Perform a distributed barrier and then log a message on rank 0.

Parameters:

string – The message string to log.
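
For example (the message is illustrative):

```python
from nemo_automodel.utils.dist_utils import barrier_and_log

barrier_and_log("Validation complete")  # all ranks synchronize; rank 0 logs the message
```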

nemo_automodel.utils.dist_utils.reduce_loss(
loss_store: list[torch.Tensor],
total_num_tokens: torch.Tensor,
per_token_loss: bool = True,
dp_group: Optional[torch.distributed.ProcessGroup] = None,
) → tuple[torch.Tensor, torch.Tensor][source]#

Reduce loss across all ranks.

Parameters:
  • loss_store – List of loss tensors to reduce.

  • total_num_tokens – Total number of tokens to divide the loss by.

  • per_token_loss – Whether to divide the loss by the number of tokens.

  • dp_group – Process group to reduce the loss across.

Returns:

Tuple of reduced loss and denominator.
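
A sketch of reducing losses accumulated over one gradient-accumulation window, assuming torch.distributed has already been initialized; the loss values and token count are made up for illustration:

```python
import torch

from nemo_automodel.utils.dist_utils import reduce_loss

# Hypothetical values gathered over one gradient-accumulation window.
loss_store = [torch.tensor(2.31), torch.tensor(2.27)]
total_num_tokens = torch.tensor(8192.0)

reduced_loss, denominator = reduce_loss(
    loss_store,
    total_num_tokens,
    per_token_loss=True,  # divide the reduced loss by the token count
    dp_group=None,        # None: reduce across the default process group (assumption)
)
```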

nemo_automodel.utils.dist_utils.get_sync_ctx(model, is_optim_step)[source]#

Get the synchronization context for the model.

Parameters:
  • model – The model to synchronize.

  • is_optim_step – Whether the current step is an optimizer step.

Returns:

A context manager that synchronizes the model.
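
A sketch of a gradient-accumulation loop that only synchronizes gradients on optimizer steps. The toy model and random data are stand-ins; in practice the model would be wrapped for data parallelism (e.g. DDP/FSDP), which is what makes the sync/no-sync distinction meaningful:

```python
import torch
from torch import nn

from nemo_automodel.utils.dist_utils import get_sync_ctx

model = nn.Linear(16, 1)  # toy stand-in for a distributed-wrapped model
optimizer = torch.optim.SGD(model.parameters(), lr=0.1)
grad_accum_steps = 4

for step in range(8):
    is_optim_step = (step + 1) % grad_accum_steps == 0
    x, y = torch.randn(4, 16), torch.randn(4, 1)
    # Skip gradient synchronization on accumulation steps; sync on the optimizer step.
    with get_sync_ctx(model, is_optim_step):
        loss = nn.functional.mse_loss(model(x), y)
        loss.backward()
    if is_optim_step:
        optimizer.step()
        optimizer.zero_grad()
```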

nemo_automodel.utils.dist_utils.rescale_gradients(model, num_tokens_for_grad_scaling, dp_group=None)[source]#

Rescale gradients across the DP group.

Parameters:
  • model – The model to rescale.

  • num_tokens_for_grad_scaling – The number of tokens to divide the gradients by.

  • dp_group – The process group to rescale the gradients across.

nemo_automodel.utils.dist_utils.clip_gradients(model, clip_norm, foreach=True)[source]#

Clip gradients across the DP group.

Parameters:
  • model – The model to clip the gradients of.

  • clip_norm – The maximum norm of the gradients.

  • foreach – If True, use fused (foreach) operations when clipping.
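
A sketch of how rescale_gradients and clip_gradients might be combined at the end of an accumulation window, just before the optimizer step; the toy model, token count, and clip norm are illustrative, and a real run would have an initialized DP group:

```python
import torch
from torch import nn

from nemo_automodel.utils.dist_utils import clip_gradients, rescale_gradients

model = nn.Linear(16, 1)  # toy stand-in for a distributed model
optimizer = torch.optim.SGD(model.parameters(), lr=0.1)

x, y = torch.randn(4, 16), torch.randn(4, 1)
nn.functional.mse_loss(model(x), y, reduction="sum").backward()

num_tokens = torch.tensor(4.0)  # illustrative token count for this window
rescale_gradients(model, num_tokens, dp_group=None)  # divide grads by the token count
clip_gradients(model, 1.0, foreach=True)             # clip to a max grad norm of 1.0
optimizer.step()
```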