nemo_automodel.components.distributed.utils#
Module Contents#
Classes#
| FirstRankPerNode | Context manager that ensures node-local rank 0 processes a section before the other ranks. |

Functions#
| _create_gloo_group | Create a Gloo process group for barrier operations. |
| _barrier_with_timeout | A timeout wrapper for torch.distributed.barrier() using the Gloo backend. |
| barrier_and_log | Perform a distributed barrier and then log a message on rank 0. |
| reduce_loss | Reduce loss across all ranks. |
| get_sync_ctx | Get the synchronization context for the model. |
| rescale_gradients | Rescale gradients across the DP group. |
| clip_gradients | Clip gradients across the DP group. |
Data#
API#
- nemo_automodel.components.distributed.utils.logger#
getLogger(...)
- nemo_automodel.components.distributed.utils._create_gloo_group()#
Create a Gloo process group for barrier operations.
This allows us to use monitored_barrier with Gloo backend while keeping NCCL for the main training operations.
- Returns:
Gloo process group for barriers
- Return type:
ProcessGroup
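For orientation, a minimal sketch of the underlying PyTorch call such a helper builds on, assuming torch.distributed has already been initialized with the NCCL backend (this is an illustration, not the module's actual implementation):

```python
import torch.distributed as dist

# A second, Gloo-backed group lets CPU-side barriers (including
# dist.monitored_barrier) run without going through the NCCL
# communicator used for the training collectives.
gloo_group = dist.new_group(backend="gloo")
```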
- nemo_automodel.components.distributed.utils._barrier_with_timeout(timeout: datetime.timedelta, group=None)#
A timeout wrapper for torch.distributed.barrier() using Gloo backend.
This approach creates a separate Gloo process group for barrier operations while keeping the main NCCL backend for training operations.
- Parameters:
timeout – Maximum time to wait for the barrier
group – Process group for the barrier operation
- Returns:
True if barrier completed successfully, False if timeout occurred
- Return type:
bool
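A hedged usage sketch combining the two helpers above as documented; the timeout value and error handling are illustrative:

```python
from datetime import timedelta

import torch.distributed as dist

from nemo_automodel.components.distributed.utils import (
    _barrier_with_timeout,
    _create_gloo_group,
)

if dist.is_initialized():
    gloo_group = _create_gloo_group()
    ok = _barrier_with_timeout(timeout=timedelta(minutes=5), group=gloo_group)
    if not ok:
        # Some rank did not reach the barrier within 5 minutes.
        raise RuntimeError("barrier timed out")
```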
- class nemo_automodel.components.distributed.utils.FirstRankPerNode#
Bases:
contextlib.ContextDecorator
Context manager that ensures node-local rank 0 processes a section before the other ranks.
Lets RANK==0 run the protected code first on each node.
Inserts an extra barrier across only the node-local rank-0 processes.
Works on a single GPU (no env flags, no distributed initialisation).
Note: the scoped code is assumed not to be torch.distributed-heavy.
- __enter__(timeout=timedelta(hours=10))#
Create or bootstrap a (distributed) process group that rank 0 enters first.
- Returns:
True if the current process is node-rank-0, False otherwise.
- Return type:
bool
- __exit__(exc_type, exc_val, exc_tb)#
Tear down the context.
If the current process was the first on its node, release the waiting peer ranks by issuing a barrier.
If an exception occurred, abort the entire distributed job.
If this context manager created the process group, destroy it.
- Parameters:
exc_type (Type[BaseException] | None) – Exception class if one occurred inside the with block.
exc_val (BaseException | None) – The raised exception instance.
exc_tb (TracebackType | None) – Traceback associated with the exception.
- Returns:
False, so that any exception raised inside the with block is propagated to the caller (standard context-manager semantics).
- Return type:
bool
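A hedged usage sketch of the context manager, e.g. letting rank 0 on each node download model weights before the remaining local ranks run the same block and hit the warm cache; load_pretrained is a hypothetical helper:

```python
from nemo_automodel.components.distributed.utils import FirstRankPerNode

with FirstRankPerNode() as is_node_rank0:
    # Node-local rank 0 runs this block first (e.g. populating a cache);
    # the other local ranks are released afterwards and run it too.
    model = load_pretrained("some-model")  # hypothetical helper
```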
- nemo_automodel.components.distributed.utils.barrier_and_log(string: str) → None#
Perform a distributed barrier and then log a message on rank 0.
- Parameters:
string – The message string to log.
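A minimal usage sketch (the message text is illustrative):

```python
from nemo_automodel.components.distributed.utils import barrier_and_log

# All ranks synchronize at the barrier; only rank 0 emits the log line.
barrier_and_log("checkpoint saved for epoch 0")
```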
- nemo_automodel.components.distributed.utils.reduce_loss(loss_store: list[torch.Tensor], total_num_tokens: torch.Tensor, per_token_loss: bool = True, dp_group: Optional[torch.distributed.ProcessGroup] = None)#
Reduce loss across all ranks.
- Parameters:
loss_store – List of loss tensors to reduce.
total_num_tokens – Total number of tokens to divide the loss by.
per_token_loss – Whether to divide the loss by the number of tokens.
dp_group – Process group to reduce the loss across.
- Returns:
Tuple of reduced loss and denominator.
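A hedged usage sketch, assuming loss_store accumulates this rank's micro-batch losses and total_num_tokens is the matching token count (both values are illustrative):

```python
import torch

from nemo_automodel.components.distributed.utils import reduce_loss

loss_store = [torch.tensor(2.1), torch.tensor(1.9)]  # per micro-batch losses
total_num_tokens = torch.tensor(2048.0)              # tokens seen this step

reduced_loss, denominator = reduce_loss(
    loss_store,
    total_num_tokens,
    per_token_loss=True,
    dp_group=None,
)
```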
- nemo_automodel.components.distributed.utils.get_sync_ctx(model, is_optim_step)#
Get the synchronization context for the model.
- Parameters:
model – The model to synchronize.
is_optim_step – Whether the current step is an optimizer step.
- Returns:
A context manager that synchronizes the model.
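A hedged sketch of how such a context is typically used with gradient accumulation; model, optimizer, dataloader, and grad_accum_steps are assumed to exist, and skipping gradient synchronization on accumulation-only steps is the usual intent of an is_optim_step flag rather than something this page spells out:

```python
from nemo_automodel.components.distributed.utils import get_sync_ctx

for step, batch in enumerate(dataloader):
    is_optim_step = (step + 1) % grad_accum_steps == 0
    with get_sync_ctx(model, is_optim_step):
        loss = model(**batch).loss
        loss.backward()
    if is_optim_step:
        optimizer.step()
        optimizer.zero_grad()
```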
- nemo_automodel.components.distributed.utils.rescale_gradients(model, num_tokens_for_grad_scaling, dp_group=None)#
Rescale gradients across the DP group.
- Parameters:
model – The model whose gradients are rescaled.
num_tokens_for_grad_scaling – The number of tokens to divide the gradients by.
dp_group – The process group to rescale the gradients across.
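A minimal usage sketch; model is an assumed training model and the token count is illustrative, normally taken from the current batch:

```python
import torch

from nemo_automodel.components.distributed.utils import rescale_gradients

num_tokens = torch.tensor(8192.0)  # tokens behind the accumulated gradients
rescale_gradients(model, num_tokens, dp_group=None)
```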
- nemo_automodel.components.distributed.utils.clip_gradients(model, clip_norm, foreach=True)#
Clip gradients across the DP group.
- Parameters:
model – The model whose gradients are clipped.
clip_norm – The maximum norm of the gradients.
foreach – If enabled, use fused (foreach) operations.
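A minimal usage sketch, placed just before the optimizer step; model and optimizer are assumed training objects and the clip value is illustrative:

```python
from nemo_automodel.components.distributed.utils import clip_gradients

# Clip to a maximum gradient norm of 1.0; foreach=True opts into the fused path.
clip_gradients(model, clip_norm=1.0, foreach=True)
optimizer.step()
```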