nemo_automodel.components.distributed.utils


Module Contents

Classes

Name | Description
--- | ---
FirstRankPerNode | Context manager that lets node-local rank 0 run a code section before the other ranks on its node.

Functions

Name | Description
--- | ---
_barrier_with_timeout | A timeout wrapper for torch.distributed.barrier() using Gloo backend.
_create_gloo_group | Create a Gloo process group for barrier operations.
barrier_and_log | Perform a distributed barrier and then log a message on rank 0.
dp_eval_sample_shard | Return (dp_rank, dp_size) to shard eval samples across DP ranks, else None.
get_sync_ctx | Get the synchronization context for the model.
reduce_loss | Reduce loss across all ranks.

Data

logger

API

class nemo_automodel.components.distributed.utils.FirstRankPerNode()

Bases: ContextDecorator

Context manager that lets node-local rank 0 run a code section before the other ranks on the same node.

  • Lets node-local rank 0 run the protected code first on each node.
  • Inserts an extra barrier across only the node-local rank-0 processes.
  • Works on a single GPU (no env flags, no distributed initialisation).

Note: the scoped code is assumed not to rely heavily on torch.distributed collectives.
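The ordering guarantee can be illustrated with a single-process simulation. This is a sketch under stated assumptions, not the library's implementation: threads stand in for the ranks on one node, threading.Barrier stands in for torch.distributed.barrier(), and first_rank_per_node_sketch and simulate_node are hypothetical names. Peers wait on the barrier before the body; local rank 0 reaches the same barrier only after finishing the body, which is what releases them.

```python
import threading
from contextlib import contextmanager
from typing import Iterator


@contextmanager
def first_rank_per_node_sketch(local_rank: int, barrier: threading.Barrier) -> Iterator[bool]:
    """Hypothetical sketch: node-local rank 0 runs the body before its peers.

    Error handling is omitted: if the body raises on rank 0, the peers stay blocked
    (the documented class aborts the whole job instead).
    """
    is_first = local_rank == 0
    if not is_first:
        barrier.wait()  # peers block until local rank 0 reaches the barrier below
    yield is_first
    if is_first:
        barrier.wait()  # local rank 0 has finished the body; release the peers


def simulate_node(local_world_size: int) -> list[str]:
    """Run one thread per simulated rank and record the order of the protected work."""
    events: list[str] = []
    lock = threading.Lock()
    barrier = threading.Barrier(local_world_size)

    def rank_fn(local_rank: int) -> None:
        with first_rank_per_node_sketch(local_rank, barrier) as is_first:
            with lock:
                events.append("first" if is_first else f"rank{local_rank}")

    threads = [threading.Thread(target=rank_fn, args=(r,)) for r in range(local_world_size)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return events


order = simulate_node(4)
print(order)  # "first" is always at index 0; the order of the peers varies
```

A typical use of this pattern is downloading or preprocessing a dataset once per node, so the remaining ranks find the files already cached.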

nemo_automodel.components.distributed.utils.FirstRankPerNode.__enter__(
timeout = timedelta(hours=10)
)

Create or bootstrap a distributed process group (if needed) and let node-local rank 0 enter the block first while its peers wait.

Returns:

  • True: the current process is node-local rank 0.
  • False: otherwise.

nemo_automodel.components.distributed.utils.FirstRankPerNode.__exit__(
exc_type,
exc_val,
exc_tb
)

Tear down the context.

  1. If the current process was the first on its node, release the waiting peer ranks by issuing a barrier.
  2. If an exception occurred, abort the entire distributed job.
  3. If this context manager created the process group, destroy it.

Parameters:

exc_type
Type[BaseException] | None

Exception class if one occurred inside the with block.

exc_val
BaseException | None

The raised exception instance.

exc_tb
TracebackType | None

Traceback associated with the exception.

Returns:

False, so that any exception raised inside the with block propagates to the caller (standard context-manager semantics).
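The documented return values of __enter__ and __exit__ can be shown with a toy class. ToyFirstRankPerNode is hypothetical and mirrors only the return contract; it does not create process groups, issue barriers, or abort on error. It also assumes the node-local rank is read from the LOCAL_RANK environment variable (which torchrun sets), defaulting to rank 0 when it is absent.

```python
import os
from typing import Optional


class ToyFirstRankPerNode:
    """Hypothetical stand-in that mirrors only the documented __enter__/__exit__ return values."""

    def __init__(self, local_rank: Optional[int] = None) -> None:
        # Assumption: the node-local rank comes from LOCAL_RANK (set by torchrun);
        # with no env var, e.g. a plain single-GPU run, treat this process as rank 0.
        if local_rank is None:
            local_rank = int(os.environ.get("LOCAL_RANK", "0"))
        self.local_rank = local_rank

    def __enter__(self) -> bool:
        return self.local_rank == 0  # True only for node-local rank 0

    def __exit__(self, exc_type, exc_val, exc_tb) -> bool:
        # The real class also releases waiting peers, aborts on error, and may
        # destroy the process group it created. Returning False tells Python
        # not to suppress the exception.
        return False


try:
    with ToyFirstRankPerNode(local_rank=0) as is_first:
        if is_first:
            raise ValueError("preprocessing failed")
except ValueError as err:
    caught = str(err)  # the exception reached the caller unchanged
```

Because __exit__ returns False, the caller's except clause sees the original ValueError rather than a silently swallowed error.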

nemo_automodel.components.distributed.utils._barrier_with_timeout(
timeout: datetime.timedelta,
group = None
)

A timeout wrapper for torch.distributed.barrier() using the Gloo backend.

This approach creates a separate Gloo process group for barrier operations while keeping the main NCCL backend for training operations.

Parameters:

timeout
timedelta

Maximum time to wait for the barrier

group
Defaults to None

Process group for the barrier operation

Returns:

True if the barrier completed successfully, False if it timed out.
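The True/False contract can be sketched with Python's threading.Barrier, whose wait(timeout=...) raises BrokenBarrierError when the timeout expires. This is an analogy rather than the actual implementation: per the description of _create_gloo_group, the real helper works on a Gloo process group, where torch.distributed.monitored_barrier accepts a timeout and raises an error if not all ranks arrive in time. barrier_with_timeout_sketch is a hypothetical name.

```python
import threading
from datetime import timedelta


def barrier_with_timeout_sketch(barrier: threading.Barrier, timeout: timedelta) -> bool:
    """Hypothetical sketch: True if every party reached the barrier in time, else False."""
    try:
        barrier.wait(timeout=timeout.total_seconds())
        return True
    except threading.BrokenBarrierError:
        # Raised when this wait times out (the barrier is then left broken),
        # or when another waiter timed out or reset the barrier.
        return False


# A one-party barrier passes immediately; a two-party barrier with a single
# waiter times out and reports False instead of hanging forever.
ok = barrier_with_timeout_sketch(threading.Barrier(1), timedelta(seconds=1))
timed_out = barrier_with_timeout_sketch(threading.Barrier(2), timedelta(milliseconds=50))
```

Converting the timeout into a boolean lets the caller decide whether to log, retry, or abort, rather than letting an exception unwind the training loop.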

nemo_automodel.components.distributed.utils._create_gloo_group()

Create a Gloo process group for barrier operations.

This allows us to use monitored_barrier with the Gloo backend while keeping NCCL for the main training operations.

Returns:

Gloo process group for barriers

nemo_automodel.components.distributed.utils.barrier_and_log(
string: str
) -> None

Perform a distributed barrier and then log a message on rank 0.

Parameters:

string
str

The message string to log.
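The two steps (barrier first, then a rank-0-only log call) can be simulated in one process. In this sketch, threads play the ranks, threading.Barrier stands in for torch.distributed.barrier(), and barrier_and_log_sketch, the logger name, and the list-backed handler are hypothetical additions for illustration.

```python
import logging
import threading

logger = logging.getLogger("barrier_and_log_sketch")  # hypothetical logger name


def barrier_and_log_sketch(string: str, rank: int, barrier: threading.Barrier) -> None:
    """Hypothetical sketch: synchronize all ranks, then log once from rank 0."""
    barrier.wait()  # stand-in for torch.distributed.barrier()
    if rank == 0:
        logger.info(string)


# Capture log messages in a list so the effect is easy to inspect.
records: list[str] = []


class ListHandler(logging.Handler):
    def emit(self, record: logging.LogRecord) -> None:
        records.append(record.getMessage())


logger.addHandler(ListHandler())
logger.setLevel(logging.INFO)
logger.propagate = False

world_size = 3
barrier = threading.Barrier(world_size)
threads = [
    threading.Thread(target=barrier_and_log_sketch, args=("all ranks reached checkpoint", r, barrier))
    for r in range(world_size)
]
for t in threads:
    t.start()
for t in threads:
    t.join()
```

Logging only after the barrier means the message is printed once and only when every rank has actually reached that point.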

nemo_automodel.components.distributed.utils.dp_eval_sample_shard(
distributed_config: typing.Any,
dp_rank: int,
dp_size: int
) -> tuple | None

Return (dp_rank, dp_size) when eval samples can be sharded across data-parallel (DP) ranks; otherwise return None.

Only DDP replicates the full model on every rank, so only under DDP can each rank score a disjoint subset of samples independently. Under FSDP2 or MegatronFSDP the model is sharded, and generate() issues per-layer all-gather collectives that must stay in lockstep across the group. Giving each rank different samples would desynchronize these collectives and hang the job, so every rank scores the same samples and the function returns None.
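The decision and one way a caller might consume it can be sketched as follows. Both functions are hypothetical: a plain strategy string ("ddp", "fsdp2", "megatron_fsdp") stands in for the real distributed_config object, and strided slicing is an assumed splitting scheme, not necessarily the one the library uses.

```python
from typing import Optional, Sequence


def dp_eval_sample_shard_sketch(strategy: str, dp_rank: int, dp_size: int) -> Optional[tuple[int, int]]:
    """Hypothetical: `strategy` stands in for distributed_config."""
    if strategy == "ddp":
        return dp_rank, dp_size  # full replica per rank: disjoint subsets are safe
    return None  # sharded model: every rank must run generate() on the same samples


def select_eval_samples(samples: Sequence[int], shard: Optional[tuple[int, int]]) -> list[int]:
    """Hypothetical helper: strided split when sharding is allowed, else every sample."""
    if shard is None:
        return list(samples)
    dp_rank, dp_size = shard
    return list(samples[dp_rank::dp_size])


samples = list(range(10))
ddp_parts = [select_eval_samples(samples, dp_eval_sample_shard_sketch("ddp", r, 4)) for r in range(4)]
fsdp_parts = [select_eval_samples(samples, dp_eval_sample_shard_sketch("fsdp2", r, 4)) for r in range(4)]
# ddp_parts  -> [[0, 4, 8], [1, 5, 9], [2, 6], [3, 7]]
# fsdp_parts -> four identical copies of [0, 1, ..., 9]
```

Under DDP the four shards are disjoint and together cover every sample; under FSDP2 each rank repeats the full set, which costs throughput but keeps the all-gathers aligned.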

nemo_automodel.components.distributed.utils.get_sync_ctx(
model,
is_optim_step,
defer_fsdp_grad_sync: bool
)

Get the synchronization context for the model.

Parameters:

model

The model to synchronize.

is_optim_step

Whether the current step is an optimizer step.

defer_fsdp_grad_sync
bool

Controls FSDP2 gradient synchronization during gradient accumulation.

  • True: disable gradient sync on non-final micro-batches (saves comm, can increase peak memory).
  • False: always sync gradients on every micro-batch (more comm, lower peak memory).

Returns:

A context manager that controls gradient synchronization for the current micro-batch.
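The effect of defer_fsdp_grad_sync on a gradient-accumulation loop can be sketched with a toy model. The decision rule below (skip sync on every micro-batch except the optimizer step when deferring) is an assumption read from the parameter description, not the library's code. ToyModel, get_sync_ctx_sketch, and count_synced_microbatches are hypothetical; in PyTorch, DDP exposes a no_sync() context manager and FSDP2 modules expose set_requires_gradient_sync(), which the toy no_sync() loosely imitates.

```python
from contextlib import contextmanager, nullcontext
from typing import Iterator


class ToyModel:
    """Hypothetical model that only tracks whether gradient sync is enabled."""

    def __init__(self) -> None:
        self.sync_enabled = True

    @contextmanager
    def no_sync(self) -> Iterator[None]:
        self.sync_enabled = False
        try:
            yield
        finally:
            self.sync_enabled = True


def get_sync_ctx_sketch(model: ToyModel, is_optim_step: bool, defer_fsdp_grad_sync: bool):
    # Assumed rule: defer sync on non-final micro-batches only when requested.
    if defer_fsdp_grad_sync and not is_optim_step:
        return model.no_sync()
    return nullcontext()


def count_synced_microbatches(num_microbatches: int, defer: bool) -> int:
    """Count the micro-batches whose backward pass would communicate gradients."""
    model = ToyModel()
    synced = 0
    for i in range(num_microbatches):
        is_optim_step = i == num_microbatches - 1
        with get_sync_ctx_sketch(model, is_optim_step, defer):
            if model.sync_enabled:  # a real backward() would reduce gradients here
                synced += 1
    return synced
```

With four micro-batches, deferring communicates once (on the optimizer step) instead of four times, which is the comm-versus-peak-memory trade-off described above.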

nemo_automodel.components.distributed.utils.reduce_loss(
loss_store: list[torch.Tensor],
total_num_tokens: torch.Tensor,
per_token_loss: bool = True,
dp_group: typing.Optional[torch.distributed.ProcessGroup] = None
) -> tuple[torch.Tensor, torch.Tensor]

Reduce loss across all ranks.

Parameters:

loss_store
list[torch.Tensor]

List of loss tensors to reduce.

total_num_tokens
torch.Tensor

Total number of tokens to divide the loss by.

per_token_loss
bool, defaults to True

Whether to divide the loss by the number of tokens.

dp_group
Optional[torch.distributed.ProcessGroup], defaults to None

Process group to reduce the loss across.

Returns: tuple[torch.Tensor, torch.Tensor]

Tuple of reduced loss and denominator.
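A single-process sketch of the reduction, with Python lists standing in for tensors and a plain sum standing in for a SUM all-reduce over dp_group. It assumes each entry of loss_store is a summed (not averaged) loss, that the caller divides the returned loss by the returned denominator, and, for per_token_loss=False, that the denominator falls back to the micro-batch count; all three are labeled assumptions, and reduce_loss_sketch is a hypothetical name.

```python
def reduce_loss_sketch(
    per_rank_losses: list[list[float]],
    per_rank_tokens: list[int],
    per_token_loss: bool = True,
) -> tuple[float, int]:
    """Hypothetical single-process sketch; each outer list entry is one DP rank."""
    # Step 1 (per rank): sum the micro-batch losses collected in loss_store.
    local_sums = [sum(losses) for losses in per_rank_losses]
    # Step 2 (per rank): choose the denominator.
    if per_token_loss:
        local_denoms = list(per_rank_tokens)
    else:
        # Assumption: fall back to the number of micro-batches on this rank.
        local_denoms = [len(losses) for losses in per_rank_losses]
    # Step 3: a SUM all-reduce over the DP group, simulated with Python sums.
    return sum(local_sums), sum(local_denoms)


loss, denom = reduce_loss_sketch([[2.0, 4.0], [6.0]], [3, 5])
mean_loss = loss / denom  # 12.0 / 8 tokens = 1.5
```

Summing numerators and denominators separately before dividing gives a token-weighted mean across ranks; averaging each rank's local mean instead would overweight ranks that saw fewer tokens.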

nemo_automodel.components.distributed.utils.logger = logging.getLogger(__name__)