Source code for nemo_run.core.execution.base

# SPDX-FileCopyrightText: Copyright (c) 2024 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import copy
import importlib.util
import os
from dataclasses import asdict, dataclass, field
from string import Template
from typing import Optional, Protocol, Union, runtime_checkable

import fiddle as fdl
from torchx.specs import Role
from typing_extensions import Self

from nemo_run.config import ConfigurableMixin, get_nemorun_home
from nemo_run.core.execution.launcher import LAUNCHER_MAP, Launcher
from nemo_run.core.packaging.base import Packager


[docs] @dataclass(kw_only=True) class ExecutorMacros(ConfigurableMixin): """ Defines macros. """ HEAD_NODE_IP_VAR = "${head_node_ip_var}" NPROC_PER_NODE_VAR = "${nproc_per_node_var}" NUM_NODES_VAR = "${num_nodes_var}" NODE_RANK_VAR = "${node_rank_var}" FT_LAUNCHER_CFG_PATH_VAR = "${ft_launcher_cfg_path_var}" head_node_ip_var: str nproc_per_node_var: str num_nodes_var: str node_rank_var: str het_group_host_var: str ft_launcher_cfg_path_var: str = "FAULT_TOL_CFG_PATH" @staticmethod def group_host(index: int): return f"$$${{het_group_host_var}}_{index}"
[docs] def apply(self, role: Role) -> Role: """ apply applies the values to a copy the specified role and returns it. """ role = copy.deepcopy(role) role.args = [self.substitute(arg) for arg in role.args] role.env = {key: self.substitute(arg) for key, arg in role.env.items()} return role
[docs] def substitute(self, arg: str) -> str: """ substitute applies the values to the template arg. """ return Template(arg).safe_substitute(**asdict(self))
@runtime_checkable class LogSupportedExecutor(Protocol): @classmethod def logs(cls, app_id: str, fallback_path: Optional[str]): ...
[docs] @dataclass(kw_only=True) class Executor(ConfigurableMixin): """ Base dataclass for configuration of an executor. This cannot be used independently but you can use this as the base type to register executor factories. See :class:`LocalExecutor` and :class:`SlurmExecutor` for examples. """ packager: Packager = field(default_factory=lambda: Packager()) launcher: Optional[Union[Launcher, str]] = None env_vars: dict[str, str] = field(default_factory=dict) retries: int = 0 #: Set by run.Experiment experiment_id: Optional[str] = None #: Directory that will store metadata for your run. #: This is set automatically if using run.Experiment job_dir: str = "" experiment_dir: str = field(init=False, default="") _launcher_setup: bool = field(init=False, default=False) def info(self) -> str: return self.__class__.__qualname__ def clone(self) -> Self: return fdl.build(self.to_config()) def get_launcher(self) -> Launcher: if not self._launcher_setup: self._setup_launcher() self._launcher_setup = True assert self.launcher is None or isinstance(self.launcher, Launcher), ( f"{self.info()} could not setup the launcher." ) if self.launcher is None: self.launcher = Launcher() return self.launcher
[docs] def assign( self, exp_id: str, exp_dir: str, task_id: str, task_dir: str, ) -> None: """ This function will be called by run.Experiment to assign the executor for the specific experiment. """ raise NotImplementedError
[docs] def nnodes(self) -> int: """ Helper function called by torchrun component to determine --nnodes. """ raise NotImplementedError
[docs] def nproc_per_node(self) -> int: """ Helper function called by torchrun component to determine --nproc-per-node. """ raise NotImplementedError
[docs] def macro_values(self) -> Optional[ExecutorMacros]: """ Get macro values specific to the executor. This allows replacing common macros with executor specific vars for node ips, etc. """ return None
def _setup_launcher(self): if not self.launcher: return None if isinstance(self.launcher, str): self.launcher = LAUNCHER_MAP[self.launcher]() def get_launcher_prefix(self) -> Optional[list[str]]: launcher = self.get_launcher() if launcher.nsys_profile: os.makedirs(os.path.join(self.job_dir, launcher.nsys_folder), exist_ok=True) return launcher.get_nsys_prefix(profile_dir=self.job_dir) def get_nsys_entrypoint(self) -> str: return ("nsys", "") def supports_launcher_transform(self) -> bool: return False def package_configs(self, *cfgs: tuple[str, str]) -> list[str]: filenames = [] basepath = os.path.join(self.job_dir, "configs") os.makedirs(basepath, exist_ok=True) for name, cfg in cfgs: filename = os.path.join(basepath, name) os.makedirs(os.path.dirname(filename), exist_ok=True) with open(filename, "w") as f: f.write(cfg) filenames.append(filename) return filenames def create_job_dir(self): os.makedirs(self.job_dir, exist_ok=True) def cleanup(self, handle: str): ...
[docs] def import_executor( name: str, file_path: Optional[str] = None, call: bool = True, **kwargs ) -> Executor: """ Retrieves an executor instance by name from a specified or default Python file. The file must contain either a function or executor instance by the provided name. This function dynamically imports the file_path, searches for the name attr and returns the value corresponding to the given name, and optionally calls the value if call is True. This functionality allows you to define all your executors in a single file which lives separately from your codebase. It is similar to ~/.ssh/config and allows you to use executors across your projects without having to redefine them. Example: executor = import_executor("local", file_path="path/to/executors.py") executor = import_executor("gpu") # Uses the default location of os.path.join(get_nemorun_home(), "executors.py") Args: name (str): The name of the executor to retrieve. file_path (Optional[str]): The path to the Python file containing the executor definitions. Defaults to None, in which case the default location of os.path.join(get_nemorun_home(), "executors.py") is used. The file_path is expected to be a string representing a file path with the following structure: - It should be a path to a Python file (with a .py extension). - The file should contain a dictionary named `EXECUTOR_MAP` that maps executor names to their corresponding instances. - The file can be located anywhere in the file system, but if not provided, it defaults to `get_nemorun_home()/executors.py`. call (bool): If True, the value from the module is called with the rest of the given kwargs. Returns: Executor: The executor instance corresponding to the given name. """ if not file_path: file_path = os.path.join(get_nemorun_home(), "executors.py") spec = importlib.util.spec_from_file_location("executors", file_path) assert spec module = importlib.util.module_from_spec(spec) assert spec.loader spec.loader.exec_module(module) executor_fn = getattr(module, name) if not callable(executor_fn): return executor_fn return executor_fn(**kwargs) # type: ignore