# nemo_automodel.components.distributed.utils

## Module Contents

### Classes

| Name                                                                                | Description                                                           |
| ----------------------------------------------------------------------------------- | --------------------------------------------------------------------- |
| [`FirstRankPerNode`](#nemo_automodel-components-distributed-utils-FirstRankPerNode) | Context manager that lets node-local rank 0 run a code section before the other ranks. |

### Functions

| Name                                                                                          | Description                                                                     |
| --------------------------------------------------------------------------------------------- | ------------------------------------------------------------------------------- |
| [`_barrier_with_timeout`](#nemo_automodel-components-distributed-utils-_barrier_with_timeout) | A timeout wrapper for `torch.distributed.barrier()` using the Gloo backend.     |
| [`_create_gloo_group`](#nemo_automodel-components-distributed-utils-_create_gloo_group)       | Create a Gloo process group for barrier operations.                             |
| [`barrier_and_log`](#nemo_automodel-components-distributed-utils-barrier_and_log)             | Perform a distributed barrier and then log a message on rank 0.                 |
| [`dp_eval_sample_shard`](#nemo_automodel-components-distributed-utils-dp_eval_sample_shard)   | Return `(dp_rank, dp_size)` to shard eval samples across DP ranks, else `None`. |
| [`get_sync_ctx`](#nemo_automodel-components-distributed-utils-get_sync_ctx)                   | Get the synchronization context for the model.                                  |
| [`reduce_loss`](#nemo_automodel-components-distributed-utils-reduce_loss)                     | Reduce loss across all ranks.                                                   |

### Data

[`logger`](#nemo_automodel-components-distributed-utils-logger)

### API

```python
class nemo_automodel.components.distributed.utils.FirstRankPerNode()
```

**Bases:** `ContextDecorator`

Context manager that lets node-local rank 0 run a code section before the other ranks on its node.

* Lets node-local rank 0 run the protected code first on each node.
* Inserts an extra barrier across *only* the node-local rank-0 processes.
* Works on a single GPU (no env flags, no distributed initialisation).

Note: the scoped code is assumed not to make heavy use of `torch.distributed`.
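The ordering contract in the bullets above can be simulated without GPUs. This is a minimal sketch, not the library's implementation: threads stand in for the ranks of one node, a `threading.Barrier` stands in for the node-local barrier, and `simulate_first_rank_per_node` is a hypothetical helper.

```python
import threading


def simulate_first_rank_per_node(local_world_size: int) -> list[int]:
    """Return the order in which simulated node-local ranks ran the protected section."""
    order: list[int] = []
    lock = threading.Lock()
    # Every rank on the node meets at this barrier exactly once.
    release = threading.Barrier(local_world_size)

    def run_rank(local_rank: int) -> None:
        if local_rank != 0:
            release.wait()  # peers block until rank 0 has finished
        with lock:
            order.append(local_rank)  # stand-in for e.g. a dataset download
        if local_rank == 0:
            release.wait()  # rank 0 is done: release the waiting peers

    threads = [threading.Thread(target=run_rank, args=(r,)) for r in range(local_world_size)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return order


order = simulate_first_rank_per_node(4)
print(order[0])  # → 0
```

Rank 0 reaches the barrier only after finishing its work, so the peers can never enter the protected section ahead of it; the order among the remaining ranks is unspecified.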

```python
nemo_automodel.components.distributed.utils.FirstRankPerNode.__enter__(
    timeout = timedelta(hours=10)
)
```

Create or bootstrap a (distributed) process group that rank 0 enters first.

**Returns:**

* `True` if the current process is node-rank-0.
* `False` otherwise.

```python
nemo_automodel.components.distributed.utils.FirstRankPerNode.__exit__(
    exc_type,
    exc_val,
    exc_tb
)
```

Tear down the context.

1. If the current process was the first on its node, release the
   waiting peer ranks by issuing a barrier.
2. If an exception occurred, abort the *entire* distributed job.
3. If this context manager created the process group, destroy it.

**Parameters:**

* `exc_type`: Exception class, if one occurred inside the `with` block.
* `exc_val`: The raised exception instance.
* `exc_tb`: Traceback associated with the exception.

**Returns:**

`False` so that any exception raised inside the `with`
block is propagated to the caller (standard CM semantics).
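Because the class derives from `ContextDecorator`, it can be used either as a `with` block or as a function decorator. The sketch below uses a hypothetical `ToyFirstRank` class, not `FirstRankPerNode` itself, to show the two standard-library behaviours this contract relies on: the `as` target receives the return value of `__enter__`, and an `__exit__` that returns `False` lets the exception propagate.

```python
from contextlib import ContextDecorator


class ToyFirstRank(ContextDecorator):
    """Hypothetical stand-in that mimics the documented enter/exit contract."""

    def __init__(self, local_rank: int) -> None:
        self.local_rank = local_rank
        self.exited_with = None  # records the exception type seen in __exit__

    def __enter__(self) -> bool:
        # Report whether this process is node-rank-0.
        return self.local_rank == 0

    def __exit__(self, exc_type, exc_val, exc_tb) -> bool:
        self.exited_with = exc_type
        # Returning False tells Python not to suppress the exception.
        return False


# Context-manager form: the value returned by __enter__ is bound by `as`.
with ToyFirstRank(local_rank=0) as is_first:
    print(is_first)  # → True

# Decorator form, provided by ContextDecorator.
guard = ToyFirstRank(local_rank=1)


@guard
def prepare() -> None:
    raise ValueError("download failed")


try:
    prepare()
except ValueError as err:
    print(type(err).__name__, guard.exited_with.__name__)  # → ValueError ValueError
```

In decorator form there is no `as` target, so the boolean returned by `__enter__` is discarded; use the `with` form when the caller needs to know whether it is node-rank-0.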

```python
nemo_automodel.components.distributed.utils._barrier_with_timeout(
    timeout: datetime.timedelta,
    group = None
)
```

A timeout wrapper for `torch.distributed.barrier()` using the Gloo backend.

This approach creates a separate Gloo process group for barrier operations
while keeping the main NCCL backend for training operations.

**Parameters:**

* `timeout`: Maximum time to wait for the barrier.
* `group`: Process group for the barrier operation.

**Returns:**

`True` if the barrier completed successfully, `False` if the timeout expired.
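The boolean contract can be sketched with the standard library alone. This is an analogy, not the documented function: `_barrier_with_timeout` works on a `torch.distributed` Gloo group, whereas this sketch uses `threading.Barrier`, whose `wait(timeout=...)` raises `threading.BrokenBarrierError` when the timeout expires. The name `barrier_with_timeout_sketch` is hypothetical, and converting the error into `False` is an assumption about how the wrapper behaves.

```python
import threading
from datetime import timedelta


def barrier_with_timeout_sketch(barrier: threading.Barrier, timeout: timedelta) -> bool:
    """Wait on `barrier`; return True on success and False if `timeout` expires."""
    try:
        barrier.wait(timeout=timeout.total_seconds())
        return True
    except threading.BrokenBarrierError:
        # Raised on timeout (and when another waiter has broken the barrier).
        return False


# Single party: the barrier trips immediately.
print(barrier_with_timeout_sketch(threading.Barrier(1), timedelta(seconds=1)))  # → True
# Two parties but only one caller: the wait times out.
print(barrier_with_timeout_sketch(threading.Barrier(2), timedelta(milliseconds=50)))  # → False
```

Returning a boolean instead of raising lets the caller decide whether a slow peer is fatal, for example by logging and retrying rather than crashing the job.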

```python
nemo_automodel.components.distributed.utils._create_gloo_group()
```

Create a Gloo process group for barrier operations.

This allows `monitored_barrier` to run on the Gloo backend while NCCL remains
the backend for the main training operations.

**Returns:**

Gloo process group for barriers

```python
nemo_automodel.components.distributed.utils.barrier_and_log(
    string: str
) -> None
```

Perform a distributed barrier and then log a message on rank 0.

**Parameters:**

* `string`: The message string to log.
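The barrier-then-log pattern can be simulated with threads standing in for ranks. This is a minimal sketch, not the library's code; `simulate_barrier_and_log` and the logger name are hypothetical.

```python
import logging
import threading


def simulate_barrier_and_log(world_size: int, message: str) -> list[int]:
    """Return the simulated ranks that emitted `message` after the barrier."""
    logger = logging.getLogger("barrier_and_log_sketch")
    barrier = threading.Barrier(world_size)
    logged_by: list[int] = []

    def run_rank(rank: int) -> None:
        barrier.wait()  # no rank continues until every rank has arrived
        if rank == 0:
            logger.info(message)
            logged_by.append(rank)  # only rank 0 logs

    threads = [threading.Thread(target=run_rank, args=(r,)) for r in range(world_size)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return logged_by


print(simulate_barrier_and_log(4, "checkpoint saved"))  # → [0]
```

Because the message is emitted only after the barrier, it confirms that every rank has reached that point, and logging from a single rank avoids one duplicate line per process.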

```python
nemo_automodel.components.distributed.utils.dp_eval_sample_shard(
    distributed_config: typing.Any,
    dp_rank: int,
    dp_size: int
) -> tuple | None
```

Return `(dp_rank, dp_size)` to shard eval samples across DP ranks, else `None`.

Only DDP replicates the full model on every rank, so only under DDP can each rank
score a disjoint subset of samples independently. Under FSDP2 or MegatronFSDP the
model is sharded, and `generate()` issues per-layer all-gather collectives that
must stay in lockstep across the group. Giving each rank different samples would
desynchronize those collectives and hang the job, so every rank scores the same
samples and the function returns `None`.
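The contract can be illustrated with a hypothetical helper that takes a plain `strategy` string in place of `distributed_config`, whose fields are not documented here. The sketch also shows one assumed way to consume the returned tuple, a strided slice. Neither `eval_sample_shard` nor `select_samples` is part of the library.

```python
from typing import Optional, Sequence


def eval_sample_shard(strategy: str, dp_rank: int, dp_size: int) -> Optional[tuple[int, int]]:
    """Hypothetical: shard only when every rank holds a full model replica (DDP)."""
    if strategy == "ddp":
        return (dp_rank, dp_size)
    # Sharded strategies (e.g. FSDP2, MegatronFSDP) must score identical samples.
    return None


def select_samples(samples: Sequence[str], shard: Optional[tuple[int, int]]) -> list[str]:
    """Strided split: DP ranks get disjoint subsets whose union is the full set."""
    if shard is None:
        return list(samples)
    rank, size = shard
    return list(samples[rank::size])


samples = ["s0", "s1", "s2", "s3", "s4"]
print(select_samples(samples, eval_sample_shard("ddp", 1, 2)))    # → ['s1', 's3']
print(select_samples(samples, eval_sample_shard("fsdp2", 1, 2)))  # all five samples
```

Under DDP the per-rank scores must afterwards be gathered or summed across ranks to cover the full eval set; under sharded strategies every rank already holds the same result.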

```python
nemo_automodel.components.distributed.utils.get_sync_ctx(
    model,
    is_optim_step,
    defer_fsdp_grad_sync: bool
)
```

Get the synchronization context for the model.

**Parameters:**

* `model`: The model to synchronize.
* `is_optim_step`: Whether the current step is an optimizer step.
* `defer_fsdp_grad_sync`: Controls FSDP2 gradient synchronization during gradient accumulation.
  * `True`: disable gradient sync on non-final micro-batches (less communication, but can increase peak memory).
  * `False`: sync gradients on every micro-batch (more communication, lower peak memory).

**Returns:**

A context manager that synchronizes the model.
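The two `defer_fsdp_grad_sync` settings amount to a per-micro-batch rule: always sync on the optimizer step, and sync on earlier micro-batches only when not deferring. The sketch below is hypothetical: `ToyFSDPModel`, `toy_sync_ctx`, and `sync_schedule` are illustrative names. The toggle mirrors the `set_requires_gradient_sync` method that PyTorch's FSDP2 modules provide, but whether `get_sync_ctx` uses it this way is an assumption.

```python
import contextlib


def requires_grad_sync(is_optim_step: bool, defer_fsdp_grad_sync: bool) -> bool:
    """Always sync on the optimizer step; on earlier micro-batches only if not deferring."""
    return is_optim_step or not defer_fsdp_grad_sync


class ToyFSDPModel:
    """Hypothetical model with an FSDP2-style gradient-sync toggle that records calls."""

    def __init__(self) -> None:
        self.sync_history: list[bool] = []

    def set_requires_gradient_sync(self, requires_sync: bool) -> None:
        self.sync_history.append(requires_sync)


def toy_sync_ctx(model: ToyFSDPModel, is_optim_step: bool, defer_fsdp_grad_sync: bool):
    """Hypothetical stand-in for get_sync_ctx: set the toggle, return a no-op context."""
    model.set_requires_gradient_sync(requires_grad_sync(is_optim_step, defer_fsdp_grad_sync))
    return contextlib.nullcontext()


def sync_schedule(grad_accum_steps: int, defer_fsdp_grad_sync: bool) -> list[bool]:
    """Run one accumulation window and return the sync flag used per micro-batch."""
    model = ToyFSDPModel()
    for step in range(grad_accum_steps):
        is_optim_step = step == grad_accum_steps - 1
        with toy_sync_ctx(model, is_optim_step, defer_fsdp_grad_sync):
            pass  # forward and backward for this micro-batch would run here
    return model.sync_history


print(sync_schedule(4, True))   # → [False, False, False, True]
print(sync_schedule(4, False))  # → [True, True, True, True]
```

With deferral, gradients are reduced once per optimizer step instead of once per micro-batch, which is the communication saving (and the higher peak memory) described in the parameter list.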

```python
nemo_automodel.components.distributed.utils.reduce_loss(
    loss_store: list[torch.Tensor],
    total_num_tokens: torch.Tensor,
    per_token_loss: bool = True,
    dp_group: typing.Optional[torch.distributed.ProcessGroup] = None
) -> tuple[torch.Tensor, torch.Tensor]
```

Reduce loss across all ranks.

**Parameters:**

* `loss_store`: List of loss tensors to reduce.
* `total_num_tokens`: Total number of tokens to divide the loss by.
* `per_token_loss`: Whether to divide the loss by the number of tokens.
* `dp_group`: Process group to reduce the loss across.

**Returns:** `tuple[torch.Tensor, torch.Tensor]`

Tuple of reduced loss and denominator.
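Reducing numerators and denominators separately matters when ranks see different numbers of tokens. This pure-Python sketch simulates the cross-rank SUM reduction with plain additions. `reduce_loss_sketch` is hypothetical: it returns an unnormalised `(loss, denominator)` pair for the caller to divide, and the denominator used when `per_token_loss=False` is an assumption.

```python
def reduce_loss_sketch(
    per_rank_losses: list[list[float]],
    per_rank_tokens: list[int],
    per_token_loss: bool = True,
) -> tuple[float, float]:
    """Simulate a SUM all-reduce of loss numerators and denominators across DP ranks."""
    # Each rank sums its own loss store, then the sums are added across ranks.
    loss = sum(sum(losses) for losses in per_rank_losses)
    if per_token_loss:
        denominator = float(sum(per_rank_tokens))
    else:
        # Assumption: without per-token normalisation, average over loss entries.
        denominator = float(sum(len(losses) for losses in per_rank_losses))
    return loss, denominator


# Rank 0 saw 8 tokens (summed loss 10.0); rank 1 saw 2 tokens (summed loss 2.0).
loss, denom = reduce_loss_sketch([[4.0, 6.0], [2.0]], [8, 2])
print(loss / denom)  # → 1.2, the true per-token mean
# Averaging per-rank means instead would give (10/8 + 2/2) / 2 = 1.125.
```

Summing both quantities before dividing weights every token equally, regardless of how the tokens were distributed across ranks.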

```python
nemo_automodel.components.distributed.utils.logger = logging.getLogger(__name__)
```