nemo_rl.models.policy.utils#

Module Contents#

Classes#

IPCProtocol

IPC protocol constants for ZMQ weight streaming.

Functions#

apply_top_k_top_p

Apply top-k and top-p masks to the logits.

apply_top_k_only

Apply top-k mask to the logits.

resolve_model_class

Resolve the appropriate model class for a given model name.

is_vllm_v1_engine_enabled

Check if vLLM V1 engine is enabled.

get_gpu_info

Return information about the GPU being used by this worker.

configure_dynamo_cache

Disable dynamo autotune_local_cache.

get_runtime_env_for_policy_worker

Get runtime environment configuration for policy workers.

get_megatron_checkpoint_dir

Get the default Megatron checkpoint directory for the initial HF -> Mcore conversion.

get_handle_from_tensor

Get IPC handle from a tensor.

calculate_aligned_size

Calculate aligned size for memory alignment.

stream_weights_via_ipc_zmq_impl

Shared implementation for streaming weights via IPC ZMQ with improved memory management.

rebuild_cuda_tensor_from_ipc

Rebuild a CUDA tensor from an IPC handle.

stream_weights_via_http_impl

Stream weights to SGLang servers via HTTP API (update_weights_from_tensor).

_setup_ipc_gather_group

Set up the gather configuration for IPC handlers.

_gather_ipc_handlers

Gather IPC handlers from all ranks in the default FSDP group, then filter by server.

_send_tensor_to_sglang

Send gathered IPC handlers to SGLang server via HTTP.

Data#

API#

nemo_rl.models.policy.utils.AUTOMODEL_FACTORY: Dict[str, Any]#

None

class nemo_rl.models.policy.utils.IPCProtocol(*args, **kwds)#

Bases: enum.Enum

IPC protocol constants for ZMQ weight streaming.

Initialization

COMPLETE#

‘complete’

ACK#

‘ack’

nemo_rl.models.policy.utils.apply_top_k_top_p(
logits: torch.Tensor,
top_k: Optional[int] = None,
top_p: Optional[float] = None,
) torch.Tensor#

Apply top-k and top-p masks to the logits.

Simplified version of vLLM's implementation for scalar parameters.

Based on vLLM's implementation: https://github.com/vllm-project/vllm/blob/34a20c49b3f81f64133428b3a0d62309db1256f9/vllm/v1/sample/ops/topk_topp_sampler.py (SPDX-License-Identifier: Apache-2.0; copyright contributors to the vLLM project)

Parameters:
  • logits – Input logits tensor of shape [batch_size, seq_len, vocab_size]

  • top_k – Top-k sampling parameter. Set to -1 to consider all tokens.

  • top_p – Top-p (nucleus) sampling parameter. Must be in (0, 1]. Set to 1 to consider all tokens.

Returns:

Filtered logits with sampling parameters applied
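
A minimal usage sketch, assuming filtered-out positions are set to -inf as in vLLM's reference implementation:

```python
import torch

from nemo_rl.models.policy.utils import apply_top_k_top_p

logits = torch.randn(2, 8, 32000)  # [batch_size, seq_len, vocab_size]
filtered = apply_top_k_top_p(logits, top_k=50, top_p=0.9)
# Filtered-out positions should carry ~0 probability after softmax.
probs = torch.softmax(filtered, dim=-1)
```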

nemo_rl.models.policy.utils.apply_top_k_only(logits: torch.Tensor, top_k: int) torch.Tensor#

Apply top-k mask to the logits.

Simplified version of vLLM's implementation for scalar parameters. This implementation doesn't involve sorting the entire vocab.

Based on vLLM's implementation: https://github.com/vllm-project/vllm/blob/34a20c49b3f81f64133428b3a0d62309db1256f9/vllm/v1/sample/ops/topk_topp_sampler.py (SPDX-License-Identifier: Apache-2.0; copyright contributors to the vLLM project)

Parameters:
  • logits – Input logits tensor of shape [batch_size, seq_len, vocab_size]

  • top_k – Top-k sampling parameter.

Returns:

Filtered logits with top-k applied
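
To illustrate the sort-free idea, here is a hedged sketch (not the library's exact code): torch.topk finds the k-th largest logit per row, and everything below it is masked.

```python
import torch

def top_k_mask_sketch(logits: torch.Tensor, top_k: int) -> torch.Tensor:
    # torch.topk returns values sorted in descending order, so the last
    # column holds the k-th largest logit of each row.
    kth = torch.topk(logits, top_k, dim=-1).values[..., -1:]
    return logits.masked_fill(logits < kth, float("-inf"))
```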

nemo_rl.models.policy.utils.resolve_model_class(model_name: str) Any#

Resolve the appropriate model class for a given model name.

nemo_rl.models.policy.utils.is_vllm_v1_engine_enabled() bool#

Check if vLLM V1 engine is enabled.

Returns:

True if V1 engine is enabled, False otherwise (defaults to True if not set)

Return type:

bool
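
A hedged sketch of the check; the name of the controlling environment variable (here vLLM's VLLM_USE_V1) is an assumption, not confirmed by this docstring:

```python
import os

def is_vllm_v1_enabled_sketch() -> bool:
    # Assumption: VLLM_USE_V1 gates the engine version; per the docstring,
    # an unset value defaults to enabled.
    return os.environ.get("VLLM_USE_V1", "1") == "1"
```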

nemo_rl.models.policy.utils.get_gpu_info(model: torch.nn.Module) dict[str, Any]#

Return information about the GPU being used by this worker.

nemo_rl.models.policy.utils.configure_dynamo_cache() None#

Disable dynamo autotune_local_cache.

Dynamo may fail in cached_autotune when an existing cache has a different order of node_bundles. Disabling autotune_local_cache works around this. See https://github.com/pytorch/pytorch/issues/153791 for details.
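
The workaround presumably amounts to flipping the corresponding inductor config flag; a hedged sketch:

```python
import torch._inductor.config as inductor_config

# Assumption: this is the flag the docstring refers to; the actual
# implementation may apply the workaround differently.
inductor_config.autotune_local_cache = False
```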

nemo_rl.models.policy.utils.get_runtime_env_for_policy_worker(
policy_worker_name: str,
) dict[str, Any]#

Get runtime environment configuration for policy workers.

Note: expandable_segments configuration is handled directly in the worker init methods to ensure proper GPU detection after CUDA initialization.

nemo_rl.models.policy.utils.get_megatron_checkpoint_dir() str#

Get the default Megatron checkpoint directory for the initial HF -> Mcore conversion.

The Megatron initial checkpoint should be saved to a path available on all nodes. The directory is chosen in this order of precedence:

  1. $NRL_MEGATRON_CHECKPOINT_DIR (if set)

  2. $HF_HOME/nemo_rl (if HF_HOME is set)

  3. ~/.cache/huggingface/nemo_rl

HF_HOME is preferred because many users already have that path mounted, which means one less directory to mount into your runtime environment.
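
A sketch of the precedence order above (illustrative, not the exact implementation):

```python
import os

def megatron_checkpoint_dir_sketch() -> str:
    # 1. Explicit override
    if "NRL_MEGATRON_CHECKPOINT_DIR" in os.environ:
        return os.environ["NRL_MEGATRON_CHECKPOINT_DIR"]
    # 2. Reuse the Hugging Face cache location when available
    if "HF_HOME" in os.environ:
        return os.path.join(os.environ["HF_HOME"], "nemo_rl")
    # 3. Default Hugging Face cache path
    return os.path.expanduser("~/.cache/huggingface/nemo_rl")
```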

nemo_rl.models.policy.utils.get_handle_from_tensor(tensor: torch.Tensor) tuple[Any]#

Get IPC handle from a tensor.
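
PyTorch exposes this capability via torch.multiprocessing.reductions.reduce_tensor; a hedged usage sketch (the helper may wrap it differently):

```python
import torch
from torch.multiprocessing.reductions import reduce_tensor

t = torch.ones(4, device="cuda")
# reduce_tensor returns a (rebuild_fn, args) tuple whose args embed the CUDA
# IPC handle, shareable with other processes on the same host.
handle = reduce_tensor(t)
```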

nemo_rl.models.policy.utils.calculate_aligned_size(size_bytes: int, alignment: int = 512) int#

Calculate aligned size for memory alignment.

Parameters:
  • size_bytes (int) – Size in bytes to align

  • alignment (int) – Alignment boundary in bytes (default 512)

Returns:

Aligned size in bytes (int).
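
The usual round-up-to-boundary arithmetic, shown as a hedged sketch with worked values:

```python
def aligned_size_sketch(size_bytes: int, alignment: int = 512) -> int:
    # Round up to the next multiple of `alignment`.
    return ((size_bytes + alignment - 1) // alignment) * alignment

assert aligned_size_sketch(1) == 512
assert aligned_size_sketch(512) == 512
assert aligned_size_sketch(513) == 1024
```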

nemo_rl.models.policy.utils.stream_weights_via_ipc_zmq_impl(
params_generator,
buffer_size_bytes: int,
zmq_socket,
rank: int,
worker_name: str,
) None#

Shared implementation for streaming weights via IPC ZMQ with improved memory management.

Uses ping-pong double buffering to overlap communication while reusing buffers, which reduces memory-allocation overhead and improves stability.

Parameters:
  • params_generator – Generator yielding (name, tensor) pairs

  • buffer_size_bytes – Total buffer size in bytes used for batching parameters

  • zmq_socket – ZMQ socket for communication

  • rank – Worker rank for logging

  • worker_name – Name of the worker for logging
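
A minimal sketch of the ping-pong pattern, assuming a send/ACK handshake like the IPCProtocol constants above; the message format and packing details are hypothetical:

```python
import torch

def ping_pong_stream_sketch(params_generator, buffer_size_bytes, zmq_socket):
    # Two reusable staging buffers: fill one while the receiver drains the other.
    staging = [
        torch.empty(buffer_size_bytes, dtype=torch.uint8, device="cuda")
        for _ in range(2)
    ]
    active, awaiting_ack = 0, False
    for name, tensor in params_generator:
        flat = tensor.contiguous().view(torch.uint8).flatten()
        if awaiting_ack:
            assert zmq_socket.recv_string() == "ack"  # buffer is free again
        staging[active][: flat.numel()].copy_(flat)
        # Real code would send an IPC handle for staging[active]; only
        # metadata is sent in this sketch.
        zmq_socket.send_pyobj({"name": name, "shape": tuple(tensor.shape)})
        awaiting_ack, active = True, 1 - active
    if awaiting_ack:
        assert zmq_socket.recv_string() == "ack"
    zmq_socket.send_pyobj("complete")  # cf. IPCProtocol.COMPLETE
```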

nemo_rl.models.policy.utils.rebuild_cuda_tensor_from_ipc(
cuda_ipc_handle: tuple,
device_id: int,
) torch.Tensor#

Rebuild a CUDA tensor from an IPC handle.
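
Hedged sketch: if the handle is the (rebuild_fn, args) pair produced by torch.multiprocessing.reductions.reduce_tensor, replaying it looks roughly like this; the position of the device index inside args is an assumption:

```python
import torch

def rebuild_sketch(cuda_ipc_handle: tuple, device_id: int) -> torch.Tensor:
    rebuild_fn, args = cuda_ipc_handle
    args = list(args)
    # Assumption: the storage device index sits at position 6 of the args
    # tuple produced by reduce_tensor; remap it to this process's device.
    args[6] = device_id
    return rebuild_fn(*args)
```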

nemo_rl.models.policy.utils.stream_weights_via_http_impl(
params_generator,
sglang_url_to_gpu_uuids: dict[str, list[str]],
rank: int,
worker_name: str,
current_device_uuid: str,
) None#

Stream weights to SGLang servers via HTTP API (update_weights_from_tensor).

Flow: each rank creates an IPC handler → handlers are gathered in rank order → the list is sent → SGLang matches by tp_rank index.

Key points:

  • Each rank creates its handler on its own GPU

  • Handlers are gathered in rank order: [rank0_handler, rank1_handler, …]

  • List index = rank = GPU ID

  • SGLang automatically matches: handler = serialized_handlers[tp_rank]

Parameters:
  • params_generator – Generator yielding (name, tensor) pairs

  • sglang_url_to_gpu_uuids – Dict mapping SGLang server URL to list of GPU UUIDs it uses

  • rank – Worker rank for logging

  • worker_name – Name of the worker for logging

  • current_device_uuid – UUID of the current training worker’s GPU
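
The rank-order matching convention, illustrated with placeholder strings:

```python
# Gathered on the sending rank, in rank order.
gathered_handlers = ["h_rank0", "h_rank1", "h_rank2", "h_rank3"]

for tp_rank in range(len(gathered_handlers)):
    # Each SGLang TP worker picks the handler created by the training rank
    # that shares its GPU: list index == rank == GPU ID.
    assert gathered_handlers[tp_rank] == f"h_rank{tp_rank}"
```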

nemo_rl.models.policy.utils._setup_ipc_gather_group(
rank: int,
current_device_uuid: str,
sglang_gpu_uuids: list[str],
sglang_url_to_gpu_uuids: dict[str, list[str]],
) tuple[Optional[torch.distributed.ProcessGroup], Optional[int], Optional[list[int]]]#

Set up the gather configuration for IPC handlers.

Returns:

Tuple of (gather_group, gather_src_rank, matching_ranks)

  • gather_group: None (use default FSDP group)

  • gather_src_rank: The rank that will collect and send to SGLang server

  • matching_ranks: List of ranks that belong to the same SGLang server

nemo_rl.models.policy.utils._gather_ipc_handlers(
serialized_handler: str,
gather_group: Optional[torch.distributed.ProcessGroup],
gather_src: Optional[int],
rank: int,
matching_ranks: Optional[list[int]] = None,
) Optional[list[str]]#

Gather IPC handlers from all ranks in the default FSDP group, then filter by server.

Parameters:
  • serialized_handler – Serialized IPC handler from this rank

  • gather_group – Process group (None means use default FSDP group)

  • gather_src – Rank that will collect and filter handlers

  • rank – Current rank

  • matching_ranks – List of ranks that belong to the same SGLang server

Returns:

List of serialized handlers in rank order (only on the gather_src rank); None otherwise. The list contains handlers from matching_ranks only, in rank order.
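
A hedged sketch of the gather-then-filter step using torch.distributed.gather_object (assumes torch.distributed is initialized and the default group is the FSDP group mentioned above):

```python
from typing import Optional

import torch.distributed as dist

def gather_then_filter_sketch(
    serialized_handler: str,
    gather_src: int,
    rank: int,
    matching_ranks: list[int],
) -> Optional[list[str]]:
    world_size = dist.get_world_size()
    # gather_object requires the output list only on the destination rank.
    bucket = [None] * world_size if rank == gather_src else None
    dist.gather_object(serialized_handler, bucket, dst=gather_src)
    if rank != gather_src:
        return None
    # Keep only the ranks served by the same SGLang server, in rank order.
    return [bucket[r] for r in sorted(matching_ranks)]
```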

nemo_rl.models.policy.utils._send_tensor_to_sglang(
url: str,
tensor_name: str,
gathered_handlers: list[str],
shape: torch.Size,
dtype: str,
flush_cache: bool = False,
) None#

Send gathered IPC handlers to SGLang server via HTTP.

Key: gathered_handlers are in rank order [rank0, rank1, …]; SGLang automatically matches handler = serialized_handlers[tp_rank].

Parameters:
  • url – SGLang server URL

  • tensor_name – Name of the tensor

  • gathered_handlers – List of serialized IPC handlers in rank order

  • shape – Tensor shape

  • dtype – Tensor dtype

  • flush_cache – Whether to flush the cache after this tensor (typically set for the last tensor)
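
A hedged sketch of the HTTP call; the endpoint name follows the update_weights_from_tensor API mentioned above, while the exact JSON field names are assumptions:

```python
import requests

def send_tensor_sketch(url, tensor_name, gathered_handlers, shape, dtype, flush_cache=False):
    payload = {
        # Field names are assumptions; consult the SGLang server API for
        # the authoritative schema.
        "name": tensor_name,
        "serialized_named_tensors": gathered_handlers,  # [rank0, rank1, ...]
        "shape": list(shape),
        "dtype": dtype,
        "flush_cache": flush_cache,
    }
    resp = requests.post(f"{url}/update_weights_from_tensor", json=payload)
    resp.raise_for_status()
```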