nemo_rl.models.policy.utils#

Module Contents#

Classes#

IPCProtocol

IPC protocol constants for ZMQ weight streaming.

Functions#

apply_top_k_top_p

Apply top-k and top-p masks to the logits.

apply_top_k_only

Apply top-k mask to the logits.

resolve_model_class

Resolve the appropriate model class for a given model name.

is_vllm_v1_engine_enabled

Check if vLLM V1 engine is enabled.

get_gpu_info

Return information about the GPU being used by this worker.

configure_dynamo_cache

Disable dynamo autotune_local_cache.

get_runtime_env_for_policy_worker

Get runtime environment configuration for policy workers.

get_megatron_checkpoint_dir

Gets the default megatron checkpoint directory for initial HF -> Mcore conversion.

get_handle_from_tensor

Get IPC handle from a tensor.

calculate_aligned_size

Calculate aligned size for memory alignment.

stream_weights_via_ipc_zmq_impl

Shared implementation for streaming weights via IPC ZMQ with improved memory management.

rebuild_cuda_tensor_from_ipc

Rebuild a CUDA tensor from an IPC handle.

Data#

API#

nemo_rl.models.policy.utils.AUTOMODEL_FACTORY: Dict[str, Any]#

None

class nemo_rl.models.policy.utils.IPCProtocol(*args, **kwds)#

Bases: enum.Enum

IPC protocol constants for ZMQ weight streaming.

Initialization

COMPLETE#

‘complete’

ACK#

‘ack’

nemo_rl.models.policy.utils.apply_top_k_top_p(
logits: torch.Tensor,
top_k: Optional[int] = None,
top_p: Optional[float] = None,
) torch.Tensor#

Apply top-k and top-p masks to the logits.

Simplified version of VLLM’s implementation for scalar parameters.

Based on VLLM’s implementation: https://github.com/vllm-project/vllm/blob/34a20c49b3f81f64133428b3a0d62309db1256f9/vllm/v1/sample/ops/topk_topp_sampler.py SPDX-License-Identifier: Apache-2.0 Copyright contributors to the vLLM project

Parameters:
  • logits – Input logits tensor of shape [batch_size, seq_len, vocab_size]

  • top_k – Top-k sampling parameter. Set to -1 to consider all tokens.

  • top_p – Top-p (nucleus) sampling parameter. Must be in (0, 1]. Set to 1 to consider all tokens.

Returns:

Filtered logits with sampling parameters applied

nemo_rl.models.policy.utils.apply_top_k_only(logits: torch.Tensor, top_k: int) torch.Tensor#

Apply top-k mask to the logits.

Simplified version of VLLM’s implementation for scalar parameters. This implementation doesn’t involve sorting the entire vocab.

Based on VLLM’s implementation: https://github.com/vllm-project/vllm/blob/34a20c49b3f81f64133428b3a0d62309db1256f9/vllm/v1/sample/ops/topk_topp_sampler.py SPDX-License-Identifier: Apache-2.0 Copyright contributors to the vLLM project

Parameters:
  • logits – Input logits tensor of shape [batch_size, seq_len, vocab_size]

  • top_k – Top-k sampling parameter.

Returns:

Filtered logits with top-k applied

nemo_rl.models.policy.utils.resolve_model_class(model_name: str) Any#

Resolve the appropriate model class for a given model name.

nemo_rl.models.policy.utils.is_vllm_v1_engine_enabled() bool#

Check if vLLM V1 engine is enabled.

Returns:

True if V1 engine is enabled, False otherwise (defaults to True if not set)

Return type:

bool

nemo_rl.models.policy.utils.get_gpu_info(model: torch.nn.Module) dict[str, Any]#

Return information about the GPU being used by this worker.

nemo_rl.models.policy.utils.configure_dynamo_cache() None#

Disable dynamo autotune_local_cache.

Dynamo may fail at cached_autotune when there’s already a cache with different order of node_bundles. Disable autotune_local_cache as a workaround. See https://github.com/pytorch/pytorch/issues/153791 for more details.

nemo_rl.models.policy.utils.get_runtime_env_for_policy_worker(
policy_worker_name: str,
) dict[str, Any]#

Get runtime environment configuration for policy workers.

Note: expandable_segments configuration is handled directly in the worker init methods to ensure proper GPU detection after CUDA initialization.

nemo_rl.models.policy.utils.get_megatron_checkpoint_dir() str#

Gets the default megatron checkpoint directory for initial HF -> Mcore conversion.

Megatron initial checkpoint should be saved to a path available on all nodes. The directory used will take this order of precendence:

  1. $NRL_MEGATRON_CHECKPOINT_DIR (if set)

  2. $HF_HOME/nemo_rl (if HF_HOME is set)

  3. ~/.cache/huggingface/nemo_rl

HF_HOME is preferred since many users will also have that path mounted and it means one less directory to mount into your runtime environment.

nemo_rl.models.policy.utils.get_handle_from_tensor(tensor: torch.Tensor) tuple[Any]#

Get IPC handle from a tensor.

nemo_rl.models.policy.utils.calculate_aligned_size(size_bytes: int, alignment: int = 512) int#

Calculate aligned size for memory alignment.

Parameters:
  • size_bytes (int) – Size in bytes to align

  • alignment (int) – Alignment boundary in bytes (default 512)

Returns:

Aligned size in bytes(int).

nemo_rl.models.policy.utils.stream_weights_via_ipc_zmq_impl(
params_generator,
buffer_size_bytes: int,
zmq_socket,
rank: int,
worker_name: str,
) None#

Shared implementation for streaming weights via IPC ZMQ with improved memory management.

Uses ping-pong double buffering to enable overlapping communication while reusing buffers to reduce memory allocation overhead and improve stability.

Parameters:
  • params_generator – Generator yielding (name, tensor) pairs

  • buffer_size_bytes – total size of buffer in bytes for batching parameters

  • zmq_socket – ZMQ socket for communication

  • rank – Worker rank for logging

  • worker_name – Name of the worker for logging

nemo_rl.models.policy.utils.rebuild_cuda_tensor_from_ipc(
cuda_ipc_handle: tuple,
device_id: int,
) torch.Tensor#

Rebuild a CUDA tensor from an IPC handle.