nemo_rl.models.policy.workers.dtensor_policy_worker_v2#
Module Contents#
Classes#

| Name | Summary |
|---|---|
| DTensorPolicyWorkerV2 | |

Functions#

| Name | Summary |
|---|---|
| dtensor_params_generator | Generator that yields (name, tensor) pairs, converting DTensors to local tensors and adapting to HF format. |
| _maybe_adapt_tensor_to_hf | |
| get_train_context | Create combined context manager for training with context parallel and autocast. |
API#
- nemo_rl.models.policy.workers.dtensor_policy_worker_v2.dtensor_params_generator(
- model: torch.nn.Module,
- target_dtype: torch.dtype,
- )#
Generator that yields (name, tensor) pairs, converting DTensors to local tensors and adapting to HF format.
- Parameters:
model – The model whose parameters to generate.
target_dtype – The dtype to convert tensors to.
- Yields:
Tuples of (fully_qualified_name, tensor) where tensors are converted to target dtype and made contiguous.
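The conversion pattern is simple enough to sketch. The following is an illustrative reimplementation, not the worker's source: it assumes the public `torch.distributed.tensor.DTensor` API and omits the HF-format name adaptation that the real generator delegates to `_maybe_adapt_tensor_to_hf`.

```python
import torch
from torch.distributed.tensor import DTensor  # public path in recent PyTorch


def params_generator_sketch(model: torch.nn.Module, target_dtype: torch.dtype):
    """Yield (fqn, tensor) pairs with DTensors materialized as local tensors."""
    for name, param in model.named_parameters():
        tensor = param.data
        if isinstance(tensor, DTensor):
            # Keep only this rank's shard; a full gather would use .full_tensor().
            tensor = tensor.to_local()
        # Match the documented contract: target dtype, contiguous memory.
        yield name, tensor.to(target_dtype).contiguous()
```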
- nemo_rl.models.policy.workers.dtensor_policy_worker_v2._maybe_adapt_tensor_to_hf(
- model_part: torch.nn.Module,
- fqn: str,
- tensor: torch.Tensor,
- quantization: bool = False,
- )#
- nemo_rl.models.policy.workers.dtensor_policy_worker_v2.get_train_context(
- cp_size: int,
- cp_mesh: Any,
- cp_buffers: list,
- sequence_dim: int,
- dtype: torch.dtype,
- autocast_enabled: bool = True,
- )#
Create combined context manager for training with context parallel and autocast.
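One plausible way to compose such a context manager, sketched with PyTorch's experimental `context_parallel` API (an assumption; the actual helper may wire CP differently):

```python
import contextlib

import torch


@contextlib.contextmanager
def train_context_sketch(
    cp_size, cp_mesh, cp_buffers, sequence_dim, dtype, autocast_enabled=True
):
    with contextlib.ExitStack() as stack:
        if cp_size > 1:
            # Experimental API; available in recent PyTorch releases.
            from torch.distributed.tensor.experimental import context_parallel

            # Shard each listed buffer (input ids, labels, position ids, ...)
            # along its sequence dimension across the CP mesh.
            stack.enter_context(
                context_parallel(
                    cp_mesh,
                    buffers=cp_buffers,
                    buffer_seq_dims=[sequence_dim] * len(cp_buffers),
                )
            )
        if autocast_enabled:
            stack.enter_context(torch.autocast(device_type="cuda", dtype=dtype))
        yield
```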
- class nemo_rl.models.policy.workers.dtensor_policy_worker_v2.DTensorPolicyWorkerV2(
- config: nemo_rl.models.policy.PolicyConfig,
- tokenizer: transformers.AutoTokenizer,
- processor: Optional[transformers.AutoProcessor] = None,
- weights_path: Optional[str] = None,
- optimizer_path: Optional[str] = None,
- init_optimizer: bool = True,
- init_reference_model: bool = True,
- **kwargs: Any,
- )#
Bases:
nemo_rl.models.policy.workers.base_policy_worker.AbstractPolicyWorker, nemo_rl.models.policy.interfaces.ColocatablePolicyInterface
- __repr__() str#
Customizes the actor’s prefix in the Ray logs.
This makes it easier to identify which worker is producing specific log messages.
- _apply_temperature_scaling(logits: torch.Tensor) torch.Tensor#
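Temperature scaling itself is a one-liner; the sketch below shows the standard form (the temperature value presumably comes from the worker's generation config, so it is passed explicitly here only to keep the example self-contained):

```python
import torch


def apply_temperature_scaling(logits: torch.Tensor, temperature: float) -> torch.Tensor:
    # Dividing logits by T > 1 flattens the softmax; T < 1 sharpens it.
    return logits / temperature
```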
- train(
- data: nemo_rl.distributed.batched_data_dict.BatchedDataDict[Any],
- loss_fn: nemo_rl.algorithms.interfaces.LossFunction,
- eval_mode: bool = False,
- gbs: Optional[int] = None,
- mbs: Optional[int] = None,
- )#
Train the policy on a batch of data with a given loss function.
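A hypothetical call pattern (`worker`, `train_batch`, and `my_loss_fn` are placeholders for objects created elsewhere; the keys required in `data` depend on the loss function, e.g. token ids, masks, and advantages for a PPO-style loss):

```python
results = worker.train(
    data=train_batch,    # a BatchedDataDict prepared by the trainer
    loss_fn=my_loss_fn,  # any nemo_rl LossFunction implementation
    eval_mode=False,     # True presumably skips the optimizer step
    gbs=None,            # optional overrides of the configured
    mbs=None,            # global/micro batch sizes
)
```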
- get_logprobs(
- data: nemo_rl.distributed.batched_data_dict.BatchedDataDict[Any],
- micro_batch_size: Optional[int] = None,
- )#
Get the logprobs of the model for a batch of data.
Uses the configured logprob_batch_size to do microbatching.
Input data is assumed to be right-padded. The method internally converts to left-padded format for computation, and returns outputs in right-padded format.
- Returns:
a BatchedDataDict with key “logprobs” and shape [batch_size, sequence_length]. We use the convention that the logprob of the first token is 0 so that the sequence length is maintained. The logprob of input token i is specified at position i in the output logprobs tensor.
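The shifting convention is easy to get wrong, so here is a self-contained illustration of the documented contract, independent of the worker itself:

```python
import torch
import torch.nn.functional as F


def token_logprobs(logits: torch.Tensor, input_ids: torch.Tensor) -> torch.Tensor:
    """logits: [B, S, V]; input_ids: [B, S] -> logprobs: [B, S].

    Position i holds log p(token_i | tokens_<i); position 0 is fixed to 0
    so the output keeps the full sequence length.
    """
    logp = F.log_softmax(logits.float(), dim=-1)
    # Predictions at position i - 1 score token i.
    next_logp = torch.gather(
        logp[:, :-1], dim=-1, index=input_ids[:, 1:].unsqueeze(-1)
    ).squeeze(-1)                               # [B, S - 1]
    zeros = torch.zeros_like(next_logp[:, :1])  # logprob of the first token
    return torch.cat([zeros, next_logp], dim=1)
```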
- score() nemo_rl.distributed.batched_data_dict.BatchedDataDict[nemo_rl.models.policy.interfaces.ScoreOutputSpec]#
- get_topk_logits(
- data: nemo_rl.distributed.batched_data_dict.BatchedDataDict[Any],
- k: int,
- micro_batch_size: Optional[int] = None,
- )#
Return per-position top-k logits and corresponding global indices.
Notes:
- Return shapes are [B, S, k].
- Computes top-k over the full sequence (no trimming of the last position); if alignment with next-token targets is required, the caller should handle it.
- If the logits are a TP-sharded DTensor, performs a distributed global top-k across TP ranks; otherwise, computes a local top-k on the full-vocab tensor (see the sketch below).
- Supports context parallelism with a proper CP gather.
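The distributed case follows the standard two-stage top-k over a vocab-sharded tensor. The sketch below is illustrative (it assumes equal-sized vocab shards and a plain tensor per rank, whereas the worker operates on DTensors):

```python
import torch
import torch.distributed as dist


def distributed_topk(local_logits: torch.Tensor, k: int, tp_group):
    """local_logits: [B, S, V_local], this rank's vocab shard; returns [B, S, k] pairs."""
    rank = dist.get_rank(tp_group)
    world = dist.get_world_size(tp_group)
    v_local = local_logits.shape[-1]
    # Stage 1: top-k within the local shard, shifted to global vocab indices.
    vals, idx = torch.topk(local_logits, k, dim=-1)
    idx = idx + rank * v_local
    # Stage 2: gather all k * world candidates and take the global top-k.
    all_vals = [torch.empty_like(vals) for _ in range(world)]
    all_idx = [torch.empty_like(idx) for _ in range(world)]
    dist.all_gather(all_vals, vals, group=tp_group)
    dist.all_gather(all_idx, idx, group=tp_group)
    top_vals, pos = torch.topk(torch.cat(all_vals, dim=-1), k, dim=-1)
    top_idx = torch.gather(torch.cat(all_idx, dim=-1), -1, pos)
    return top_vals, top_idx
```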
- use_reference_model() Generator[None, None, None]#
Context manager that temporarily swaps the reference model and active model.
On entry: moves the model to CPU and the reference model to CUDA, then swaps the references. On exit: restores the original references and reverses the CUDA/CPU placement.
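Typical usage (hypothetical; `worker` and `data` stand in for objects created elsewhere), e.g. computing reference logprobs for a KL penalty without keeping both models resident on GPU at once:

```python
with worker.use_reference_model():
    # Inside the block the reference model occupies CUDA and the policy
    # model sits on CPU; placements are restored on exit.
    ref_logprobs = worker.get_logprobs(data)["logprobs"]
```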
- _add_noise_to_weights() None#
Add small Gaussian noise to the weights of the model. Note that this is used for testing purposes only.
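For reference, "small Gaussian noise" typically amounts to the following (an illustrative plain-tensor form; the worker's standard deviation and handling of sharded parameters may differ):

```python
import torch


@torch.no_grad()
def add_noise_to_weights(model: torch.nn.Module, std: float = 0.01) -> None:
    for p in model.parameters():
        p.add_(torch.randn_like(p) * std)
```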
- return_state_dict()#
- return_model_config() dict[str, Any]#
Return the model configuration as a dictionary.
- Returns:
Model configuration dictionary
- Return type:
dict
- prepare_refit_info() Optional[dict[str, Any]]#
Prepare state dict metadata for weight refitting and IPC streaming.
- abstractmethod calibrate_qkv_fp8_scales(
- data: nemo_rl.distributed.batched_data_dict.BatchedDataDict[Any],
- micro_batch_size: Optional[int] = None,
- percentile: float = 99.9,
- margin: float = 1.05,
- include_q: bool = False,
- )#
Placeholder for FP8 Q/K/V scale calibration, not implemented for DTensorPolicyWorkerV2.
- stream_weights_via_ipc_zmq(
- buffer_size_bytes: int = 0,
- kv_scales: Optional[dict[str, float]] = None,
- )#
Stream model weights to peer process via ZMQ IPC socket.
- stream_weights_via_http(
- sglang_url_to_gpu_uuids: dict[str, list[str]],
- )#
Stream model weights to SGLang servers via HTTP API.
- Parameters:
sglang_url_to_gpu_uuids – Dict mapping SGLang server URL to list of GPU UUIDs it uses
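An illustrative mapping (`worker`, the URLs, and the UUIDs are placeholders):

```python
sglang_url_to_gpu_uuids = {
    "http://localhost:30000": ["GPU-aaaaaaaa", "GPU-bbbbbbbb"],
    "http://localhost:30001": ["GPU-cccccccc"],
}
worker.stream_weights_via_http(sglang_url_to_gpu_uuids)
```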
- broadcast_weights_for_collective(
- kv_scales: Optional[dict[str, float]] = None,
- )#
Broadcast the weights for collective communication.
- prepare_for_lp_inference() None#
- prepare_for_training(*args, **kwargs) None#
- offload_before_refit() None#
Offload the optimizer to the CPU.
- offload_after_refit() None#
Offload as much as possible to the CPU.
- move_optimizer_to_device(device: str | torch.device) None#
- move_to_device(
- model: torch.nn.Module,
- device: str | torch.device,
- )#
- move_buffer_to_device(
- model: torch.nn.Module,
- device: str | torch.device,
- )#
- move_to_cuda(model: torch.nn.Module) torch.nn.Module#
- move_to_cpu(model: torch.nn.Module) torch.nn.Module#
- save_checkpoint(
- weights_path: str,
- optimizer_path: Optional[str] = None,
- tokenizer_path: Optional[str] = None,
- checkpointing_cfg: Optional[nemo_rl.utils.checkpoint.CheckpointingConfig] = None,
- )#
Save a checkpoint of the model. The optimizer states are saved only if `optimizer` and `optimizer_path` are provided.
- load_checkpoint(
- weights_path: str,
- optimizer_path: Optional[str] = None,
- )#
Load a checkpoint into the model using Automodel Checkpointer.
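A hypothetical save/load round trip (`worker` and all paths are placeholders):

```python
worker.save_checkpoint(
    weights_path="/ckpts/step_100/weights",
    optimizer_path="/ckpts/step_100/optim",  # optimizer state is saved only
                                             # when this path is provided
    tokenizer_path="/ckpts/step_100/tokenizer",
)

# Later, e.g. when resuming a run:
worker.load_checkpoint(
    weights_path="/ckpts/step_100/weights",
    optimizer_path="/ckpts/step_100/optim",
)
```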
- _init_checkpoint_manager(
- config_updates: Optional[dict[str, Any]] = None,
- checkpoint_root: Optional[str] = None,
- )#
Initialize the AutomodelCheckpointManager for this worker.
This creates the checkpoint manager bound to this worker’s device meshes and initializes its underlying checkpointer.
- Parameters:
config_updates – Dict of CheckpointingConfig fields to set during initialization.
checkpoint_root – Optional root directory for checkpoints.