nemo_rl.models.policy.utils#

Module Contents#

Classes#

IPCProtocol

IPC protocol constants for ZMQ weight streaming.

Functions#

resolve_model_class

Resolve the appropriate model class for a given model name.

is_vllm_v1_engine_enabled

Check if vLLM V1 engine is enabled.

import_class_from_path

Import a class from a string path (e.g. 'torch.optim.AdamW').

get_gpu_info

Return information about the GPU being used by this worker.

sliding_window_overwrite

Returns configuration overrides to handle sliding window settings based on model rules.

configure_dynamo_cache

Disable dynamo autotune_local_cache.

get_runtime_env_for_policy_worker

Get runtime environment configuration for policy workers.

get_megatron_checkpoint_dir

Gets the default megatron checkpoint directory for initial HF -> Mcore conversion.

get_handle_from_tensor

Get IPC handle from a tensor.

calculate_aligned_size

Calculate aligned size for memory alignment.

stream_weights_via_ipc_zmq_impl

Shared implementation for streaming weights via IPC ZMQ with improved memory management.

rebuild_cuda_tensor_from_ipc

Rebuild a CUDA tensor from an IPC handle.

Data#

API#

nemo_rl.models.policy.utils.AUTOMODEL_FACTORY: Dict[str, Any]#

None

class nemo_rl.models.policy.utils.IPCProtocol(*args, **kwds)#

Bases: enum.Enum

IPC protocol constants for ZMQ weight streaming.

COMPLETE#

'complete'

ACK#

'ack'
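As an illustration only, an enum with these two members could look like the sketch below. The class name `IPCProtocolSketch` and the helper `is_ack` are hypothetical and are not part of nemo_rl; how the real sender and receiver exchange these values is not described on this page.

```python
from enum import Enum


class IPCProtocolSketch(Enum):
    """Hypothetical stand-in mirroring the two documented members."""

    COMPLETE = "complete"
    ACK = "ack"


def is_ack(message: str) -> bool:
    """Return True if a received string matches the ACK value (illustrative helper)."""
    return message == IPCProtocolSketch.ACK.value
```

Looking members up by value, for example `IPCProtocolSketch("complete")`, returns the corresponding enum member.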

nemo_rl.models.policy.utils.resolve_model_class(model_name: str) → Any#

Resolve the appropriate model class for a given model name.

nemo_rl.models.policy.utils.is_vllm_v1_engine_enabled() → bool#

Check if vLLM V1 engine is enabled.

Returns:

True if V1 engine is enabled, False otherwise (defaults to True if not set)

Return type:

bool

nemo_rl.models.policy.utils.import_class_from_path(name: str) → Any#

Import a class from a string path (e.g. 'torch.optim.AdamW').

Parameters:

name – Full path to the class, including the module path and class name

Returns:

The imported class object
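A common way to implement this kind of lookup is to split the dotted path at its last dot and import the module with `importlib`. The following is a minimal sketch under that assumption; the function name `import_class_from_path_sketch` is hypothetical and the real implementation may differ.

```python
import importlib
from typing import Any


def import_class_from_path_sketch(name: str) -> Any:
    """Import an attribute (typically a class) from a dotted path string."""
    # Split "package.module.ClassName" into module path and class name.
    module_name, _, class_name = name.rpartition(".")
    if not module_name:
        raise ValueError(f"Expected a dotted path such as 'pkg.mod.Class', got {name!r}")
    module = importlib.import_module(module_name)
    return getattr(module, class_name)


# Usage with a standard-library class, so the sketch runs without extra installs.
ordered_dict_cls = import_class_from_path_sketch("collections.OrderedDict")
```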

nemo_rl.models.policy.utils.get_gpu_info(model: torch.nn.Module) → dict[str, Any]#

Return information about the GPU being used by this worker.

nemo_rl.models.policy.utils.sliding_window_overwrite(model_name: str) → dict[str, Any]#

Returns configuration overrides to handle sliding window settings based on model rules.

Parameters:

model_name – The HuggingFace model name or path to load configuration from

Returns:

Dictionary with overwrite values, or empty dict if no overwrites needed

Return type:

dict

nemo_rl.models.policy.utils.configure_dynamo_cache() → None#

Disable dynamo autotune_local_cache.

Dynamo may fail at cached_autotune when a cache already exists with a different order of node_bundles. Disabling autotune_local_cache works around this. See https://github.com/pytorch/pytorch/issues/153791 for more details.

nemo_rl.models.policy.utils.get_runtime_env_for_policy_worker(
policy_worker_name: str,
) → dict[str, Any]#

Get runtime environment configuration for policy workers.

Note: expandable_segments configuration is handled directly in the worker init methods to ensure proper GPU detection after CUDA initialization.

nemo_rl.models.policy.utils.get_megatron_checkpoint_dir() → str#

Gets the default megatron checkpoint directory for initial HF -> Mcore conversion.

The initial Megatron checkpoint should be saved to a path available on all nodes. The directory is chosen in this order of precedence:

  1. $NRL_MEGATRON_CHECKPOINT_DIR (if set)

  2. $HF_HOME/nemo_rl (if HF_HOME is set)

  3. ~/.cache/huggingface/nemo_rl

HF_HOME is preferred since many users will also have that path mounted and it means one less directory to mount into your runtime environment.
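The precedence order above can be sketched as a small lookup over environment variables. This is an illustration, not the library's code: the function name `megatron_checkpoint_dir_sketch` and its `env` parameter are hypothetical, and treating an empty variable as unset is an assumption.

```python
import os
from typing import Mapping


def megatron_checkpoint_dir_sketch(env: Mapping[str, str] = os.environ) -> str:
    """Pick a checkpoint directory following the documented precedence."""
    # 1. An explicit override wins (assumption: an empty string counts as unset).
    explicit = env.get("NRL_MEGATRON_CHECKPOINT_DIR")
    if explicit:
        return explicit
    # 2. Otherwise use a nemo_rl subdirectory of HF_HOME if it is set.
    hf_home = env.get("HF_HOME")
    if hf_home:
        return os.path.join(hf_home, "nemo_rl")
    # 3. Fall back to the default Hugging Face cache location under the home directory.
    return os.path.join(os.path.expanduser("~"), ".cache", "huggingface", "nemo_rl")
```

Passing a plain dict as `env` makes the precedence easy to check without changing the process environment.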

nemo_rl.models.policy.utils.get_handle_from_tensor(tensor: torch.Tensor) → tuple[Any]#

Get IPC handle from a tensor.

nemo_rl.models.policy.utils.calculate_aligned_size(size_bytes: int, alignment: int = 512) → int#

Calculate aligned size for memory alignment.

Parameters:
  • size_bytes (int) – Size in bytes to align

  • alignment (int) – Alignment boundary in bytes (default 512)

Returns:

Aligned size in bytes (int).
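Size alignment of this kind is usually computed by rounding up to the next multiple of the boundary. The sketch below assumes that behavior; the name `calculate_aligned_size_sketch` is hypothetical, and the page does not confirm the exact formula the library uses.

```python
def calculate_aligned_size_sketch(size_bytes: int, alignment: int = 512) -> int:
    """Round size_bytes up to the nearest multiple of alignment."""
    if alignment <= 0:
        raise ValueError("alignment must be a positive integer")
    # Integer ceiling division, then scale back up to a byte count.
    return ((size_bytes + alignment - 1) // alignment) * alignment
```

Under this assumption, a 513-byte tensor occupies 1024 bytes in a 512-byte-aligned buffer, while a 512-byte tensor occupies exactly 512.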

nemo_rl.models.policy.utils.stream_weights_via_ipc_zmq_impl(
params_generator,
buffer_size_bytes: int,
zmq_socket,
rank: int,
worker_name: str,
) → None#

Shared implementation for streaming weights via IPC ZMQ with improved memory management.

Uses ping-pong double buffering to enable overlapping communication while reusing buffers to reduce memory allocation overhead and improve stability.

Parameters:
  • params_generator – Generator yielding (name, tensor) pairs

  • buffer_size_bytes – Total size of the buffer in bytes for batching parameters

  • zmq_socket – ZMQ socket for communication

  • rank – Worker rank for logging

  • worker_name – Name of the worker for logging
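To make the ping-pong idea concrete, the sketch below plans how parameters could be packed into two alternating halves of a fixed-size buffer. It is a simplified illustration in pure Python with no CUDA or ZMQ involved. The function name `plan_ping_pong_batches`, the even split of `buffer_size_bytes` into two halves, and the 512-byte alignment are all assumptions, not details confirmed by this page.

```python
from typing import Iterable, Iterator, List, Tuple


def plan_ping_pong_batches(
    param_sizes: Iterable[Tuple[str, int]],
    buffer_size_bytes: int,
    alignment: int = 512,
) -> Iterator[Tuple[int, List[Tuple[str, int]]]]:
    """Yield (buffer_index, [(name, offset), ...]) batches alternating between two halves."""
    half = buffer_size_bytes // 2  # assumption: the total buffer is split evenly in two
    buffer_index = 0
    batch: List[Tuple[str, int]] = []
    used = 0
    for name, size in param_sizes:
        aligned = -(-size // alignment) * alignment  # round up to the alignment boundary
        if aligned > half:
            raise ValueError(f"{name} ({aligned} bytes aligned) does not fit in one half buffer")
        if used + aligned > half:
            # Current half is full: hand it off and start filling the other half.
            yield buffer_index, batch
            buffer_index = 1 - buffer_index
            batch, used = [], 0
        batch.append((name, used))
        used += aligned
    if batch:
        yield buffer_index, batch


# Usage: three parameters, 2048-byte total buffer (two 1024-byte halves).
plan = list(plan_ping_pong_batches([("a", 500), ("b", 600), ("c", 100)], 2048))
```

In a real streaming loop, the sender could fill one half while the receiver is still consuming the other, which is the overlap that double buffering is meant to provide.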

nemo_rl.models.policy.utils.rebuild_cuda_tensor_from_ipc(
cuda_ipc_handle: tuple,
device_id: int,
) → torch.Tensor#

Rebuild a CUDA tensor from an IPC handle.