Execution#
Executors#
- class nemo_run.core.execution.base.Executor(
- *,
- packager: ~nemo_run.core.packaging.base.Packager = <factory>,
- launcher: ~nemo_run.core.execution.base.Launcher | str | None = None,
- env_vars: dict[str, str] = <factory>,
- retries: int = 0,
- experiment_id: str | None = None,
- job_dir: str = '',
- )#
Bases:
object
Base dataclass for configuring an executor. It cannot be used on its own, but you can use it as the base type to register executor factories.
See LocalExecutor and SlurmExecutor for examples.
- assign(
- exp_id: str,
- exp_dir: str,
- task_id: str,
- task_dir: str,
- )#
This function will be called by run.Experiment to assign the executor for the specific experiment.
- cleanup(handle: str)#
- clone()#
- env_vars: dict[str, str]#
- experiment_dir: str = ''#
- experiment_id: str | None = None#
Set by run.Experiment
- get_launcher() Launcher #
- get_launcher_prefix() list[str] | None #
- info() str #
- job_dir: str = ''#
Directory that will store metadata for your run. This is set automatically if using run.Experiment
- launcher: Launcher | str | None = None#
- macro_values() ExecutorMacros | None #
Get macro values specific to the executor. This allows replacing common macros with executor specific vars for node ips, etc.
- nnodes() int #
Helper function called by torchrun component to determine --nnodes.
- nproc_per_node() int #
Helper function called by torchrun component to determine --nproc-per-node.
- package_configs(*cfgs: tuple[str, str]) list[str] #
- retries: int = 0#
- class nemo_run.core.execution.local.LocalExecutor(
- *,
- packager: ~nemo_run.core.packaging.base.Packager = <factory>,
- launcher: ~nemo_run.core.execution.base.Launcher | str | None = None,
- env_vars: dict[str, str] = <factory>,
- retries: int = 0,
- experiment_id: str | None = None,
- job_dir: str = '',
- ntasks_per_node: int = 1,
- )#
Bases:
Executor
Dataclass to configure local executor.
Example:
run.LocalExecutor()
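A slightly fuller sketch, using only the fields shown in the signature above; the concrete values here are illustrative, not defaults:

import nemo_run as run

# Run locally with torchrun, launching two tasks per node and passing an
# extra (illustrative) environment variable through to the job.
executor = run.LocalExecutor(
    ntasks_per_node=2,
    launcher="torchrun",  # the launcher field also accepts a string, per the signature above
    env_vars={"WANDB_MODE": "offline"},
)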
- assign(
- exp_id: str,
- exp_dir: str,
- task_id: str,
- task_dir: str,
- )#
This function will be called by run.Experiment to assign the executor for the specific experiment.
- nnodes() int #
Helper function called by torchrun component to determine --nnodes.
- nproc_per_node() int #
Helper function called by torchrun component to determine --nproc-per-node.
- ntasks_per_node: int = 1#
Used by components like torchrun to deduce the number of tasks to launch.
- class nemo_run.core.execution.skypilot.SkypilotExecutor(
- *,
- packager: ~nemo_run.core.packaging.git.GitArchivePackager = <factory>,
- launcher: ~nemo_run.core.execution.base.Launcher | str | None = None,
- env_vars: dict[str, str] = <factory>,
- retries: int = 0,
- experiment_id: str | None = None,
- job_dir: str = '',
- container_image: str | None = None,
- cloud: str | list[str] | None = None,
- region: str | list[str] | None = None,
- zone: str | list[str] | None = None,
- gpus: str | list[str] | None = None,
- gpus_per_node: int | list[int] | None = None,
- cpus: int | float | list[int | float] | None = None,
- memory: int | float | list[int | float] | None = None,
- instance_type: str | list[str] | None = None,
- num_nodes: int = 1,
- use_spot: bool | list[bool] | None = None,
- disk_size: int | list[int] | None = None,
- disk_tier: str | list[str] | None = None,
- ports: tuple[str] | None = None,
- file_mounts: dict[str, str] | None = None,
- cluster_name: str | None = None,
- setup: str | None = None,
- autodown: bool = False,
- idle_minutes_to_autostop: int | None = None,
- )#
Bases:
Executor
Dataclass to configure a Skypilot Executor.
Some familiarity with SkyPilot is necessary. To use this executor, install NeMo Run with either the skypilot optional feature (Kubernetes only) or skypilot-all (all clouds).
Example:
skypilot = SkypilotExecutor(
    gpus="A10G",
    gpus_per_node=devices,
    container_image="nvcr.io/nvidia/nemo:dev",
    cloud="kubernetes",
    cluster_name="nemo_tester",
    file_mounts={
        "nemo_run.whl": "nemo_run.whl"
    },
    setup="""
conda deactivate
nvidia-smi
ls -al ./
pip install nemo_run.whl
cd /opt/NeMo && git pull origin main && pip install .
""",
)
- HEAD_NODE_IP_VAR = 'head_node_ip'#
- HET_GROUP_HOST_VAR = 'het_group_host'#
- NODE_RANK_VAR = 'SKYPILOT_NODE_RANK'#
- NPROC_PER_NODE_VAR = 'SKYPILOT_NUM_GPUS_PER_NODE'#
- NUM_NODES_VAR = 'num_nodes'#
- assign(
- exp_id: str,
- exp_dir: str,
- task_id: str,
- task_dir: str,
- )#
This function will be called by run.Experiment to assign the executor for the specific experiment.
- autodown: bool = False#
- classmethod cancel(app_id: str)#
- cleanup(handle: str)#
- cloud: str | list[str] | None = None#
- cluster_name: str | None = None#
- container_image: str | None = None#
- cpus: int | float | list[int | float] | None = None#
- disk_size: int | list[int] | None = None#
- disk_tier: str | list[str] | None = None#
- file_mounts: dict[str, str] | None = None#
- gpus: str | list[str] | None = None#
- gpus_per_node: int | list[int] | None = None#
- idle_minutes_to_autostop: int | None = None#
- instance_type: str | list[str] | None = None#
- launch(
- task: skyt.Task,
- cluster_name: str | None = None,
- num_nodes: int | None = None,
- workdir: str | None = None,
- detach_run: bool = True,
- dryrun: bool = False,
- )#
- classmethod logs(app_id: str, fallback_path: str | None)#
- macro_values() ExecutorMacros | None #
Get macro values specific to the executor. This allows replacing common macros with executor specific vars for node ips, etc.
- memory: int | float | list[int | float] | None = None#
- nnodes() int #
Helper function called by torchrun component to determine --nnodes.
- nproc_per_node() int #
Helper function called by torchrun component to determine --nproc-per-node.
- num_nodes: int = 1#
- package() str #
- package_configs(*cfgs: tuple[str, str]) list[str] #
- packager: GitArchivePackager#
- classmethod parse_app(app_id: str) tuple[str, str, int] #
- ports: tuple[str] | None = None#
- region: str | list[str] | None = None#
- setup: str | None = None#
- classmethod status(
- app_id: str,
- )#
- to_resources() set[sky.Resources] #
- to_task(
- name: str,
- cmd: list[str] | None = None,
- env_vars: dict[str, str] | None = None,
- )#
- use_spot: bool | list[bool] | None = None#
- property workdir: str#
- zone: str | list[str] | None = None#
- class nemo_run.core.execution.slurm.SlurmExecutor(
- *,
- packager: ~nemo_run.core.packaging.git.GitArchivePackager = <factory>,
- launcher: ~nemo_run.core.execution.base.Launcher | str | None = None,
- env_vars: dict[str, str] = <factory>,
- retries: int = 0,
- experiment_id: str | None = None,
- job_dir: str = '',
- account: str,
- partition: str | None = None,
- job_name_prefix: str | None = None,
- time: str = '00:10:00',
- nodes: int = 1,
- ntasks_per_node: int = 1,
- cpus_per_task: int | None = None,
- cpus_per_gpu: int | None = None,
- gpus_per_node: int | None = None,
- gpus_per_task: int | None = None,
- qos: str | None = None,
- mem: str | None = None,
- mem_per_gpu: str | None = None,
- mem_per_cpu: str | None = None,
- comment: str | None = None,
- constraint: str | None = None,
- exclude: str | None = None,
- gres: str | None = None,
- signal: str | None = None,
- exclusive: bool | str | None = None,
- array: str | None = None,
- open_mode: str = 'append',
- container_image: str | None = None,
- container_mounts: list[str] = <factory>,
- additional_parameters: dict[str, ~typing.Any] | None = None,
- srun_args: list[str] | None = None,
- heterogeneous: bool = False,
- memory_measure: bool = False,
- job_paths_cls: ~typing.Type[~nemo_run.core.execution.slurm.JobPaths] = <class 'nemo_run.core.execution.slurm.JobPaths'>,
- tunnel: ~nemo_run.core.tunnel.client.SSHTunnel | ~nemo_run.core.tunnel.client.LocalTunnel = <factory>,
- dependencies: list[str] = <factory>,
- torchrun_nproc_per_node: int | None = None,
- )#
Bases:
Executor
Dataclass to configure a Slurm cluster. During execution, sbatch-related parameters are automatically parsed into their corresponding sbatch flags.
Note
We assume that the underlying Slurm cluster has Pyxis enabled. The Slurm executor will fail if the cluster does not support Pyxis.
Example:
def your_slurm_executor() -> run.SlurmExecutor:
    ssh_tunnel = SSHTunnel(
        host=os.environ["SLURM_HOST"],
        user=os.environ["SLURM_USER"],
        job_dir=os.environ["SLURM_JOBDIR"],
    )
    packager = GitArchivePackager()
    launcher = "torchrun"
    executor = SlurmExecutor(
        account=os.environ["SLURM_ACCT"],
        partition=os.environ["SLURM_PARTITION"],
        nodes=1,
        ntasks_per_node=1,
        tunnel=ssh_tunnel,
        container_image=os.environ["BASE_IMAGE"],
        time="00:30:00",
        packager=packager,
        launcher=launcher,
    )
    return executor

...

your_executor = your_slurm_executor()
- ALLOC_ARGS = ['account', 'partition', 'job-name', 'time', 'nodes', 'ntasks-per-node', 'qos', 'mem', 'mem-per-gpu', 'mem-per-cpu']#
- HEAD_NODE_IP_VAR = 'head_node_ip'#
- HET_GROUP_HOST_VAR = 'het_group_host'#
- NODE_RANK_VAR = 'SLURM_NODEID'#
- NPROC_PER_NODE_VAR = 'SLURM_NTASKS_PER_NODE'#
- NUM_NODES_VAR = 'SLURM_NNODES'#
- class ResourceRequest(
- *,
- packager: nemo_run.core.packaging.git.GitArchivePackager,
- nodes: int,
- ntasks_per_node: int,
- container_image: Optional[str] = None,
- gpus_per_node: Optional[int] = None,
- gpus_per_task: Optional[int] = None,
- container_mounts: list[str] = <factory>,
- env_vars: dict[str, str] = <factory>,
- )#
Bases:
object
- container_image: str | None = None#
- container_mounts: list[str]#
- env_vars: dict[str, str]#
- gpus_per_node: int | None = None#
- gpus_per_task: int | None = None#
- nodes: int#
- ntasks_per_node: int#
- packager: GitArchivePackager#
- SBATCH_FLAGS = ['account', 'acctg_freq', 'array', 'batch', 'clusters', 'constraint', 'container', 'container_id', 'core_spec', 'cpus_per_gpu', 'cpus_per_task', 'comment', 'debug', 'delay_boot', 'dependency', 'distribution', 'error', 'exclude', 'exclusive', 'export', 'get_user_env', 'gid', 'gpu_bind', 'gpu_freq', 'gpus', 'gpus_per_node', 'gpus_per_socket', 'gpus_per_task', 'gres', 'gres_flags', 'help', 'hold', 'ignore_pbs', 'input', 'job_name', 'kill_on_invalid_dep', 'licenses', 'mail_type', 'mail_user', 'mcs_label', 'mem', 'mem_bind', 'mem_per_cpu', 'mem_per_gpu', 'mincpus', 'network', 'nice', 'no_kill', 'no_requeue', 'nodefile', 'nodelist', 'nodes', 'ntasks', 'ntasks_per_core', 'ntasks_per_gpu', 'ntasks_per_node', 'ntasks_per_socket', 'open_mode', 'output', 'overcommit', 'oversubscribe', 'parsable', 'partition', 'power', 'prefer', 'priority', 'profile', 'propagate', 'qos', 'quiet', 'reboot', 'requeue', 'reservation', 'signal', 'sockets_per_node', 'spread_job', 'switches', 'test_only', 'thread_spec', 'threads_per_core', 'time', 'time_min', 'tmp', 'tres_bind', 'tres_per_task', 'uid', 'usage', 'verbose', 'version', 'wait', 'wait_all_nodes', 'wckey', 'wrap']#
List of sbatch flags in snake case
- SRUN_ARGS = ['account', 'partition', 'job-name', 'time', 'nodes', 'ntasks', 'ntasks-per-node', 'cpus-per-task', 'gpus-per-node', 'gpus-per-task', 'qos', 'mem', 'mem-per-gpu', 'mem-per-cpu', 'comment', 'constraint', 'exclude', 'gres', 'exclusive', 'array', 'additional-parameters', 'container-image', 'container-mounts', 'container-workdir']#
- account: str#
- additional_parameters: dict[str, Any] | None = None#
- alloc(job_name='interactive')#
- array: str | None = None#
- assign(
- exp_id: str,
- exp_dir: str,
- task_id: str,
- task_dir: str,
- )#
This function will be called by run.Experiment to assign the executor for the specific experiment.
- bash(job_name='interactive')#
- comment: str | None = None#
- connect_devspace(space, tunnel_dir=None)#
- constraint: str | None = None#
- container_image: str | None = None#
- container_mounts: list[str]#
- cpus_per_gpu: int | None = None#
- cpus_per_task: int | None = None#
- dependencies: list[str]#
List of TorchX app handles that will be parsed and passed to the --dependency flag in sbatch.
- exclude: str | None = None#
- exclusive: bool | str | None = None#
- get_launcher_prefix() list[str] | None #
- gpus_per_node: int | None = None#
- gpus_per_task: int | None = None#
- gres: str | None = None#
- heterogeneous: bool = False#
- info() str #
- job_name: str = 'nemo-job'#
Set by the executor; cannot be initialized
- job_name_prefix: str | None = None#
- job_paths_cls#
alias of
JobPaths
- launch_devspace(
- space: DevSpace,
- job_name='interactive',
- env_vars: Dict[str, str] | None = None,
- add_workspace_to_pythonpath: bool = True,
- )#
- property local: LocalTunnel#
- property local_is_slurm: bool#
- macro_values() ExecutorMacros | None #
Get macro values specific to the executor. This allows replacing common macros with executor specific vars for node ips, etc.
- mem: str | None = None#
- mem_per_cpu: str | None = None#
- mem_per_gpu: str | None = None#
- memory_measure: bool = False#
- classmethod merge(
- executors: list[SlurmExecutor],
- num_tasks: int,
- )#
- nnodes() int #
Helper function called by torchrun component to determine --nnodes.
- nodes: int = 1#
- nproc_per_node() int #
Helper function called by torchrun component to determine --nproc-per-node.
- ntasks_per_node: int = 1#
- open_mode: str = 'append'#
- package_configs(*cfgs: tuple[str, str]) list[str] #
- packager: GitArchivePackager#
- parse_deps() list[str] #
Helper function to parse a list of TorchX app handles and return a list of Slurm Job IDs to use as dependencies.
- partition: str | None = None#
- qos: str | None = None#
- resource_group: list[ResourceRequest]#
- signal: str | None = None#
- property slurm: Tunnel#
- srun(
- cmd: str,
- job_name='interactive',
- flags=None,
- env_vars: Dict[str, str] | None = None,
- arg_dict=None,
- **kwargs,
- )#
- srun_args: list[str] | None = None#
- stderr_to_stdout: bool = True#
- time: str = '00:10:00'#
- torchrun_nproc_per_node: int | None = None#
Optional parameter to explicitly specify nproc_per_node for torchrun-like components if the Slurm cluster doesn't support granular resource allocation (see the sketch after this attribute list).
- tunnel: SSHTunnel | LocalTunnel#
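As referenced in the torchrun_nproc_per_node description above, here is a hedged sketch of pinning the torchrun process count explicitly; the account and numbers are placeholders:

from nemo_run.core.execution.slurm import SlurmExecutor

# On a cluster without granular GPU allocation, tell the torchrun launcher
# explicitly how many processes to start on each node.
executor = SlurmExecutor(
    account="your_account",        # placeholder Slurm account
    nodes=2,
    ntasks_per_node=1,             # one torchrun launcher per node
    launcher="torchrun",
    torchrun_nproc_per_node=8,     # e.g. one process per GPU
)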
Tunnels#
- class nemo_run.core.tunnel.client.LocalTunnel(*, job_dir: str)#
Bases:
Tunnel
Local tunnel for supported executors. Executes all commands locally. Currently only supports SlurmExecutor. Use it if you are launching from a login node or another node inside the cluster.
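A hedged sketch of attaching a LocalTunnel to a SlurmExecutor when you are already on the cluster; the account and path are placeholders:

from nemo_run.core.execution.slurm import SlurmExecutor
from nemo_run.core.tunnel.client import LocalTunnel

# All tunnel commands run locally because we are already on a cluster node;
# job_dir points at a placeholder location on the shared filesystem.
executor = SlurmExecutor(
    account="your_account",
    tunnel=LocalTunnel(job_dir="/shared/your_user/nemo_run_jobs"),
)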
- cleanup()#
- connect()#
- get(remote_path: str, local_path: str) None #
- host: str = 'localhost'#
- put(local_path: str, remote_path: str) None #
- run(
- command: str,
- hide: bool = True,
- warn: bool = False,
- **kwargs,
- )#
- user: str = ''#
- class nemo_run.core.tunnel.client.SSHTunnel(
- *,
- job_dir: str,
- host: str,
- user: str,
- identity: str | None = None,
- shell: str | None = None,
- pre_command: str | None = None,
- )#
Bases:
Tunnel
SSH Tunnel for supported executors. Currently only supports SlurmExecutor.
Uses key-based authentication if identity is provided; otherwise, it falls back to password authentication.
Examples
ssh_tunnel = SSHTunnel(
    host=os.environ["SSH_HOST"],
    user=os.environ["SSH_USER"],
    job_dir=os.environ["REMOTE_JOBDIR"],
)

another_ssh_tunnel = SSHTunnel(
    host=os.environ["ANOTHER_SSH_HOST"],
    user=os.environ["ANOTHER_SSH_USER"],
    job_dir=os.environ["ANOTHER_REMOTE_JOBDIR"],
    identity="path_to_private_key",
)
- cleanup()#
- connect()#
- get(remote_path: str, local_path: str) None #
- host: str#
- identity: str | None = None#
- pre_command: str | None = None#
- put(local_path: str, remote_path: str) None #
- run(
- command: str,
- hide: bool = True,
- warn: bool = False,
- **kwargs,
- )#
- setup()#
Creates the job dir if it doesn’t exist
- shell: str | None = None#
- user: str#
Launchers#
- class nemo_run.core.execution.base.FaultTolerance(
- *,
- nsys_profile: bool = False,
- nsys_folder: str = 'nsys_profile',
- nsys_trace: list[str] = <factory>,
- cfg_path: str = '',
- finished_flag_file: str = '',
- job_results_file: str = '',
- rdzv_backend: str = 'c10d',
- rdzv_port: int = 29500,
- workload_check_interval: Optional[float] = None,
- initial_rank_heartbeat_timeout: Optional[float] = None,
- rank_heartbeat_timeout: Optional[float] = None,
- rank_termination_signal: Optional[str] = None,
- log_level: Optional[str] = None,
- max_restarts: Optional[int] = None,
- )#
Bases:
Launcher
- cfg_path: str = ''#
- finished_flag_file: str = ''#
- initial_rank_heartbeat_timeout: float | None = None#
- job_results_file: str = ''#
- log_level: str | None = None#
- max_restarts: int | None = None#
- rank_heartbeat_timeout: float | None = None#
- rank_termination_signal: str | None = None#
- rdzv_backend: str = 'c10d'#
- rdzv_port: int = 29500#
- workload_check_interval: float | None = None#
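A minimal sketch of attaching this launcher to an executor, using only the fields listed above; the timeout and restart values are illustrative:

from nemo_run.core.execution.base import FaultTolerance
from nemo_run.core.execution.local import LocalExecutor

# Restart the job up to 3 times and treat a rank as failed if no heartbeat
# arrives within 300 seconds (illustrative values).
executor = LocalExecutor()
executor.launcher = FaultTolerance(
    max_restarts=3,
    rank_heartbeat_timeout=300.0,
)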
Packagers#
- class nemo_run.core.packaging.base.Packager(*, debug: bool = False)#
Bases:
object
Base class for packaging your code.
The packager is generally used as part of an Executor and provides the executor with information on how to package your code.
It can also include information on how to run your code. For example, a packager can determine whether to use torchrun or whether to use debug flags.
Note
This class can also be used on its own as a passthrough packager. This is useful when you do not need to package code: for example, a local executor that uses your current working directory, or an executor whose Docker image already includes all the code.
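For instance, a minimal sketch of the passthrough case described above, assuming a local run whose working directory already contains your code:

import nemo_run as run
from nemo_run.core.packaging.base import Packager

# The base Packager performs no packaging; the local executor simply uses
# the current working directory as-is.
executor = run.LocalExecutor(packager=Packager())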
- debug: bool = False#
Uses component or executor specific debug flags if set to True.
- package(path: Path, job_dir: str, name: str) str #
- setup()#
This is run on the executor before starting your job.
- class nemo_run.core.packaging.git.GitArchivePackager(
- *,
- debug: bool = False,
- basepath: str = '',
- subpath: str = '',
- pip_installs: list[str] = <factory>,
- ref: str = 'HEAD',
- include_pattern: str = '',
- )#
Bases:
Packager
Uses git archive for packaging your code.
At a high level, it works in the following way:

1. base_path = git rev-parse --show-toplevel.
2. Optionally define a subpath as base_path/self.subpath by setting the subpath attribute.
3. cd base_path && git archive --format=tar.gz --output={output_file} {self.ref}:{subpath}

This extracted tar file becomes the working directory for your job.
Note
git archive will only package code committed in the specified ref. Any uncommitted code will not be packaged. We are working on adding an option to package uncommitted code but it is not ready yet.
- basepath: str = ''#
- include_pattern: str = ''#
Include extra files in the archive that match include_pattern. This string is included in the command as find {include_pattern} -type f to get the list of extra files to include in the archive.
- package(
- path: Path,
- job_dir: str,
- name: str,
- )#
- pip_installs: list[str]#
List of pip packages to install before starting your run. This is experimental and risky as it may change the underlying environment drastically. This doesn’t work when using torchrun.
- ref: str = 'HEAD'#
Git ref to use for archiving the code. Can be a branch name or a commit ref like HEAD.
- subpath: str = ''#
Relative subpath in your repo to package code from. For example, if your repo has three folders a, b, and c, and you specify a as the subpath, only files inside a will be packaged. In your job, the root workdir will be a/.
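Putting the attributes above together, a hedged configuration sketch; the repository layout and the find pattern are illustrative:

from nemo_run.core.packaging.git import GitArchivePackager

# Archive only the committed contents of the (hypothetical) "a" folder at HEAD,
# and additionally include any YAML files matched by the find pattern below.
packager = GitArchivePackager(
    subpath="a",
    ref="HEAD",
    include_pattern="configs/*.yaml",  # expanded as: find configs/*.yaml -type f
)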