# Source code for nemo_rl.distributed.worker_group_utils

# Copyright (c) 2025, NVIDIA CORPORATION.  All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import fnmatch
import logging
from copy import deepcopy
from typing import Any

from nemo_rl.utils.nsys import NRL_NSYS_PROFILE_STEP_RANGE, NRL_NSYS_WORKER_PATTERNS


def get_nsight_config_if_pattern_matches(worker_name: str) -> dict[str, Any]:
    """Return an nsight config for ``worker_name`` if it matches a profiling pattern.

    ``NRL_NSYS_WORKER_PATTERNS`` is read as a comma-separated list of
    ``fnmatch``-style patterns. Entries are stripped of surrounding whitespace
    and empty entries are ignored. The first pattern that matches wins.

    Args:
        worker_name: Name of the worker to check against the patterns.

    Returns:
        ``{"nsight": config}`` when a pattern matches, otherwise an empty dict.

    Raises:
        AssertionError: If exactly one of ``NRL_NSYS_WORKER_PATTERNS`` and
            ``NRL_NSYS_PROFILE_STEP_RANGE`` is set.
    """
    # The two settings only make sense as a pair: both set, or both unset.
    has_patterns = bool(NRL_NSYS_WORKER_PATTERNS)
    has_step_range = bool(NRL_NSYS_PROFILE_STEP_RANGE)
    assert has_patterns == has_step_range, (
        "Either both NRL_NSYS_WORKER_PATTERNS and NRL_NSYS_PROFILE_STEP_RANGE must be set, or neither. See https://github.com/NVIDIA/NeMo-RL/tree/main/docs/nsys-profiling.md for more details."
    )

    raw_patterns = NRL_NSYS_WORKER_PATTERNS
    if not raw_patterns:
        return {}

    # Walk the CSV entries lazily and stop at the first non-empty pattern
    # that matches the worker name.
    stripped_entries = (entry.strip() for entry in raw_patterns.split(","))
    matched_pattern = next(
        (
            entry
            for entry in stripped_entries
            if entry and fnmatch.fnmatch(worker_name, entry)
        ),
        None,
    )
    if matched_pattern is None:
        return {}

    logging.info(
        f"Nsight profiling enabled for worker '{worker_name}' (matched pattern '{matched_pattern}')"
    )
    nsight_options = {
        "t": "cuda,cudnn,cublas,nvtx",
        "o": f"'{worker_name}_{NRL_NSYS_PROFILE_STEP_RANGE}_%p'",
        "stop-on-exit": "true",
        # Capture range is required to control the scope of the profile:
        # profiling only starts/stops when torch.cuda.profiler.start()/stop()
        # is called.
        "capture-range": "cudaProfilerApi",
        "capture-range-end": "stop",
    }
    return {"nsight": nsight_options}
def recursive_merge_options(
    default_options: dict[str, Any], extra_options: dict[str, Any]
) -> dict[str, Any]:
    """Recursively merge extra options into default options.

    The merge works on plain dicts. When a key holds a dict on both sides,
    the two dicts are merged key by key; in every other case (scalar vs.
    dict, dict vs. scalar, scalar vs. scalar, or key missing from the
    defaults) the value from ``extra_options`` replaces the default.

    Before merging, a ``_nsight`` entry inside
    ``default_options["runtime_env"]`` is renamed to ``nsight`` (only when no
    ``nsight`` entry already exists), so that an ``nsight`` entry in
    ``extra_options`` overrides the default under the same key. See
    https://github.com/ray-project/ray/blob/3c4a5b65dd492503a707c0c6296820228147189c/python/ray/runtime_env/runtime_env.py#L345

    Neither input is modified, and the result shares no mutable objects with
    either of them.

    Args:
        default_options: Default options dictionary (lower precedence).
        extra_options: Extra options provided by the caller (higher precedence).

    Returns:
        Merged options dictionary with ``extra_options`` taking precedence
        over ``default_options``.
    """
    # Work on a private copy so the caller's defaults are never mutated.
    merged = deepcopy(default_options)

    def recursive_merge_dict(base, incoming):
        """Merge ``incoming`` into ``base`` in place; ``incoming`` wins on conflicts."""
        if not isinstance(incoming, dict):
            return
        for k, v in incoming.items():
            if k in base and isinstance(base[k], dict) and isinstance(v, dict):
                # Both sides are dicts: descend instead of overwriting.
                recursive_merge_dict(base[k], v)
            else:
                # Incoming value replaces whatever was there. It is copied
                # here, so no up-front copy of extra_options is needed.
                base[k] = deepcopy(v)

    # Rename _nsight -> nsight before merging so extra_options can override
    # the renamed default.
    if "runtime_env" in merged and isinstance(merged["runtime_env"], dict):
        runtime_env = merged["runtime_env"]
        if "_nsight" in runtime_env and "nsight" not in runtime_env:
            runtime_env["nsight"] = runtime_env.pop("_nsight")

    recursive_merge_dict(base=merged, incoming=extra_options)
    return merged