# Source code for nemo_automodel.components.launcher.interactive

# Copyright (c) 2025, NVIDIA CORPORATION.  All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import importlib
import logging
import os
from pathlib import Path
from typing import Any, Dict, List, Optional

from nemo_automodel.components.launcher.base import Launcher

logger = logging.getLogger(__name__)


def _get_repo_root() -> Path:
    """Return the repository root.

    If the current working directory looks like an editable checkout (it
    contains both ``nemo_automodel/components`` and ``examples/``), prepend it
    to ``PYTHONPATH`` so the local source takes precedence over any installed
    package, and return it. Otherwise fall back to the package's own location
    (three levels above this file).
    """
    cwd = Path.cwd()
    is_editable_checkout = (cwd / "nemo_automodel/components").exists() and (cwd / "examples/").exists()
    if not is_editable_checkout:
        return Path(__file__).parents[3]

    existing_pp = os.environ.get("PYTHONPATH")
    # Local checkout first so it shadows an installed copy of the package.
    os.environ["PYTHONPATH"] = str(cwd) if existing_pp is None else f"{cwd}:{existing_pp}"
    logger.info("Running job using source from: %s", cwd)
    return cwd
def resolve_recipe_cls(target_str: str):
    """Import and return the recipe class from a dotted path.

    Args:
        target_str: Fully qualified dotted path to the class, e.g.
            ``"package.module.ClassName"``. Everything before the last dot is
            imported as a module; the last component is looked up on it.

    Returns:
        The attribute (normally a recipe class) named by the final component.

    Raises:
        ValueError: If ``target_str`` contains no dot.
        ModuleNotFoundError: If the module portion cannot be imported.
        AttributeError: If the final component is not found in the module.
    """
    # Fixed: the previous docstring contained stray "pip install" lines
    # accidentally pasted in from the install-guidance message.
    module_path, cls_name = target_str.rsplit(".", 1)
    module = importlib.import_module(module_path)
    return getattr(module, cls_name)
[docs] def _recipe_module_path(recipe_target: str, repo_root: Path) -> Path: """Convert a dotted recipe target into an absolute filesystem path.""" module_path = recipe_target.rsplit(".", 1)[0] relative = module_path.replace(".", "/") + ".py" return repo_root / relative
_INSTALL_MSG = ( "Local/interactive execution requires PyTorch and the full nemo_automodel package.\n" "It looks like you have the lightweight CLI-only install (automodel[cli]).\n\n" "To run jobs locally, install the full package:\n" " pip install nemo_automodel # CPU/basic\n" " pip install nemo_automodel[all] # with CUDA & all extras\n\n" "For SLURM clusters, use sbatch with the reference slurm.sub script.\n" "For SkyPilot or NeMo-Run, add a skypilot: or nemo_run: section to your YAML.\n\n" "See: https://github.com/NVIDIA/NeMo-Automodel#readme" )
class InteractiveLauncher(Launcher):
    """Launch a recipe locally on the current node using torchrun or in-process."""

    @staticmethod
    def _is_torchrun_worker() -> bool:
        """Return True when this process was already spawned by torchrun.

        torchrun (``torch.distributed.run``) sets both ``LOCAL_RANK`` and
        ``TORCHELASTIC_RUN_ID`` in the environment of every worker it spawns.
        We check for both to avoid false positives from environments (e.g.
        SLURM) that may set ``LOCAL_RANK`` without an active torchrun session.

        When the user launches the CLI via ``torchrun --nproc-per-node N -m
        nemo_automodel.cli.app config.yaml``, each worker must run the recipe
        in-process instead of re-launching torchrun.
        """
        return "LOCAL_RANK" in os.environ and "TORCHELASTIC_RUN_ID" in os.environ

    def _run_recipe_in_process(self, recipe_target: str, config: Dict[str, Any]) -> int:
        """Instantiate and run a recipe in the current process.

        Resolves ``recipe_target`` to a class, constructs it with ``config``,
        calls ``setup()``, then runs the train/validation loop and returns its
        result (used as the process exit code by callers).
        """
        recipe_cls = resolve_recipe_cls(recipe_target)
        recipe = recipe_cls(config)
        recipe.setup()
        return recipe.run_train_validation_loop()

    def launch(
        self,
        config: Dict[str, Any],
        config_path: Path,
        recipe_target: str,
        launcher_config: Any = None,
        extra_args: Optional[List[str]] = None,
    ) -> int:
        """Run the recipe locally: in-process on one device, via torchrun otherwise.

        Args:
            config: Parsed recipe configuration, passed to the recipe class
                when running in-process.
            config_path: Path to the config file; forwarded to torchrun
                workers as ``-c <config_path>``.
            recipe_target: Dotted path to the recipe class
                (``package.module.ClassName``).
            launcher_config: Interpreted as the requested ``nproc_per_node``
                (``Optional[int]``) despite the ``Any`` annotation.
            extra_args: Additional CLI arguments appended to the training
                script args for torchrun workers.

        Returns:
            Exit code: the recipe's loop result (in-process), torchrun's
            return value (multi-device), or ``1`` when PyTorch is missing.
        """
        # Lazy torch import so the CLI-only install fails gracefully with
        # install guidance instead of an ImportError traceback.
        try:
            from torch.distributed.run import determine_local_world_size, get_args_parser
            from torch.distributed.run import run as thrun
        except ImportError:
            logger.error(_INSTALL_MSG)
            return 1

        # Already inside a torchrun worker (e.g. user ran
        # ``torchrun --nproc-per-node N -m nemo_automodel.cli.app config.yaml``).
        # Run the recipe directly; do NOT re-launch torchrun.
        if self._is_torchrun_worker():
            logger.info(
                "Detected existing torchrun environment (LOCAL_RANK=%s); running recipe in-process.",
                os.environ["LOCAL_RANK"],
            )
            return self._run_recipe_in_process(recipe_target, config)

        nproc_per_node: Optional[int] = launcher_config
        repo_root = _get_repo_root()
        script_path = _recipe_module_path(recipe_target, repo_root)
        # NOTE(review): "gpu" presumably asks torchrun for the local GPU
        # count -- confirm against torch.distributed.run.determine_local_world_size.
        num_devices = determine_local_world_size(nproc_per_node="gpu")
        # NOTE(review): assert is stripped under ``python -O``; raising would
        # be safer if this can legitimately be 0.
        assert num_devices > 0, "Expected num-devices to be > 0"

        if nproc_per_node == 1 or num_devices == 1:
            # Single device: no need to spawn torchrun; run directly.
            logger.info("Launching job locally on a single device")
            return self._run_recipe_in_process(recipe_target, config)
        else:
            # Explicit request wins; otherwise use every detected device.
            effective_nproc = nproc_per_node if nproc_per_node is not None else num_devices
            logger.info("Launching job locally on %d devices", effective_nproc)
            torchrun_parser = get_args_parser()
            # parse_known_args() reads sys.argv; unrecognized CLI arguments
            # are discarded rather than rejected.
            torchrun_args, _ = torchrun_parser.parse_known_args()
            torchrun_args.training_script = str(script_path)
            torchrun_args.training_script_args = ["-c", str(config_path)]
            if extra_args:
                torchrun_args.training_script_args.extend(extra_args)
            torchrun_args.nproc_per_node = effective_nproc
            return thrun(torchrun_args)