nemo_rl.models.policy.utils#
Module Contents#
Classes#

| Class | Description |
| --- | --- |
| IPCProtocol | IPC protocol constants for ZMQ weight streaming. |

Functions#

| Function | Description |
| --- | --- |
| resolve_model_class | Resolve the appropriate model class for a given model name. |
| is_vllm_v1_engine_enabled | Check if vLLM V1 engine is enabled. |
| import_class_from_path | Import a class from a string path (e.g. 'torch.optim.AdamW'). |
| get_gpu_info | Return information about the GPU being used by this worker. |
| sliding_window_overwrite | Returns configuration overrides to handle sliding window settings based on model rules. |
| configure_dynamo_cache | Disable dynamo autotune_local_cache. |
| get_runtime_env_for_policy_worker | Get runtime environment configuration for policy workers. |
| get_megatron_checkpoint_dir | Gets the default megatron checkpoint directory for initial HF -> Mcore conversion. |
| get_handle_from_tensor | Get IPC handle from a tensor. |
| calculate_aligned_size | Calculate aligned size for memory alignment. |
| stream_weights_via_ipc_zmq_impl | Shared implementation for streaming weights via IPC ZMQ with improved memory management. |
| rebuild_cuda_tensor_from_ipc | Rebuild a CUDA tensor from an IPC handle. |
Data#
API#
- nemo_rl.models.policy.utils.AUTOMODEL_FACTORY: Dict[str, Any]#
None
- class nemo_rl.models.policy.utils.IPCProtocol(*args, **kwds)#
Bases: enum.Enum
IPC protocol constants for ZMQ weight streaming.
Initialization
- COMPLETE#
‘complete’
- ACK#
‘ack’
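A minimal sketch of how these constants can terminate a streaming handshake over a ZMQ socket. The PAIR socket and endpoint here are illustrative assumptions, not the workers' actual wiring:

```python
import zmq

from nemo_rl.models.policy.utils import IPCProtocol

# Illustrative socket setup; the real workers create and own their sockets.
ctx = zmq.Context()
sock = ctx.socket(zmq.PAIR)
sock.connect("ipc:///tmp/weight_stream.sock")

# Sender signals the end of the weight stream...
sock.send_string(IPCProtocol.COMPLETE.value)
# ...and waits for the receiver to acknowledge.
assert sock.recv_string() == IPCProtocol.ACK.value
```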
- nemo_rl.models.policy.utils.resolve_model_class(model_name: str) → Any#
Resolve the appropriate model class for a given model name.
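Hypothetical usage; the model name is a placeholder, and returning a HuggingFace Auto* class is an assumption based on AUTOMODEL_FACTORY above:

```python
from nemo_rl.models.policy.utils import resolve_model_class

model_cls = resolve_model_class("my-org/my-model")  # placeholder name
model = model_cls.from_pretrained("my-org/my-model")
```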
- nemo_rl.models.policy.utils.is_vllm_v1_engine_enabled() → bool#
Check if vLLM V1 engine is enabled.
- Returns:
True if V1 engine is enabled, False otherwise (defaults to True if not set)
- Return type:
bool
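Given the documented default, the check is presumably equivalent to reading vLLM's VLLM_USE_V1 environment variable (an assumption about the implementation):

```python
import os

def v1_enabled_sketch() -> bool:
    # Assumption: enabled unless the environment variable explicitly
    # disables it, matching the documented default of True.
    return os.environ.get("VLLM_USE_V1", "1") == "1"
```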
- nemo_rl.models.policy.utils.import_class_from_path(name: str) → Any#
Import a class from a string path (e.g. 'torch.optim.AdamW').
- Parameters:
name – Full path to the class, including module path and class name
- Returns:
The imported class object
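Usage, following the docstring's own example:

```python
from nemo_rl.models.policy.utils import import_class_from_path

optimizer_cls = import_class_from_path("torch.optim.AdamW")
# optimizer_cls is torch.optim.AdamW and can be instantiated as usual:
# optimizer = optimizer_cls(model.parameters(), lr=1e-4)
```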
- nemo_rl.models.policy.utils.get_gpu_info(model: torch.nn.Module) → dict[str, Any]#
Return information about the GPU being used by this worker.
- nemo_rl.models.policy.utils.sliding_window_overwrite(model_name: str) → dict[str, Any]#
Returns configuration overrides to handle sliding window settings based on model rules.
- Parameters:
model_name – The HuggingFace model name or path to load configuration from
- Returns:
Dictionary with overwrite values, or empty dict if no overwrites needed
- Return type:
dict
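A sketch of feeding the overrides into config loading; merging them into AutoConfig.from_pretrained kwargs is an assumption about the call site:

```python
from transformers import AutoConfig

from nemo_rl.models.policy.utils import sliding_window_overwrite

model_name = "my-org/my-model"  # placeholder
# When no override is needed the dict is empty, so the merge is a no-op.
config = AutoConfig.from_pretrained(model_name, **sliding_window_overwrite(model_name))
```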
- nemo_rl.models.policy.utils.configure_dynamo_cache() → None#
Disable dynamo autotune_local_cache.
Dynamo may fail at cached_autotune when there is already a cache with a different order of node_bundles. Disable autotune_local_cache as a workaround. See https://github.com/pytorch/pytorch/issues/153791 for more details.
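The workaround presumably amounts to flipping the inductor config flag; a sketch (the real function may do more):

```python
import torch._inductor.config as inductor_config

# Disable the local autotune cache to avoid the cached_autotune failure
# described in pytorch/pytorch#153791.
inductor_config.autotune_local_cache = False
```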
- nemo_rl.models.policy.utils.get_runtime_env_for_policy_worker(
- policy_worker_name: str,
- )#
Get runtime environment configuration for policy workers.
Note: expandable_segments configuration is handled directly in the worker init methods to ensure proper GPU detection after CUDA initialization.
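Assuming the returned dict is a Ray runtime_env (nemo_rl schedules its workers with Ray), usage might look like the following; the worker name and actor class are placeholders:

```python
import ray

from nemo_rl.models.policy.utils import get_runtime_env_for_policy_worker

runtime_env = get_runtime_env_for_policy_worker("policy_worker")  # placeholder name

@ray.remote
class PolicyWorker:
    def ping(self) -> str:
        return "ok"

worker = PolicyWorker.options(runtime_env=runtime_env).remote()
```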
- nemo_rl.models.policy.utils.get_megatron_checkpoint_dir() → str#
Gets the default megatron checkpoint directory for initial HF -> Mcore conversion.
The Megatron initial checkpoint should be saved to a path available on all nodes. The directory used is chosen in this order of precedence:
1. $NRL_MEGATRON_CHECKPOINT_DIR (if set)
2. $HF_HOME/nemo_rl (if HF_HOME is set)
3. ~/.cache/huggingface/nemo_rl
HF_HOME is preferred since many users already have that path mounted, which means one less directory to mount into your runtime environment.
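The documented precedence corresponds to a lookup like this sketch:

```python
import os

def megatron_ckpt_dir_sketch() -> str:
    # 1. Explicit override wins.
    if "NRL_MEGATRON_CHECKPOINT_DIR" in os.environ:
        return os.environ["NRL_MEGATRON_CHECKPOINT_DIR"]
    # 2. Co-locate with the HuggingFace cache when HF_HOME is set.
    if "HF_HOME" in os.environ:
        return os.path.join(os.environ["HF_HOME"], "nemo_rl")
    # 3. Fall back to the default HuggingFace cache location.
    return os.path.expanduser("~/.cache/huggingface/nemo_rl")
```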
- nemo_rl.models.policy.utils.get_handle_from_tensor(tensor: torch.Tensor) → tuple[Any]#
Get IPC handle from a tensor.
- nemo_rl.models.policy.utils.calculate_aligned_size(size_bytes: int, alignment: int = 512) → int#
Calculate aligned size for memory alignment.
- Parameters:
size_bytes (int) – Size in bytes to align
alignment (int) – Alignment boundary in bytes (default 512)
- Returns:
Aligned size in bytes.
- Return type:
int
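Rounding up to an alignment boundary is standard integer arithmetic; the function presumably reduces to:

```python
def aligned_size_sketch(size_bytes: int, alignment: int = 512) -> int:
    # Round up to the next multiple of `alignment`.
    return ((size_bytes + alignment - 1) // alignment) * alignment

assert aligned_size_sketch(1000) == 1024
assert aligned_size_sketch(512) == 512
```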
- nemo_rl.models.policy.utils.stream_weights_via_ipc_zmq_impl(
- params_generator,
- buffer_size_bytes: int,
- zmq_socket,
- rank: int,
- worker_name: str,
- )#
Shared implementation for streaming weights via IPC ZMQ with improved memory management.
Uses ping-pong double buffering to enable overlapping communication while reusing buffers to reduce memory allocation overhead and improve stability.
- Parameters:
params_generator – Generator yielding (name, tensor) pairs
buffer_size_bytes – Total size of the buffer in bytes used for batching parameters
zmq_socket – ZMQ socket for communication
rank – Worker rank for logging
worker_name – Name of the worker for logging
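A condensed sketch of the ping-pong pattern described above. Buffer packing is simplified (one parameter per send, each assumed to fit in the buffer), and the handle/metadata wire format is an assumption:

```python
import torch
from torch.multiprocessing.reductions import reduce_tensor

from nemo_rl.models.policy.utils import IPCProtocol

def stream_sketch(params_generator, buffer_size_bytes, zmq_socket, rank, worker_name):
    # Two reusable CUDA staging buffers: fill one while the peer still reads the other.
    buffers = [
        torch.empty(buffer_size_bytes, dtype=torch.uint8, device="cuda")
        for _ in range(2)
    ]
    pending = [False, False]  # does the peer still hold buffer i?
    idx = 0
    for name, tensor in params_generator:
        if pending[idx]:
            # The peer must release this buffer before we overwrite it.
            assert zmq_socket.recv_string() == IPCProtocol.ACK.value
            pending[idx] = False
        flat = tensor.detach().contiguous().view(torch.uint8).flatten()
        buffers[idx][: flat.numel()].copy_(flat)
        # reduce_tensor yields (rebuild_fn, args); args carry the CUDA IPC handle.
        zmq_socket.send_pyobj(
            (name, reduce_tensor(buffers[idx]), tuple(tensor.shape), str(tensor.dtype))
        )
        pending[idx] = True
        idx ^= 1  # ping-pong to the other buffer
    for i in range(2):  # drain outstanding ACKs
        if pending[i]:
            zmq_socket.recv_string()
    zmq_socket.send_string(IPCProtocol.COMPLETE.value)
    zmq_socket.recv_string()  # final ACK
```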
- nemo_rl.models.policy.utils.rebuild_cuda_tensor_from_ipc(
- cuda_ipc_handle: tuple,
- device_id: int,
- )#
Rebuild a CUDA tensor from an IPC handle.
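On the receiving side, a handle produced with torch.multiprocessing.reductions.reduce_tensor (as in the streaming sketch above) can be materialized like this; the exact handle format nemo_rl uses is an assumption:

```python
import torch

def rebuild_sketch(cuda_ipc_handle: tuple, device_id: int) -> torch.Tensor:
    # IPC handles must be opened on the device that owns the memory.
    torch.cuda.set_device(device_id)
    rebuild_fn, args = cuda_ipc_handle
    return rebuild_fn(*args)
```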