# Copyright (c) 2025, NVIDIA CORPORATION. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import importlib
import os
from typing import Any

import torch
from transformers import AutoConfig

from nemo_rl.distributed.worker_group_utils import get_nsight_config_if_pattern_matches
def is_vllm_v1_engine_enabled() -> bool:
"""Check if vLLM V1 engine is enabled.
Returns:
bool: True if V1 engine is enabled, False otherwise (defaults to True if not set)
"""
return os.environ.get("NRL_VLLM_USE_V1", "1") == "1"
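

# Illustrative usage sketch (not part of the module): the flag is read from the
# NRL_VLLM_USE_V1 environment variable, so setting it before the engine is constructed
# controls which engine is selected. The value below is only an example.
#
#   os.environ["NRL_VLLM_USE_V1"] = "0"
#   assert not is_vllm_v1_engine_enabled()

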
def import_class_from_path(name: str) -> Any:
"""Import a class from a string path (e.g. 'torch.optim.AdamW').
Args:
full_path: Full path to class including module path and class name
Returns:
The imported class object
"""
module_name, cls_name = name.rsplit(".", 1)
cls_instance = getattr(importlib.import_module(module_name), cls_name)
return cls_instance
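

# Illustrative usage sketch (not part of the module): importing an optimizer class by its
# dotted path. 'torch.optim.AdamW' exists in PyTorch; the learning rate is an arbitrary
# example value, and `model` is assumed to be defined by the caller.
#
#   optimizer_cls = import_class_from_path("torch.optim.AdamW")
#   optimizer = optimizer_cls(model.parameters(), lr=1e-4)

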
def get_gpu_info(model: torch.nn.Module) -> dict[str, Any]:
"""Return information about the GPU being used by this worker."""
# Get distributed training info
rank = torch.distributed.get_rank()
world_size = torch.distributed.get_world_size()
local_rank = int(os.environ.get("LOCAL_RANK", 0))
# Get device info from CUDA
device = torch.cuda.current_device()
device_name = torch.cuda.get_device_name(device)
device_count = torch.cuda.device_count()
memory_allocated = torch.cuda.memory_allocated(device) / (1024**2) # in MB
memory_reserved = torch.cuda.memory_reserved(device) / (1024**2) # in MB
peak_memory = torch.cuda.max_memory_allocated() / (1024**2) # in MB
peak_reserved = torch.cuda.max_memory_reserved() / (1024**2) # in MB
# Try to get the real global device ID (not the local one)
# In distributed training, each process only sees its assigned GPU as device 0
local_device_id = device
global_device_id = local_device_id
if "CUDA_VISIBLE_DEVICES" in os.environ:
cuda_visible_devices = os.environ["CUDA_VISIBLE_DEVICES"].split(",")
if local_rank < len(cuda_visible_devices):
global_device_id = int(cuda_visible_devices[local_rank])
# Get a parameter from the model to verify CUDA device placement
# This confirms tensors are actually on the appropriate device
param_info = {}
for module_name, module in model.named_modules():
for param_name, param in module.named_parameters(recurse=False):
if param is not None and param.requires_grad:
full_name = f"{module_name}.{param_name}"
param_info[full_name] = {
"device": str(param.device),
"shape": list(param.shape),
"dtype": str(param.dtype),
}
# Just grab one parameter for verification
break
if param_info:
break
return {
"rank": rank,
"world_size": world_size,
"local_rank": local_rank,
"local_device_id": local_device_id,
"global_device_id": global_device_id,
"device_count": device_count,
"device_name": device_name,
"memory_allocated_mb": memory_allocated,
"memory_reserved_mb": memory_reserved,
"peak_memory_allocated_mb": peak_memory,
"peak_memory_reserved_mb": peak_reserved,
"parameter_sample": param_info,
"env_vars": {
k: v
for k, v in os.environ.items()
if k.startswith("CUDA") or k in ["LOCAL_RANK", "RANK", "WORLD_SIZE"]
},
}
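

# Illustrative usage sketch (not part of the module): get_gpu_info assumes
# torch.distributed has already been initialized by the caller (e.g. via
# torch.distributed.init_process_group) and that a CUDA device is available.
#
#   model = torch.nn.Linear(8, 8).cuda()
#   info = get_gpu_info(model)
#   print(info["device_name"], info["memory_allocated_mb"])

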
def sliding_window_overwrite(model_name: str) -> dict[str, Any]:
"""Returns configuration overrides to handle sliding window settings based on model rules.
Args:
model_name: The HuggingFace model name or path to load configuration from
Returns:
dict: Dictionary with overwrite values, or empty dict if no overwrites needed
"""
hf_config = AutoConfig.from_pretrained(model_name, trust_remote_code=True)
overwrite_dict = {}
# Override sliding_window setting to address a HF mismatch relevant to use_sliding_window
# TODO(@zhiyul): remove this once the bug is fixed https://github.com/huggingface/transformers/issues/38002
    if (
        hasattr(hf_config, "use_sliding_window")
        and hf_config.use_sliding_window is False
    ):
assert hasattr(hf_config, "sliding_window")
overwrite_dict = {
"sliding_window": None,
}
print(
f"use_sliding_window=False in config - overriding sliding_window parameter to None: {overwrite_dict}"
)
return overwrite_dict
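

# Illustrative usage sketch (not part of the module): one way the returned dict can be
# applied is as keyword overrides when loading the config or model. The model id below is
# an arbitrary example.
#
#   overrides = sliding_window_overwrite("Qwen/Qwen2.5-1.5B")
#   config = AutoConfig.from_pretrained(
#       "Qwen/Qwen2.5-1.5B", trust_remote_code=True, **overrides
#   )

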
def get_runtime_env_for_policy_worker(policy_worker_name: str) -> dict[str, Any]:
"""Get runtime environment configuration for policy workers.
Note: expandable_segments configuration is handled directly in the worker init methods
to ensure proper GPU detection after CUDA initialization.
"""
runtime_env = {
**get_nsight_config_if_pattern_matches(policy_worker_name),
}
return runtime_env
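

# Illustrative usage sketch (not part of the module): the returned dict is suitable as a
# Ray runtime_env. The worker name below is a hypothetical pattern, and the actor call is
# shown only for shape.
#
#   runtime_env = get_runtime_env_for_policy_worker("policy_worker")
#   # worker_cls.options(runtime_env=runtime_env).remote(...)

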
def get_megatron_checkpoint_dir() -> str:
"""Gets the default megatron checkpoint directory for initial HF -> Mcore conversion.
Megatron initial checkpoint should be saved to a path available on all nodes. The directory used will take this order of precendence:
1. $NRL_MEGATRON_CHECKPOINT_DIR (if set)
2. $HF_HOME/nemo_rl (if HF_HOME is set)
3. ~/.cache/huggingface/nemo_rl
HF_HOME is preferred since many users will also have that path mounted and it means one less directory
to mount into your runtime environment.
"""
nrl_checkpoint_dir = os.environ.get("NRL_MEGATRON_CHECKPOINT_DIR")
if nrl_checkpoint_dir is not None and nrl_checkpoint_dir.strip():
checkpoint_dir = nrl_checkpoint_dir
else:
hf_home = os.environ.get("HF_HOME")
if hf_home is not None and hf_home.strip():
checkpoint_dir = os.path.join(hf_home, "nemo_rl")
else:
checkpoint_dir = os.path.join(
os.path.expanduser("~"), ".cache", "huggingface", "nemo_rl"
)
print(f"Using default megatron checkpoint dir: {checkpoint_dir}")
return checkpoint_dir
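

# Illustrative precedence sketch (not part of the module; paths are hypothetical):
# with NRL_MEGATRON_CHECKPOINT_DIR unset and HF_HOME=/shared/hf, the function resolves
# to the HF_HOME-based location.
#
#   os.environ.pop("NRL_MEGATRON_CHECKPOINT_DIR", None)
#   os.environ["HF_HOME"] = "/shared/hf"
#   assert get_megatron_checkpoint_dir() == "/shared/hf/nemo_rl"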