nemo_automodel.components.distributed.utils#
Module Contents#
Classes#
| Class | Description |
|---|---|
| FirstRankPerNode | Context manager that lets rank 0 process a section before the other ranks on its node. |
Functions#
| Function | Description |
|---|---|
| _create_gloo_group | Create a Gloo process group for barrier operations. |
| _barrier_with_timeout | A timeout wrapper for torch.distributed.barrier() using Gloo backend. |
| barrier_and_log | Perform a distributed barrier and then log a message on rank 0. |
| reduce_loss | Reduce loss across all ranks. |
| get_sync_ctx | Get the synchronization context for the model. |
Data#
API#
- nemo_automodel.components.distributed.utils.logger#
getLogger(…)
- nemo_automodel.components.distributed.utils._create_gloo_group()#
Create a Gloo process group for barrier operations.
This allows us to use monitored_barrier with the Gloo backend while keeping NCCL for the main training operations.
- Returns:
Gloo process group for barriers
- Return type:
ProcessGroup
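A minimal sketch of the pattern this helper describes, assuming the default (e.g. NCCL) process group is already initialised; the function name and the absence of caching are illustrative, not the module's actual implementation:

```python
import torch.distributed as dist

def example_create_gloo_group() -> dist.ProcessGroup:
    """Illustrative: create a Gloo group alongside the default backend."""
    if not dist.is_initialized():
        raise RuntimeError("The default process group must be initialised first")
    # new_group() is collective: every rank must call it. With no ranks argument
    # all ranks join, so the Gloo group mirrors the world group but uses Gloo,
    # which supports monitored_barrier on CPU.
    return dist.new_group(backend="gloo")
```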
- nemo_automodel.components.distributed.utils._barrier_with_timeout(timeout: datetime.timedelta, group=None)#
A timeout wrapper for torch.distributed.barrier() using Gloo backend.
This approach creates a separate Gloo process group for barrier operations while keeping the main NCCL backend for training operations.
- Parameters:
timeout – Maximum time to wait for the barrier
group – Process group for the barrier operation
- Returns:
True if barrier completed successfully, False if timeout occurred
- Return type:
bool
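A hedged sketch of the timeout pattern described here, converting a monitored_barrier timeout on a Gloo group into a boolean result; the function name is illustrative:

```python
from datetime import timedelta
import torch.distributed as dist

def example_barrier_with_timeout(timeout: timedelta, group=None) -> bool:
    """Illustrative: return True if the barrier completes, False on timeout."""
    try:
        # monitored_barrier is supported by the Gloo backend, which is why a
        # separate Gloo group is kept next to the main NCCL group.
        dist.monitored_barrier(group=group, timeout=timeout)
        return True
    except RuntimeError:
        # monitored_barrier raises RuntimeError when a rank misses the deadline.
        return False
```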
- class nemo_automodel.components.distributed.utils.FirstRankPerNode#
Bases: contextlib.ContextDecorator

Context manager that lets rank 0 process a section before the other ranks on its node.

- Lets RANK == 0 run the protected code first on each node.
- Inserts an extra barrier across only the node-local rank-0 processes.
- Works on a single GPU (no env flags, no distributed initialisation).

Note: it is assumed the scoped code is not torch.distributed heavy.
- __enter__(timeout=timedelta(hours=10))#
Create or bootstrap a (distributed) process group that rank 0 enters first.
- Returns:
True if the current process is node-rank-0, False otherwise.
- Return type:
bool
- __exit__(exc_type, exc_val, exc_tb)#
Tear down the context.
If the current process was the first on its node, release the waiting peer ranks by issuing a barrier.
If an exception occurred, abort the entire distributed job.
If this context manager created the process group, destroy it.
- Parameters:
exc_type (Type[BaseException] | None) – Exception class if one occurred inside the with block.
exc_val (BaseException | None) – The raised exception instance.
exc_tb (TracebackType | None) – Traceback associated with the exception.
- Returns:
False, so that any exception raised inside the with block is propagated to the caller (standard context-manager semantics).
- Return type:
bool
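A usage sketch for FirstRankPerNode: because it derives from contextlib.ContextDecorator it can wrap a with block or decorate a function; the cache-warming work shown here is hypothetical:

```python
from nemo_automodel.components.distributed.utils import FirstRankPerNode

# As a context manager: node-local rank 0 enters the block first; the other
# ranks wait and only proceed once rank 0 has exited and released the barrier.
with FirstRankPerNode() as is_first:
    if is_first:
        print("rank 0 warms the node-local cache here")  # hypothetical work
    # non-zero ranks run afterwards and can read the now-warm cache

# As a decorator, for a hypothetical download step (keep the body free of
# heavy torch.distributed work, as noted above).
@FirstRankPerNode()
def download_checkpoint(path: str) -> None:
    ...
```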
- nemo_automodel.components.distributed.utils.barrier_and_log(string: str) → None#
Perform a distributed barrier and then log a message on rank 0.
- Parameters:
string – The message string to log.
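For example (all ranks reach the barrier, then only rank 0 logs):

```python
from nemo_automodel.components.distributed.utils import barrier_and_log

barrier_and_log("finished building dataloaders")
```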
- nemo_automodel.components.distributed.utils.reduce_loss(loss_store: list[torch.Tensor], total_num_tokens: torch.Tensor, per_token_loss: bool = True, dp_group: Optional[torch.distributed.ProcessGroup] = None)#
Reduce loss across all ranks.
- Parameters:
loss_store – List of loss tensors to reduce.
total_num_tokens – Total number of tokens to divide the loss by.
per_token_loss – Whether to divide the loss by the number of tokens.
dp_group – Process group to reduce the loss across.
- Returns:
Tuple of reduced loss and denominator.
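A hedged usage sketch, assuming losses were accumulated per micro-batch on each data-parallel rank; the values and the use of the default group (dp_group=None) are illustrative:

```python
import torch
from nemo_automodel.components.distributed.utils import reduce_loss

# Per-micro-batch losses gathered on this rank (illustrative values).
loss_store = [torch.tensor(2.3), torch.tensor(2.1)]
total_num_tokens = torch.tensor(4096.0)

# With per_token_loss=True the reduced loss is normalised by the token count;
# dp_group=None reduces across the default process group.
reduced_loss, denominator = reduce_loss(
    loss_store, total_num_tokens, per_token_loss=True, dp_group=None
)
```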
- nemo_automodel.components.distributed.utils.get_sync_ctx(model, is_optim_step)#
Get the synchronization context for the model.
- Parameters:
model – The model to synchronize.
is_optim_step – Whether the current step is an optimizer step.
- Returns:
A context manager that synchronizes the model.
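A hedged sketch of how such a context is typically used with gradient accumulation: gradient synchronisation is skipped on accumulation steps and enabled on the optimizer step. The loop, model, optimizer, dataloader, and grad_accum_steps below are assumed to exist in the training script:

```python
from nemo_automodel.components.distributed.utils import get_sync_ctx

# Illustrative gradient-accumulation loop.
for step, batch in enumerate(dataloader):
    is_optim_step = (step + 1) % grad_accum_steps == 0
    with get_sync_ctx(model, is_optim_step):
        loss = model(**batch).loss
        loss.backward()
    if is_optim_step:
        optimizer.step()
        optimizer.zero_grad()
```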