nemo_rl.models.policy.utils#
Module Contents#
Classes#
IPCProtocol | IPC protocol constants for ZMQ weight streaming.
Functions#
apply_top_k_top_p | Apply top-k and top-p masks to the logits.
apply_top_k_only | Apply top-k mask to the logits.
resolve_model_class | Resolve the appropriate model class for a given model name.
is_vllm_v1_engine_enabled | Check if vLLM V1 engine is enabled.
get_gpu_info | Return information about the GPU being used by this worker.
configure_dynamo_cache | Disable dynamo autotune_local_cache.
get_runtime_env_for_policy_worker | Get runtime environment configuration for policy workers.
get_megatron_checkpoint_dir | Gets the default megatron checkpoint directory for initial HF -> Mcore conversion.
get_handle_from_tensor | Get IPC handle from a tensor.
calculate_aligned_size | Calculate aligned size for memory alignment.
stream_weights_via_ipc_zmq_impl | Shared implementation for streaming weights via IPC ZMQ with improved memory management.
rebuild_cuda_tensor_from_ipc | Rebuild a CUDA tensor from an IPC handle.
Data#
API#
- nemo_rl.models.policy.utils.AUTOMODEL_FACTORY: Dict[str, Any]#
None
- class nemo_rl.models.policy.utils.IPCProtocol(*args, **kwds)#
Bases: enum.Enum
IPC protocol constants for ZMQ weight streaming.
Initialization
- COMPLETE#
‘complete’
- ACK#
‘ack’
- nemo_rl.models.policy.utils.apply_top_k_top_p(
- logits: torch.Tensor,
- top_k: Optional[int] = None,
- top_p: Optional[float] = None,
- ) → torch.Tensor#
Apply top-k and top-p masks to the logits.
Simplified version of vLLM’s implementation for scalar parameters.
Based on vLLM’s implementation: https://github.com/vllm-project/vllm/blob/34a20c49b3f81f64133428b3a0d62309db1256f9/vllm/v1/sample/ops/topk_topp_sampler.py
SPDX-License-Identifier: Apache-2.0
Copyright contributors to the vLLM project
- Parameters:
logits – Input logits tensor of shape [batch_size, seq_len, vocab_size]
top_k – Top-k sampling parameter. Set to -1 to consider all tokens.
top_p – Top-p (nucleus) sampling parameter. Must be in (0, 1]. Set to 1 to consider all tokens.
- Returns:
Filtered logits with sampling parameters applied
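For intuition, here is a minimal re-derivation of the masking logic in the vLLM style referenced above; it is a sketch, not the module’s exact code:

```python
from typing import Optional

import torch


def apply_top_k_top_p_sketch(
    logits: torch.Tensor,
    top_k: Optional[int] = None,
    top_p: Optional[float] = None,
) -> torch.Tensor:
    if top_k is not None and top_k > 0:
        # Mask everything strictly below the k-th largest logit.
        kth = torch.topk(logits, top_k, dim=-1).values[..., -1:]
        logits = logits.masked_fill(logits < kth, float("-inf"))
    if top_p is not None and top_p < 1.0:
        # Sort ascending; drop tokens whose cumulative probability mass
        # (from the bottom) stays within 1 - top_p, keeping at least one.
        sorted_logits, sorted_idx = logits.sort(dim=-1, descending=False)
        cum_probs = sorted_logits.softmax(dim=-1).cumsum(dim=-1)
        drop = cum_probs <= 1.0 - top_p
        drop[..., -1] = False  # always keep the most likely token
        sorted_logits = sorted_logits.masked_fill(drop, float("-inf"))
        # Scatter the masked values back into the original vocab order.
        logits = torch.empty_like(logits).scatter_(-1, sorted_idx, sorted_logits)
    return logits
```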
- nemo_rl.models.policy.utils.apply_top_k_only(logits: torch.Tensor, top_k: int) → torch.Tensor#
Apply top-k mask to the logits.
Simplified version of vLLM’s implementation for scalar parameters. This implementation doesn’t involve sorting the entire vocab.
Based on vLLM’s implementation: https://github.com/vllm-project/vllm/blob/34a20c49b3f81f64133428b3a0d62309db1256f9/vllm/v1/sample/ops/topk_topp_sampler.py
SPDX-License-Identifier: Apache-2.0
Copyright contributors to the vLLM project
- Parameters:
logits – Input logits tensor of shape [batch_size, seq_len, vocab_size]
top_k – Top-k sampling parameter.
- Returns:
Filtered logits with top-k applied
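A hedged usage example (the vocabulary size is arbitrary):

```python
import torch

from nemo_rl.models.policy.utils import apply_top_k_only

logits = torch.randn(2, 1, 32000)  # [batch_size, seq_len, vocab_size]
filtered = apply_top_k_only(logits, top_k=50)

# Masked entries are -inf, so they receive zero probability after softmax.
probs = filtered.softmax(dim=-1)
```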
- nemo_rl.models.policy.utils.resolve_model_class(model_name: str) → Any#
Resolve the appropriate model class for a given model name.
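One plausible shape for this lookup, assuming AUTOMODEL_FACTORY keys on model-name patterns; the matching rule and fallback below are guesses, not the module’s actual logic:

```python
from transformers import AutoModelForCausalLM

from nemo_rl.models.policy.utils import AUTOMODEL_FACTORY


def resolve_model_class_sketch(model_name: str):
    # Hypothetical: return a specialized AutoModel class when the name
    # matches a factory entry, otherwise fall back to causal LM.
    for pattern, model_cls in AUTOMODEL_FACTORY.items():
        if pattern in model_name:
            return model_cls
    return AutoModelForCausalLM
```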
- nemo_rl.models.policy.utils.is_vllm_v1_engine_enabled() → bool#
Check if vLLM V1 engine is enabled.
- Returns:
True if the V1 engine is enabled, False otherwise (defaults to True when the environment variable is not set)
- Return type:
bool
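A sketch of the check, assuming it reads vLLM’s standard VLLM_USE_V1 environment variable:

```python
import os


def is_vllm_v1_engine_enabled_sketch() -> bool:
    # Unset means enabled, matching the documented default of True.
    return os.environ.get("VLLM_USE_V1", "1") == "1"
```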
- nemo_rl.models.policy.utils.get_gpu_info(model: torch.nn.Module) → dict[str, Any]#
Return information about the GPU being used by this worker.
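A sketch of the kind of information such a helper can gather; the actual keys returned are not documented here, so these fields are assumptions:

```python
import torch


def get_gpu_info_sketch(model: torch.nn.Module) -> dict:
    device = next(model.parameters()).device
    free, total = torch.cuda.mem_get_info(device)
    return {
        "device_index": device.index,
        "device_name": torch.cuda.get_device_name(device),
        "memory_free_bytes": free,
        "memory_total_bytes": total,
        "memory_allocated_bytes": torch.cuda.memory_allocated(device),
    }
```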
- nemo_rl.models.policy.utils.configure_dynamo_cache() → None#
Disable dynamo autotune_local_cache.
Dynamo may fail at cached_autotune when there’s already a cache with different order of node_bundles. Disable autotune_local_cache as a workaround. See https://github.com/pytorch/pytorch/issues/153791 for more details.
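The workaround amounts to flipping one inductor config flag; a sketch, assuming the flag is torch._inductor.config.autotune_local_cache:

```python
import torch._inductor.config as inductor_config

# Skip the local on-disk autotune cache so cached_autotune never hits a
# stale cache whose node_bundles are in a different order
# (pytorch/pytorch#153791).
inductor_config.autotune_local_cache = False
```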
- nemo_rl.models.policy.utils.get_runtime_env_for_policy_worker(
- policy_worker_name: str,
- )#
Get runtime environment configuration for policy workers.
Note: expandable_segments configuration is handled directly in the worker init methods to ensure proper GPU detection after CUDA initialization.
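The returned value is presumably a Ray-style runtime_env mapping; the sketch below only shows the shape, and the env var it sets is hypothetical:

```python
def get_runtime_env_for_policy_worker_sketch(policy_worker_name: str) -> dict:
    return {
        "env_vars": {
            # expandable_segments is deliberately absent: per the note
            # above, workers configure it after CUDA initialization.
            "NRL_WORKER_NAME": policy_worker_name,  # hypothetical variable
        },
    }
```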
- nemo_rl.models.policy.utils.get_megatron_checkpoint_dir() → str#
Gets the default megatron checkpoint directory for initial HF -> Mcore conversion.
The initial Megatron checkpoint should be saved to a path available on all nodes. The directory is chosen in this order of precedence:
$NRL_MEGATRON_CHECKPOINT_DIR (if set)
$HF_HOME/nemo_rl (if HF_HOME is set)
~/.cache/huggingface/nemo_rl
HF_HOME is preferred since many users will already have that path mounted, which means one less directory to mount into your runtime environment.
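The documented precedence transcribes directly into code; a sketch:

```python
import os


def get_megatron_checkpoint_dir_sketch() -> str:
    nrl_dir = os.environ.get("NRL_MEGATRON_CHECKPOINT_DIR")
    if nrl_dir:
        return nrl_dir
    hf_home = os.environ.get("HF_HOME")
    if hf_home:
        return os.path.join(hf_home, "nemo_rl")
    return os.path.join(os.path.expanduser("~"), ".cache", "huggingface", "nemo_rl")
```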
- nemo_rl.models.policy.utils.get_handle_from_tensor(tensor: torch.Tensor) → tuple[Any]#
Get IPC handle from a tensor.
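One way to produce such a handle is PyTorch’s own CUDA reducer; the module’s exact handle layout may differ:

```python
import torch
from torch.multiprocessing.reductions import reduce_tensor

t = torch.ones(8, device="cuda")
# reduce_tensor returns (rebuild_fn, args): a picklable recipe that lets
# another process on the same node map this tensor's device memory.
rebuild_fn, handle_args = reduce_tensor(t)
```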
- nemo_rl.models.policy.utils.calculate_aligned_size(size_bytes: int, alignment: int = 512) → int#
Calculate aligned size for memory alignment.
- Parameters:
size_bytes (int) – Size in bytes to align
alignment (int) – Alignment boundary in bytes (default 512)
- Returns:
Aligned size in bytes (int).
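The arithmetic is standard round-up-to-a-multiple; for example, with the default 512-byte boundary:

```python
def calculate_aligned_size_sketch(size_bytes: int, alignment: int = 512) -> int:
    # Round up to the next multiple of `alignment`.
    return ((size_bytes + alignment - 1) // alignment) * alignment


assert calculate_aligned_size_sketch(1000) == 1024
assert calculate_aligned_size_sketch(1024) == 1024
```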
- nemo_rl.models.policy.utils.stream_weights_via_ipc_zmq_impl(
- params_generator,
- buffer_size_bytes: int,
- zmq_socket,
- rank: int,
- worker_name: str,
- )#
Shared implementation for streaming weights via IPC ZMQ with improved memory management.
Uses ping-pong double buffering to enable overlapping communication while reusing buffers to reduce memory allocation overhead and improve stability.
- Parameters:
params_generator – Generator yielding (name, tensor) pairs
buffer_size_bytes – Total size of the buffer in bytes used for batching parameters
zmq_socket – ZMQ socket for communication
rank – Worker rank for logging
worker_name – Name of the worker for logging
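A minimal sketch of the ping-pong pattern built from this module’s helpers; the wire format and handshake details below are assumptions, not the actual implementation:

```python
import torch

from nemo_rl.models.policy.utils import (
    IPCProtocol,
    calculate_aligned_size,
    get_handle_from_tensor,
)


def stream_ping_pong(params_generator, buffer_size_bytes, zmq_socket):
    # Two staging buffers alternate: the receiver drains one batch over
    # CUDA IPC while the sender packs the next.
    buffers = [
        torch.empty(buffer_size_bytes, dtype=torch.uint8, device="cuda")
        for _ in range(2)
    ]
    idx, offset, manifest = 0, 0, []

    def flush():
        nonlocal idx, offset, manifest
        zmq_socket.send_pyobj((get_handle_from_tensor(buffers[idx]), manifest))
        assert zmq_socket.recv_string() == IPCProtocol.ACK.value
        idx, offset, manifest = 1 - idx, 0, []

    for name, tensor in params_generator:
        nbytes = tensor.numel() * tensor.element_size()
        if offset + nbytes > buffer_size_bytes:
            flush()
        flat = tensor.detach().contiguous().view(-1).view(torch.uint8)
        buffers[idx][offset : offset + nbytes].copy_(flat)
        manifest.append((name, tuple(tensor.shape), tensor.dtype, offset))
        offset += calculate_aligned_size(nbytes)  # keep slices aligned
    if manifest:
        flush()
    zmq_socket.send_pyobj(IPCProtocol.COMPLETE.value)
```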
- nemo_rl.models.policy.utils.rebuild_cuda_tensor_from_ipc(
- cuda_ipc_handle: tuple,
- device_id: int,
- )#
Rebuild a CUDA tensor from an IPC handle.
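For context, here is a self-contained CUDA IPC round trip using torch.multiprocessing primitives; this module’s function wraps the same idea behind its own handle-tuple format and device_id argument:

```python
import torch
import torch.multiprocessing as mp
from torch.multiprocessing.reductions import reduce_tensor


def consumer(conn):
    rebuild_fn, handle_args = conn.recv()  # unpickle the IPC handle
    shared = rebuild_fn(*handle_args)      # maps the producer's device memory
    shared.add_(1)                         # writes are visible to the producer


if __name__ == "__main__":
    mp.set_start_method("spawn")
    src = torch.zeros(4, device="cuda")
    parent_conn, child_conn = mp.Pipe()
    proc = mp.Process(target=consumer, args=(child_conn,))
    proc.start()
    parent_conn.send(reduce_tensor(src))  # ship (rebuild_fn, handle_args)
    proc.join()
    torch.cuda.synchronize()
    print(src)  # tensor([1., 1., 1., 1.], device='cuda:0')
```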