nemo_rl.models.policy.workers.dtensor_policy_worker_v2#

Module Contents#

Classes#

DTensorPolicyWorkerV2

Functions#

dtensor_params_generator

Generator that yields (name, tensor) pairs, converting DTensors to local tensors and adapting to HF format.

_maybe_adapt_tensor_to_hf

get_train_context

Create combined context manager for training with context parallel and autocast.

API#

nemo_rl.models.policy.workers.dtensor_policy_worker_v2.dtensor_params_generator(
model: torch.nn.Module,
target_dtype: torch.dtype,
) Generator[tuple[str, torch.Tensor], None, None]#

Generator that yields (name, tensor) pairs, converting DTensors to local tensors and adapting to HF format.

Parameters:
  • model – The model whose parameters to generate.

  • target_dtype – The dtype to convert tensors to.

Yields:

Tuples of (fully_qualified_name, tensor) where tensors are converted to target dtype and made contiguous.
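The yield contract described above can be sketched as follows. This is a minimal illustration for plain (non-distributed) tensors only: `params_generator_sketch` is a hypothetical stand-in, and the DTensor-to-local conversion and HF name adaptation performed by the real function are deliberately left out.

```python
from typing import Generator

import torch


def params_generator_sketch(
    model: torch.nn.Module,
    target_dtype: torch.dtype,
) -> Generator[tuple[str, torch.Tensor], None, None]:
    """Hypothetical stand-in that handles plain tensors only."""
    for name, tensor in model.state_dict().items():
        # The documented function also converts DTensors and adapts names
        # to HF format; both steps are omitted in this sketch.
        yield name, tensor.to(target_dtype).contiguous()


model = torch.nn.Linear(4, 2)
params = dict(params_generator_sketch(model, torch.bfloat16))
```

Because the function is a generator, a consumer can stream one tensor at a time instead of materializing a full converted copy of the state dict.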

nemo_rl.models.policy.workers.dtensor_policy_worker_v2._maybe_adapt_tensor_to_hf(
model_part: torch.nn.Module,
fqn: str,
tensor: torch.Tensor,
quantization: bool = False,
) list[tuple[str, torch.Tensor]]#
nemo_rl.models.policy.workers.dtensor_policy_worker_v2.get_train_context(
cp_size: int,
cp_mesh: Any,
cp_buffers: list,
sequence_dim: int,
dtype: torch.dtype,
autocast_enabled: bool = True,
) Generator[None, None, None]#

Create combined context manager for training with context parallel and autocast.
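One common way to build a combined context manager of this kind is `contextlib.ExitStack`. The sketch below uses hypothetical stand-in contexts (`fake_context_parallel`, `fake_autocast`) rather than the real PyTorch ones, and it assumes that context parallelism is entered only when `cp_size > 1`; the source does not state that condition.

```python
import contextlib
from typing import Generator

events: list[str] = []


@contextlib.contextmanager
def fake_context_parallel(cp_size: int) -> Generator[None, None, None]:
    """Stand-in for a context-parallel context (hypothetical)."""
    events.append(f"enter cp({cp_size})")
    try:
        yield
    finally:
        events.append("exit cp")


@contextlib.contextmanager
def fake_autocast(dtype: str) -> Generator[None, None, None]:
    """Stand-in for an autocast context (hypothetical)."""
    events.append(f"enter autocast({dtype})")
    try:
        yield
    finally:
        events.append("exit autocast")


@contextlib.contextmanager
def train_context_sketch(
    cp_size: int,
    dtype: str,
    autocast_enabled: bool = True,
) -> Generator[None, None, None]:
    with contextlib.ExitStack() as stack:
        # Assumption: context parallel is only needed when cp_size > 1.
        if cp_size > 1:
            stack.enter_context(fake_context_parallel(cp_size))
        if autocast_enabled:
            stack.enter_context(fake_autocast(dtype))
        yield


with train_context_sketch(cp_size=2, dtype="bfloat16"):
    events.append("forward/backward")
```

`ExitStack` unwinds the entered contexts in reverse order, so the autocast stand-in exits before the context-parallel one.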

class nemo_rl.models.policy.workers.dtensor_policy_worker_v2.DTensorPolicyWorkerV2(
config: nemo_rl.models.policy.PolicyConfig,
tokenizer: transformers.AutoTokenizer,
processor: Optional[transformers.AutoProcessor] = None,
weights_path: Optional[str] = None,
optimizer_path: Optional[str] = None,
init_optimizer: bool = True,
init_reference_model: bool = True,
**kwargs: Any,
)#

Bases: nemo_rl.models.policy.workers.base_policy_worker.AbstractPolicyWorker, nemo_rl.models.policy.interfaces.ColocatablePolicyInterface

__repr__() str#

Customizes the actor’s prefix in the Ray logs.

This makes it easier to identify which worker is producing specific log messages.

_apply_temperature_scaling(logits: torch.Tensor) torch.Tensor#
train(
data: nemo_rl.distributed.batched_data_dict.BatchedDataDict[Any],
loss_fn: nemo_rl.algorithms.interfaces.LossFunction,
eval_mode: bool = False,
gbs: Optional[int] = None,
mbs: Optional[int] = None,
) dict[str, Any]#

Train the policy on a batch of data with a given loss function.

get_logprobs(
data: nemo_rl.distributed.batched_data_dict.BatchedDataDict[Any],
micro_batch_size: Optional[int] = None,
) nemo_rl.distributed.batched_data_dict.BatchedDataDict[nemo_rl.models.policy.interfaces.LogprobOutputSpec]#

Get the logprobs of the model for a batch of data.

Uses the configured logprob_batch_size to do microbatching.

Input data is assumed to be right-padded. The method internally converts to left-padded format for computation, and returns outputs in right-padded format.

Returns:

A BatchedDataDict with key “logprobs” and shape [batch_size, sequence_length]. By convention, the logprob of the first token is set to 0 so that the sequence length is maintained. The logprob of input token i is stored at position i in the output logprobs tensor.
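The indexing convention can be illustrated for a single sequence in plain Python. This is a sketch rather than the worker's implementation: `token_logprobs_sketch` is a hypothetical helper, and it assumes the usual next-token setup in which the logits at position t score the token at position t + 1.

```python
import math


def token_logprobs_sketch(
    logits: list[list[float]],
    token_ids: list[int],
) -> list[float]:
    """Return one logprob per input token, with position 0 fixed to 0.0."""
    out = [0.0]  # convention: the first token has no prediction, so use 0
    for t in range(len(token_ids) - 1):
        row = logits[t]
        # Numerically stable log-sum-exp over the vocabulary.
        m = max(row)
        log_z = m + math.log(sum(math.exp(x - m) for x in row))
        # The logprob of token t + 1 is stored at output position t + 1.
        out.append(row[token_ids[t + 1]] - log_z)
    return out


# Three positions, vocabulary of size 2.
logits = [[0.0, 0.0], [math.log(3.0), 0.0], [1.0, 2.0]]
token_ids = [1, 0, 0]
logprobs = token_logprobs_sketch(logits, token_ids)
```

The output has the same length as the input sequence, and the logits at the final position are unused because no token follows it.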

score(
data: nemo_rl.distributed.batched_data_dict.BatchedDataDict,
) nemo_rl.distributed.batched_data_dict.BatchedDataDict[nemo_rl.models.policy.interfaces.ScoreOutputSpec]#
get_topk_logits(
data: nemo_rl.distributed.batched_data_dict.BatchedDataDict[Any],
k: int,
micro_batch_size: Optional[int] = None,
) nemo_rl.distributed.batched_data_dict.BatchedDataDict[Any]#

Return per-position top-k logits and corresponding global indices.

Notes:

  • Return shapes are [B, S, k].

  • Computes top-k over the full sequence (no trimming of the last position).

  • If alignment with next-token targets is required, the caller should handle it.

  • If logits are a TP-sharded DTensor, performs a distributed global top-k across TP ranks.

  • Otherwise, computes a local top-k on the full-vocab tensor.

  • Supports context parallelism with a proper CP gather.
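A global top-k over a vocabulary-sharded tensor can be obtained by taking a local top-k on each shard, shifting the local indices by the shard offset, and merging the candidates. The sketch below shows this for a single position in plain Python. It is one common approach and not necessarily what the worker does; `local_topk` and `global_topk_from_shards` are hypothetical names.

```python
import heapq


def local_topk(
    values: list[float],
    k: int,
    offset: int = 0,
) -> list[tuple[float, int]]:
    """Top-k (value, global_index) pairs from one vocabulary shard."""
    pairs = [(v, offset + i) for i, v in enumerate(values)]
    return heapq.nlargest(k, pairs, key=lambda p: p[0])


def global_topk_from_shards(
    shards: list[list[float]],
    k: int,
) -> list[tuple[float, int]]:
    """Merge per-shard top-k candidates into a global top-k."""
    candidates: list[tuple[float, int]] = []
    offset = 0
    for shard in shards:
        candidates.extend(local_topk(shard, k, offset))
        offset += len(shard)  # the next shard starts after this one
    return heapq.nlargest(k, candidates, key=lambda p: p[0])


full = [0.1, 2.5, -1.0, 3.0, 0.7, 2.9]
shards = [full[:3], full[3:]]  # two simulated TP shards of the vocabulary
merged = global_topk_from_shards(shards, k=2)
direct = local_topk(full, k=2)
```

Each shard only has to contribute k candidates, since any element of the global top-k is necessarily in the top-k of its own shard.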

use_reference_model() Generator[None, None, None]#

Context manager that temporarily swaps the reference model and active model.

  • On entry: moves model to CPU, moves reference_model to CUDA, then swaps the references.

  • On exit: restores the original references and moves each model back to its original device.
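The swap-and-restore pattern can be sketched with `contextlib.contextmanager` and a `try`/`finally` block. `FakeModule` and `WorkerSketch` are hypothetical stand-ins that only record a device string; no real tensors or GPUs are involved, and the use of `try`/`finally` is an assumption about how restoration is guaranteed.

```python
import contextlib
from typing import Generator


class FakeModule:
    """Stand-in for a torch.nn.Module that only tracks its device."""

    def __init__(self, name: str, device: str) -> None:
        self.name = name
        self.device = device

    def to(self, device: str) -> "FakeModule":
        self.device = device
        return self


class WorkerSketch:
    def __init__(self) -> None:
        self.model = FakeModule("policy", "cuda")
        self.reference_model = FakeModule("reference", "cpu")

    @contextlib.contextmanager
    def use_reference_model(self) -> Generator[None, None, None]:
        # On entry: flip devices, then swap the two references.
        self.model.to("cpu")
        self.reference_model.to("cuda")
        self.model, self.reference_model = self.reference_model, self.model
        try:
            yield
        finally:
            # On exit: swap back, then restore the original devices.
            self.model, self.reference_model = self.reference_model, self.model
            self.reference_model.to("cpu")
            self.model.to("cuda")


worker = WorkerSketch()
with worker.use_reference_model():
    inside = (worker.model.name, worker.model.device)
after = (worker.model.name, worker.model.device, worker.reference_model.device)
```

Inside the `with` block, code that reads `worker.model` transparently sees the reference model, so methods such as a logprob computation need no separate code path.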

_add_noise_to_weights() None#

Add small Gaussian noise to the weights of the model. Note that this is used for testing purposes only.

return_state_dict()#
return_model_config() dict[str, Any]#

Return the model configuration as a dictionary.

Returns:

Model configuration dictionary

Return type:

dict

prepare_refit_info() Optional[dict[str, Any]]#

Prepare state dict metadata for weight refitting and IPC streaming.

abstractmethod calibrate_qkv_fp8_scales(
data: nemo_rl.distributed.batched_data_dict.BatchedDataDict[Any],
micro_batch_size: Optional[int] = None,
percentile: float = 99.9,
margin: float = 1.05,
include_q: bool = False,
) dict[str, Any]#

Placeholder for FP8 Q/K/V scale calibration, not implemented for DTensorPolicyWorkerV2.

stream_weights_via_ipc_zmq(
buffer_size_bytes: int = 0,
kv_scales: Optional[dict[str, float]] = None,
) None#

Stream model weights to peer process via ZMQ IPC socket.

stream_weights_via_http(
sglang_url_to_gpu_uuids: dict[str, list[str]],
) None#

Stream model weights to SGLang servers via HTTP API.

Parameters:

sglang_url_to_gpu_uuids – Dict mapping SGLang server URL to list of GPU UUIDs it uses

broadcast_weights_for_collective(
kv_scales: Optional[dict[str, float]] = None,
) None#

Broadcast the weights for collective communication.

prepare_for_lp_inference() None#
prepare_for_training(*args, **kwargs) None#
offload_before_refit() None#

Offload the optimizer to the CPU.

offload_after_refit() None#

Offload as much as possible to the CPU.

move_optimizer_to_device(device: str | torch.device) None#
move_to_device(
model: torch.nn.Module,
device: str | torch.device,
) torch.nn.Module#
move_buffer_to_device(
model: torch.nn.Module,
device: str | torch.device,
) torch.nn.Module#
move_to_cuda(model: torch.nn.Module) torch.nn.Module#
move_to_cpu(model: torch.nn.Module) torch.nn.Module#
save_checkpoint(
weights_path: str,
optimizer_path: Optional[str] = None,
tokenizer_path: Optional[str] = None,
checkpointing_cfg: Optional[nemo_rl.utils.checkpoint.CheckpointingConfig] = None,
) None#

Save a checkpoint of the model.

The optimizer state is saved only if both an optimizer and optimizer_path are provided.

load_checkpoint(
weights_path: str,
optimizer_path: Optional[str] = None,
) None#

Load a checkpoint into the model using the Automodel checkpointer.

_init_checkpoint_manager(
config_updates: Optional[dict[str, Any]] = None,
checkpoint_root: Optional[str] = None,
) None#

Initialize the AutomodelCheckpointManager for this worker.

This creates the checkpoint manager bound to this worker’s device meshes and initializes its underlying checkpointer.

Parameters:
  • config_updates – Dict of CheckpointingConfig fields to set during initialization.

  • checkpoint_root – Optional root directory for checkpoints.