nemo_rl.models.policy.utils#
Module Contents#
Classes#
- IPCProtocol: IPC protocol constants for ZMQ weight streaming.
Functions#
- apply_top_k_top_p: Apply top-k and top-p masks to the logits.
- apply_top_k_only: Apply top-k mask to the logits.
- resolve_model_class: Resolve the appropriate model class for a given model name.
- is_vllm_v1_engine_enabled: Check if vLLM V1 engine is enabled.
- get_gpu_info: Return information about the GPU being used by this worker.
- configure_dynamo_cache: Disable dynamo autotune_local_cache.
- get_runtime_env_for_policy_worker: Get runtime environment configuration for policy workers.
- get_megatron_checkpoint_dir: Get the default Megatron checkpoint directory for initial HF -> Mcore conversion.
- get_handle_from_tensor: Get IPC handle from a tensor.
- calculate_aligned_size: Calculate aligned size for memory alignment.
- stream_weights_via_ipc_zmq_impl: Shared implementation for streaming weights via IPC ZMQ with improved memory management.
- rebuild_cuda_tensor_from_ipc: Rebuild a CUDA tensor from an IPC handle.
- stream_weights_via_http_impl: Stream weights to SGLang servers via HTTP API (update_weights_from_tensor).
- _setup_ipc_gather_group: Set up gather configuration for IPC handlers.
- _gather_ipc_handlers: Gather IPC handlers from all ranks in the default FSDP group, then filter by server.
- _send_tensor_to_sglang: Send gathered IPC handlers to SGLang server via HTTP.
Data#
- AUTOMODEL_FACTORY
API#
- nemo_rl.models.policy.utils.AUTOMODEL_FACTORY: Dict[str, Any]#
None
- class nemo_rl.models.policy.utils.IPCProtocol(*args, **kwds)#
Bases: enum.Enum
IPC protocol constants for ZMQ weight streaming.
Initialization
- COMPLETE#
‘complete’
- ACK#
‘ack’
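A minimal sketch of how these constants might drive the end-of-stream handshake over ZMQ; the socket pattern, address, and framing below are illustrative assumptions, not the module's actual wiring:

```python
# Hypothetical COMPLETE/ACK handshake over a ZMQ REQ socket.
import zmq

from nemo_rl.models.policy.utils import IPCProtocol

ctx = zmq.Context()
sock = ctx.socket(zmq.REQ)
sock.connect("tcp://127.0.0.1:5555")  # placeholder endpoint

# Sender: signal that all weight chunks have been streamed.
sock.send_string(IPCProtocol.COMPLETE.value)

# The receiver is expected to answer with ACK once it has consumed the data.
reply = sock.recv_string()
assert reply == IPCProtocol.ACK.value
```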
- nemo_rl.models.policy.utils.apply_top_k_top_p(
- logits: torch.Tensor,
- top_k: Optional[int] = None,
- top_p: Optional[float] = None,
) → torch.Tensor#
Apply top-k and top-p masks to the logits.
Simplified version of vLLM’s implementation for scalar parameters.
Based on vLLM’s implementation: https://github.com/vllm-project/vllm/blob/34a20c49b3f81f64133428b3a0d62309db1256f9/vllm/v1/sample/ops/topk_topp_sampler.py
SPDX-License-Identifier: Apache-2.0. Copyright contributors to the vLLM project.
- Parameters:
logits – Input logits tensor of shape [batch_size, seq_len, vocab_size]
top_k – Top-k sampling parameter. Set to -1 to consider all tokens.
top_p – Top-p (nucleus) sampling parameter. Must be in (0, 1]. Set to 1 to consider all tokens.
- Returns:
Filtered logits with sampling parameters applied
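Example usage (shapes per the docstring; the -inf masking behavior follows vLLM's reference implementation):

```python
import torch

from nemo_rl.models.policy.utils import apply_top_k_top_p

logits = torch.randn(2, 8, 32000)  # [batch_size, seq_len, vocab_size]

# Keep the 50 highest logits per position, then restrict to the smallest
# set of tokens whose cumulative probability reaches 0.9.
filtered = apply_top_k_top_p(logits, top_k=50, top_p=0.9)

# Masked positions are filled with -inf, so softmax assigns them zero mass.
probs = torch.softmax(filtered, dim=-1)
```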
- nemo_rl.models.policy.utils.apply_top_k_only(logits: torch.Tensor, top_k: int) → torch.Tensor#
Apply top-k mask to the logits.
Simplified version of vLLM’s implementation for scalar parameters. This implementation doesn’t involve sorting the entire vocab.
Based on vLLM’s implementation: https://github.com/vllm-project/vllm/blob/34a20c49b3f81f64133428b3a0d62309db1256f9/vllm/v1/sample/ops/topk_topp_sampler.py
SPDX-License-Identifier: Apache-2.0. Copyright contributors to the vLLM project.
- Parameters:
logits – Input logits tensor of shape [batch_size, seq_len, vocab_size]
top_k – Top-k sampling parameter.
- Returns:
Filtered logits with top-k applied
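To illustrate the sort-free idea (a sketch of the technique, not the module's exact code): torch.topk yields the k-th largest logit per position, and everything below that threshold is masked, avoiding a full-vocabulary sort:

```python
import torch

def top_k_mask_sketch(logits: torch.Tensor, top_k: int) -> torch.Tensor:
    # kth_value has shape [..., 1]: the smallest logit that survives the mask.
    kth_value = torch.topk(logits, top_k, dim=-1).values[..., -1:]
    # Everything strictly below the threshold is excluded from sampling.
    return logits.masked_fill(logits < kth_value, -float("inf"))
```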
- nemo_rl.models.policy.utils.resolve_model_class(model_name: str) → Any#
Resolve the appropriate model class for a given model name.
- nemo_rl.models.policy.utils.is_vllm_v1_engine_enabled() → bool#
Check if vLLM V1 engine is enabled.
- Returns:
True if V1 engine is enabled, False otherwise (defaults to True if not set)
- Return type:
bool
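A minimal sketch of the documented behavior, assuming the check reads vLLM's VLLM_USE_V1 environment variable (an assumption; the variable actually consulted may differ):

```python
import os

def is_vllm_v1_engine_enabled_sketch() -> bool:
    # Defaults to True when the variable is unset, per the docstring.
    return os.environ.get("VLLM_USE_V1", "1") == "1"
```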
- nemo_rl.models.policy.utils.get_gpu_info(model: torch.nn.Module) → dict[str, Any]#
Return information about the GPU being used by this worker.
- nemo_rl.models.policy.utils.configure_dynamo_cache() → None#
Disable dynamo autotune_local_cache.
Dynamo may fail at cached_autotune when there’s already a cache with different order of node_bundles. Disable autotune_local_cache as a workaround. See https://github.com/pytorch/pytorch/issues/153791 for more details.
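A sketch of the workaround, assuming it flips the autotune_local_cache flag exposed by torch._inductor's config module:

```python
import torch._inductor.config as inductor_config

# Disable the local autotune cache to sidestep the cached_autotune failure
# described in pytorch/pytorch#153791.
inductor_config.autotune_local_cache = False
```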
- nemo_rl.models.policy.utils.get_runtime_env_for_policy_worker(
- policy_worker_name: str,
)#
Get runtime environment configuration for policy workers.
Note: expandable_segments configuration is handled directly in the worker init methods to ensure proper GPU detection after CUDA initialization.
- nemo_rl.models.policy.utils.get_megatron_checkpoint_dir() → str#
Get the default Megatron checkpoint directory for initial HF -> Mcore conversion.
The Megatron initial checkpoint should be saved to a path available on all nodes. The directory used takes this order of precedence:
1. $NRL_MEGATRON_CHECKPOINT_DIR (if set)
2. $HF_HOME/nemo_rl (if HF_HOME is set)
3. ~/.cache/huggingface/nemo_rl
HF_HOME is preferred since many users will already have that path mounted, which means one less directory to mount into your runtime environment.
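A sketch of the documented precedence chain (path resolution only; the real helper may differ in details such as directory creation):

```python
import os

def megatron_checkpoint_dir_sketch() -> str:
    # 1. Explicit override wins.
    if os.environ.get("NRL_MEGATRON_CHECKPOINT_DIR"):
        return os.environ["NRL_MEGATRON_CHECKPOINT_DIR"]
    # 2. Reuse HF_HOME when available (often already mounted on all nodes).
    if os.environ.get("HF_HOME"):
        return os.path.join(os.environ["HF_HOME"], "nemo_rl")
    # 3. Fall back to the default HuggingFace cache location.
    return os.path.expanduser("~/.cache/huggingface/nemo_rl")
```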
- nemo_rl.models.policy.utils.get_handle_from_tensor(tensor: torch.Tensor) → tuple[Any]#
Get IPC handle from a tensor.
- nemo_rl.models.policy.utils.calculate_aligned_size(size_bytes: int, alignment: int = 512) → int#
Calculate aligned size for memory alignment.
- Parameters:
size_bytes (int) – Size in bytes to align
alignment (int) – Alignment boundary in bytes (default 512)
- Returns:
Aligned size in bytes (int).
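The standard round-up-to-a-multiple arithmetic, shown as a sketch consistent with the documented signature:

```python
def calculate_aligned_size_sketch(size_bytes: int, alignment: int = 512) -> int:
    # Round size_bytes up to the next multiple of alignment.
    return ((size_bytes + alignment - 1) // alignment) * alignment

assert calculate_aligned_size_sketch(1000) == 1024  # next multiple of 512
assert calculate_aligned_size_sketch(512) == 512    # already aligned
```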
- nemo_rl.models.policy.utils.stream_weights_via_ipc_zmq_impl(
- params_generator,
- buffer_size_bytes: int,
- zmq_socket,
- rank: int,
- worker_name: str,
)#
Shared implementation for streaming weights via IPC ZMQ with improved memory management.
Uses ping-pong double buffering to enable overlapping communication while reusing buffers to reduce memory allocation overhead and improve stability.
- Parameters:
params_generator – Generator yielding (name, tensor) pairs
buffer_size_bytes – Total size of the buffer in bytes for batching parameters
zmq_socket – ZMQ socket for communication
rank – Worker rank for logging
worker_name – Name of the worker for logging
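A simplified sketch of the ping-pong scheme (illustrative only; the real implementation also handles metadata, offset alignment, and the COMPLETE/ACK handshake). Two pre-allocated CUDA buffers alternate, so the receiver can drain one while the sender packs the other:

```python
import torch

def stream_params_sketch(params_generator, buffer_size_bytes: int, send_fn):
    # Two reusable staging buffers; flipping between them overlaps packing
    # with the receiver's consumption and avoids per-batch allocations.
    buffers = [
        torch.empty(buffer_size_bytes, dtype=torch.uint8, device="cuda"),
        torch.empty(buffer_size_bytes, dtype=torch.uint8, device="cuda"),
    ]
    idx, offset, batch = 0, 0, []
    for name, tensor in params_generator:
        nbytes = tensor.numel() * tensor.element_size()  # assume it fits a buffer
        if offset + nbytes > buffer_size_bytes and batch:
            send_fn(buffers[idx], batch)         # e.g. ship an IPC handle over ZMQ
            idx, offset, batch = 1 - idx, 0, []  # ping-pong to the other buffer
        # Pack the tensor's raw bytes into the active buffer.
        buffers[idx][offset : offset + nbytes].copy_(
            tensor.flatten().view(torch.uint8)
        )
        batch.append((name, tuple(tensor.shape), tensor.dtype, offset))
        offset += nbytes
    if batch:
        send_fn(buffers[idx], batch)  # flush the final partial batch
```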
- nemo_rl.models.policy.utils.rebuild_cuda_tensor_from_ipc(
- cuda_ipc_handle: tuple,
- device_id: int,
)#
Rebuild a CUDA tensor from an IPC handle.
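A hedged sketch of the IPC round trip (pairing get_handle_from_tensor with this rebuild step) using PyTorch's public reduction helper; the module's actual handle format and rebuild path may differ:

```python
import torch
from torch.multiprocessing.reductions import reduce_tensor

src = torch.arange(16, dtype=torch.float32, device="cuda")

# Producer: serialize the tensor into a picklable (rebuild_fn, args) pair
# backed by a CUDA IPC memory handle.
rebuild_fn, args = reduce_tensor(src)

# Consumer (normally a different process on the same node): reconstruct a
# view over the same CUDA allocation; no copy is made.
shared = rebuild_fn(*args)
```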
- nemo_rl.models.policy.utils.stream_weights_via_http_impl(
- params_generator,
- sglang_url_to_gpu_uuids: dict[str, list[str]],
- rank: int,
- worker_name: str,
- current_device_uuid: str,
)#
Stream weights to SGLang servers via HTTP API (update_weights_from_tensor).
Flow: each rank creates an IPC handler → handlers are gathered in rank order → the list is sent → SGLang matches by tp_rank index.
Key points:
- Each rank creates a handler on its own GPU
- Handlers are gathered in rank order: [rank0_handler, rank1_handler, …]
- List index = rank = GPU ID
- SGLang automatically matches: handler = serialized_handlers[tp_rank]
- Parameters:
params_generator – Generator yielding (name, tensor) pairs
sglang_url_to_gpu_uuids – Dict mapping SGLang server URL to list of GPU UUIDs it uses
rank – Worker rank for logging
worker_name – Name of the worker for logging
current_device_uuid – UUID of the current training worker’s GPU
- nemo_rl.models.policy.utils._setup_ipc_gather_group(
- rank: int,
- current_device_uuid: str,
- sglang_gpu_uuids: list[str],
- sglang_url_to_gpu_uuids: dict[str, list[str]],
)#
Set up gather configuration for IPC handlers.
- Returns:
Tuple of (gather_group, gather_src_rank, matching_ranks):
- gather_group: None (use the default FSDP group)
- gather_src_rank: the rank that will collect and send to the SGLang server
- matching_ranks: list of ranks that belong to the same SGLang server
- nemo_rl.models.policy.utils._gather_ipc_handlers(
- serialized_handler: str,
- gather_group: Optional[torch.distributed.ProcessGroup],
- gather_src: Optional[int],
- rank: int,
- matching_ranks: Optional[list[int]] = None,
)#
Gather IPC handlers from all ranks in the default FSDP group, then filter by server.
- Parameters:
serialized_handler – Serialized IPC handler from this rank
gather_group – Process group (None means use default FSDP group)
gather_src – Rank that will collect and filter handlers
rank – Current rank
matching_ranks – List of ranks that belong to the same SGLang server
- Returns:
List of serialized handlers in rank order (only on the gather_src rank); None otherwise. The list contains handlers from matching_ranks only, in rank order.
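A hedged sketch of the gather step built on torch.distributed.gather_object; the actual helper additionally validates groups before the matching_ranks filtering shown here:

```python
import torch.distributed as dist

def gather_handlers_sketch(serialized_handler: str, gather_src: int,
                           matching_ranks: list[int]):
    rank = dist.get_rank()
    world_size = dist.get_world_size()
    # Only the destination rank provides a list to collect into.
    gathered = [None] * world_size if rank == gather_src else None
    dist.gather_object(serialized_handler, gathered, dst=gather_src)
    if rank != gather_src:
        return None
    # Keep only handlers from ranks served by the same SGLang server,
    # preserving rank order so that list index == tp_rank on the server.
    return [gathered[r] for r in sorted(matching_ranks)]
```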
- nemo_rl.models.policy.utils._send_tensor_to_sglang(
- url: str,
- tensor_name: str,
- gathered_handlers: list[str],
- shape: torch.Size,
- dtype: str,
- flush_cache: bool = False,
)#
Send gathered IPC handlers to SGLang server via HTTP.
Key: gathered_handlers are in rank order [rank0, rank1, …]; SGLang will automatically match handler = serialized_handlers[tp_rank].
- Parameters:
url – SGLang server URL
tensor_name – Name of the tensor
gathered_handlers – List of serialized IPC handlers in rank order
shape – Tensor shape
dtype – Tensor dtype
flush_cache – Whether to flush the cache after this tensor (set for the last tensor)
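A hedged sketch of the HTTP step. The endpoint path mirrors the update_weights_from_tensor API named in the docstring, but the payload field names below are assumptions, not SGLang's exact schema:

```python
import requests

def send_tensor_sketch(url: str, tensor_name: str, gathered_handlers: list[str],
                       shape, dtype: str, flush_cache: bool = False) -> None:
    payload = {
        "name": tensor_name,
        "serialized_handlers": gathered_handlers,  # index i is consumed by tp_rank i
        "shape": list(shape),
        "dtype": dtype,
        "flush_cache": flush_cache,  # set True only for the last tensor
    }
    resp = requests.post(f"{url}/update_weights_from_tensor", json=payload)
    resp.raise_for_status()
```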