CloudAI Benchmark Framework v1.5.0

networking/display/cloudai150/_modules/cloudai/workloads/megatron_bridge/megatron_bridge.html

Source code for cloudai.workloads.megatron_bridge.megatron_bridge

# SPDX-FileCopyrightText: NVIDIA CORPORATION & AFFILIATES
# Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import logging
from typing import List, Optional, Union, cast

from pydantic import Field, ValidationInfo, field_validator

from cloudai.core import DockerImage, GitRepo, Installable, PythonExecutable
from cloudai.models.workload import CmdArgs, TestDefinition


class MegatronBridgeCmdArgs(CmdArgs):
    """Megatron-Bridge launcher arguments (translated into `setup_experiment.py` flags)."""

    # Slurm/launcher-level
    gpu_type: str = Field(default="gb200")
    log_dir: str = Field(default="")
    time_limit: str = Field(default="00:30:00")
    container_image: str = Field(default="")
    num_gpus: int = Field(default=8)
    gpus_per_node: int = Field(default=8)
    custom_mounts: Optional[str] = Field(default=None)
    enable_vboost: Optional[bool] = Field(default=False)
    dryrun: Optional[bool] = Field(default=False)
    enable_nsys: Optional[bool] = Field(default=False)
    detach: Optional[bool] = Field(default=None)

    # Model/task
    model_name: str = Field(min_length=1)
    model_size: str = Field(min_length=1)
    domain: str = Field(default="llm")
    task: str = Field(default="pretrain")
    compute_dtype: str = Field(default="bf16")
    fp8_recipe: Optional[str] = Field(default=None)
    # FIX: hf_token is semantically required (validate_hf_token rejects empty values),
    # but pydantic v2 does not run "after" validators on default values. Without
    # validate_default=True an omitted token silently stayed None and the requirement
    # was never enforced; now a missing token fails fast at model construction.
    hf_token: Optional[str] = Field(default=None, validate_default=True)
    nemo_home: Optional[str] = Field(default=None)
    wandb_key: Optional[str] = Field(default=None)
    wandb_prj_name: Optional[str] = Field(default=None)
    wandb_exp_name: Optional[str] = Field(default=None)

    # Feature flags (allow sweeps)
    use_tokendrop: Optional[Union[bool, List[bool]]] = Field(default=None)
    use_megatron_fsdp: Optional[Union[bool, List[bool]]] = Field(default=None)
    cuda_graph_impl: Optional[str] = Field(default=None)
    cuda_graph_scope: Optional[Union[str, List[str]]] = Field(default=None)

    # Parallelism
    tp: Optional[Union[int, List[int]]] = Field(default=None)
    pp: Optional[Union[int, List[int]]] = Field(default=None)
    cp: Optional[Union[int, List[int]]] = Field(default=None)
    vp: Optional[Union[int, List[int]]] = Field(default=None)
    ep: Optional[Union[int, List[int]]] = Field(default=None)
    et: Optional[Union[int, List[int]]] = Field(default=None)

    # Batch sizes
    mb: Optional[Union[int, List[int]]] = Field(default=None)
    gb: Optional[Union[int, List[int]]] = Field(default=None)

    # Perf/tuning
    moe_a2a_overlap: Optional[Union[bool, List[bool]]] = Field(default=None)
    max_steps: Optional[int] = Field(default=50)
    recompute_num_layers: Optional[Union[int, List[int]]] = Field(default=None)
    activation_offload_layers: Optional[Union[int, List[int]]] = Field(default=None)
    recompute_modules: Optional[Union[str, List[str]]] = Field(default=None)

    # Optional distributed optimizer instances (for constraints/divisor)
    num_distributed_optimizer_instances: Optional[int] = Field(default=None)

    @field_validator("hf_token", mode="after")
    @classmethod
    def validate_hf_token(cls, v: Optional[str]) -> Optional[str]:
        """Reject missing or whitespace-only tokens; return the stripped token otherwise."""
        token = (v or "").strip()
        if not token:
            raise ValueError("cmd_args.hf_token is required. Please set it to your literal HF token string.")
        return token

    @field_validator("model_name", "model_size", mode="after")
    @classmethod
    def validate_model_fields(cls, v: str, info: ValidationInfo) -> str:
        """Strip surrounding whitespace; reject values that are empty after stripping."""
        s = v.strip()
        if not s:
            raise ValueError(f"cmd_args.{info.field_name} cannot be empty.")
        return s
class MegatronBridgeTestDefinition(TestDefinition):
    """Megatron-Bridge test definition (CloudAI-managed install + Slurm submission via launcher)."""

    cmd_args: MegatronBridgeCmdArgs
    # NeMo-Run launcher repository is pinned by the framework, not the user.
    nemo_run_repo: GitRepo = GitRepo(
        url="https://github.com/NVIDIA-NeMo/Run.git",
        commit="main",
    )

    # Lazily-created caches backing the public properties below.
    _docker_image: Optional[DockerImage] = None
    _python_executable: Optional[PythonExecutable] = None
    _megatron_bridge_repo: Optional[GitRepo] = None

    @staticmethod
    def _select_megatron_bridge_repo(git_repos: list[GitRepo]) -> GitRepo | None:
        """Return the Megatron-Bridge repo from `git_repos` (normalized to mount_as=/opt/Megatron-Bridge)."""
        target_mount = "/opt/Megatron-Bridge"
        for candidate in git_repos:
            by_url = "Megatron-Bridge" in candidate.url
            by_mount = (candidate.mount_as or "").rstrip("/") == target_mount
            if not (by_url or by_mount):
                continue
            # Matched: keep an explicit mount point as-is, otherwise normalize
            # via a copy carrying the canonical mount path.
            if candidate.mount_as:
                return candidate
            return candidate.model_copy(update={"mount_as": target_mount})
        return None
[docs] @field_validator("git_repos", mode="after") @classmethod def validate_git_repos_has_megatron_bridge_repo(cls, v: list[GitRepo]) -> list[GitRepo]: """MegatronBridge requires users to pin the Megatron-Bridge repo version via `[[git_repos]]`.""" if not v: raise ValueError( "MegatronBridge requires the user to pin the Megatron-Bridge repository via `[[git_repos]]` " "in the test TOML (provide at least url and commit)." ) if cls._select_megatron_bridge_repo(v) is None: raise ValueError( "MegatronBridge requires `[[git_repos]]` to include the Megatron-Bridge repo (url containing " "'Megatron-Bridge' or mount_as='/opt/Megatron-Bridge')." ) return v

@property def docker_image(self) -> DockerImage: if not self._docker_image: self._docker_image = DockerImage(url=self.cmd_args.container_image or "") return self._docker_image @property def python_executable(self) -> PythonExecutable: if not self._python_executable: self._python_executable = PythonExecutable(git_repo=self.nemo_run_repo) return self._python_executable @property def megatron_bridge_repo(self) -> GitRepo: if self._megatron_bridge_repo is None: selected = self._select_megatron_bridge_repo(self.git_repos) if selected is None: raise ValueError( "MegatronBridge requires the user to pin the Megatron-Bridge repository via `[[git_repos]]` " "in the test TOML (provide at least url and commit)." ) self._megatron_bridge_repo = selected return self._megatron_bridge_repo @property def installables(self) -> list[Installable]: items: list[Installable] = [self.python_executable, self.megatron_bridge_repo] if self.cmd_args.container_image: items.insert(0, self.docker_image) return items

    def constraint_check(self, tr) -> bool:  # type: ignore[override] # noqa: C901
        """Validate the parallelism/feature combination described by `cmd_args`.

        Evaluates 17 numbered constraints (divisibility of GPUs across TP/PP/CP,
        batch-size divisibility, FSDP/CUDA-graph/offload compatibility, CUDA-graph
        scope rules, recompute interactions). Each violated constraint is reported
        via logging.error; the method returns True only when all constraints hold.

        Args:
            tr: Test-run context; unused here but kept for interface compatibility
                (hence the type: ignore[override]).

        Returns:
            True if every constraint is satisfied, False otherwise.
        """
        num_gpus = cast(int, self.cmd_args.num_gpus)

        # NOTE(review): these helpers assume sweep fields have already been
        # resolved to scalars for this check — a List value would not be unpacked.
        def _as_int(val: Optional[Union[int, List[int]]]) -> Optional[int]:
            return cast(Optional[int], val)

        def _as_bool(val: Optional[Union[bool, List[bool]]]) -> bool:
            return bool(val) if val is not None else False

        def _normalize_str_list(val: Optional[Union[str, List[str]]]) -> list[str]:
            # Accepts None, a comma-separated string (optionally bracketed /
            # quoted), or a list of such strings; returns lowercase tokens.
            if val is None:
                return []
            if isinstance(val, list):
                items: list[str] = []
                for raw in val:
                    s = str(raw).strip().strip("\"'")
                    if s.startswith("[") and s.endswith("]"):
                        s = s[1:-1]
                    for seg in s.split(","):
                        seg = seg.strip().strip("\"'").lower()
                        if seg:
                            items.append(seg)
                return items
            s = str(val).strip().strip("\"'")
            if s.startswith("[") and s.endswith("]"):
                s = s[1:-1]
            items = [seg.strip().strip("\"'").lower() for seg in s.split(",") if seg.strip()]
            return items

        # Unset parallelism dims default to 1; batch sizes stay None if unset.
        tp = _as_int(self.cmd_args.tp) or 1
        pp = _as_int(self.cmd_args.pp) or 1
        cp = _as_int(self.cmd_args.cp) or 1
        vp = _as_int(self.cmd_args.vp)
        mbs = _as_int(self.cmd_args.mb)
        gbs = _as_int(self.cmd_args.gb)
        etp = _as_int(self.cmd_args.et)
        epv = _as_int(self.cmd_args.ep)
        denom = tp * pp * cp
        dp = num_gpus // denom if denom > 0 else 0
        # Constraint 1: TP/PP/CP divisibility for DP computation
        if denom == 0:
            constraint1 = False
            logging.error("Constraint 1 failed: tp*pp*cp must be > 0. tp=%s pp=%s cp=%s", tp, pp, cp)
        else:
            constraint1 = (num_gpus % denom) == 0
            if not constraint1:
                logging.error(
                    "Constraint 1 failed: num_gpus %% (tp*pp*cp) != 0. num_gpus=%s tp=%s pp=%s cp=%s",
                    num_gpus,
                    tp,
                    pp,
                    cp,
                )
        # Constraint 2: VP validity vs layers (relaxed: num_layers not available in this workload API)
        constraint2 = True  # simple variant doesn't carry num_layers; assume satisfied
        # Constraint 3: DP must be non-zero
        constraint3 = dp != 0
        if not constraint3:
            logging.error("Constraint 3 failed: dp == 0. dp=%s num_gpus=%s tp=%s pp=%s cp=%s", dp, num_gpus, tp, pp, cp)
        # Constraint 4: GBS must be divisible by MBS*DP (and *NDO when FSDP)
        fsdp = _as_bool(self.cmd_args.use_megatron_fsdp)
        ndo = _as_int(self.cmd_args.num_distributed_optimizer_instances) or 1
        extra = ndo if fsdp else 1
        if mbs and gbs:
            divisor = (mbs * dp * extra) if dp != 0 else 0
            constraint4 = (gbs % divisor == 0) if divisor != 0 else False
            if not constraint4:
                logging.error(
                    "Constraint 4 failed: gbs %% (mbs * dp%s) != 0. gbs=%s mbs=%s dp=%s ndo=%s fsdp=%s",
                    " * ndo" if fsdp else "",
                    gbs,
                    mbs,
                    dp,
                    ndo,
                    fsdp,
                )
        else:
            constraint4 = True
        # Determine CUDA graphs enabled from impl/scope (normalized)
        cgi = (self.cmd_args.cuda_graph_impl or "").strip().lower()
        scopes = _normalize_str_list(self.cmd_args.cuda_graph_scope)
        # CUDA graphs count as enabled only when BOTH an impl and a scope are set.
        cuda_graphs = (cgi not in {"", "none", "null"}) and len(scopes) > 0
        # Constraint 5: FSDP requires CUDA graphs disabled
        constraint5 = not (fsdp and cuda_graphs)
        if not constraint5:
            logging.error(
                "Constraint 5 failed: use_megatron_fsdp=true requires cuda_graphs=false. fsdp=%s cuda_graphs=%s",
                fsdp,
                cuda_graphs,
            )
        # Constraint 6: FSDP requires PP=1, CP=1, VP=1 (when VP set)
        constraint6 = pp == 1 and cp == 1 and (vp == 1 if vp is not None else True) if fsdp else True
        if not constraint6:
            logging.error(
                "Constraint 6 failed: with fsdp=true, require pp==1, cp==1, vp==1. pp=%s cp=%s vp=%s", pp, cp, vp
            )
        # Constraint 7: MoE parallelism divisibility
        if etp is None or epv is None:
            constraint7 = True
        else:
            moe_denom = etp * epv * pp
            if moe_denom == 0:
                constraint7 = False
                logging.error("Constraint 7 failed: et*ep*pp must be > 0. et=%s ep=%s pp=%s", etp, epv, pp)
            else:
                constraint7 = (num_gpus % moe_denom) == 0
                if not constraint7:
                    logging.error(
                        "Constraint 7 failed: num_gpus %% (et * ep * pp) != 0. num_gpus=%s et=%s ep=%s pp=%s",
                        num_gpus,
                        etp,
                        epv,
                        pp,
                    )
        # Constraint 8: VP allowed set (relaxed: num_layers not available)
        constraint8 = True
        # Constraint 9: NDO > 1 only valid for FSDP
        if (
            self.cmd_args.num_distributed_optimizer_instances is not None
            and self.cmd_args.num_distributed_optimizer_instances > 1
            and not fsdp
        ):
            constraint9 = False
            logging.error(
                "Constraint 9 failed: num_distributed_optimizer_instances > 1 requires use_megatron_fsdp=true. "
                "ndo=%s fsdp=%s",
                self.cmd_args.num_distributed_optimizer_instances,
                fsdp,
            )
        else:
            constraint9 = True
        # Constraint 10: PP>1 cannot be combined with CPU offloading
        cpu_off_layers = _as_int(self.cmd_args.activation_offload_layers) or 0
        if pp > 1 and cpu_off_layers > 0:
            constraint10 = False
            logging.error(
                "Constraint 10 failed: pp > 1 cannot be combined with CPU offloading. "
                "pp=%s activation_offload_layers=%s",
                pp,
                cpu_off_layers,
            )
        else:
            constraint10 = True
        # Constraint 11: CUDA graphs require a2a overlap disabled
        a2a_overlap = _as_bool(self.cmd_args.moe_a2a_overlap)
        constraint11 = not (cuda_graphs and a2a_overlap)
        if not constraint11:
            logging.error(
                "Constraint 11 failed: cuda_graphs=true requires moe_a2a_overlap=false. "
                "cuda_graphs=%s moe_a2a_overlap=%s",
                cuda_graphs,
                a2a_overlap,
            )
        # Constraint 12: CUDA graphs not supported with CPU offloading
        if cuda_graphs and cpu_off_layers > 0:
            constraint12 = False
            logging.error(
                "Constraint 12 failed: CUDA graphs not supported with CPU offloading. activation_offload_layers=%s",
                cpu_off_layers,
            )
        else:
            constraint12 = True
        # Constraint 13: cuda_graph_impl validation
        if cgi not in {"", "none", "null", "transformer_engine", "local"}:
            constraint13 = False
            logging.error(
                "Constraint 13 failed: Invalid cuda_graph_impl=%s. Expected one of none, transformer_engine, local.",
                cgi,
            )
        else:
            constraint13 = True
        # Constraint 14: impl/scope compatibility
        allowed_scopes = {"attn", "mlp", "moe", "moe_router", "moe_preprocess", "mamba"}
        if cgi == "local":
            # 'local' impl accepts only an empty scope or exactly ['full_iteration'].
            if len(scopes) > 0 and not (len(scopes) == 1 and scopes[0] == "full_iteration"):
                constraint14 = False
                logging.error(
                    "Constraint 14 failed: cuda_graph_impl=local only allows cuda_graph_scope=['full_iteration']."
                )
            else:
                constraint14 = True
        elif cgi == "transformer_engine":
            # transformer_engine accepts per-module scopes but never 'full_iteration'.
            has_full = any(s == "full_iteration" for s in scopes)
            invalid = [s for s in scopes if s not in allowed_scopes and s != "full_iteration"]
            if has_full or invalid:
                constraint14 = False
                if has_full:
                    logging.error(
                        "Constraint 14 failed: 'full_iteration' not allowed for transformer_engine cuda graphs."
                    )
                if invalid:
                    logging.error(
                        "Constraint 14 failed: invalid cuda_graph_scope values for transformer_engine: %s", invalid
                    )
            else:
                constraint14 = True
        else:
            constraint14 = True
        # Constraint 15: scope cannot contain both moe and moe_router
        if "moe" in scopes and "moe_router" in scopes:
            constraint15 = False
            logging.error("Constraint 15 failed: cuda_graph_scope must not contain both 'moe' and 'moe_router'.")
        else:
            constraint15 = True
        # Constraint 16: moe_preprocess requires moe_router
        if "moe_preprocess" in scopes and "moe_router" not in scopes:
            constraint16 = False
            logging.error("Constraint 16 failed: 'moe_preprocess' scope requires 'moe_router' scope.")
        else:
            constraint16 = True
        # Constraint 17: recompute vs CUDA graphs incompatibilities
        rec_modules = set(_normalize_str_list(self.cmd_args.recompute_modules))
        if scopes and rec_modules:
            bad = False
            if "attn" in scopes and ({"core_attn", "mla_up_proj"} & rec_modules):
                bad = True
                logging.error(
                    "Constraint 17 failed: attn cuda graph is not supported with core_attn or mla_up_proj recompute. "
                    "recompute_modules=%s",
                    sorted(rec_modules),
                )
            if "mlp" in scopes and "mlp" in rec_modules:
                bad = True
                logging.error("Constraint 17 failed: mlp cuda graph is not supported with mlp recompute.")
            if "moe" in scopes and ({"moe_act", "moe", "shared_experts"} & rec_modules):
                bad = True
                logging.error(
                    "Constraint 17 failed: moe cuda graph is not supported with moe_act/moe/shared_experts recompute."
                )
            if "moe_router" in scopes and ({"moe", "shared_experts"} & rec_modules):
                bad = True
                logging.error(
                    "Constraint 17 failed: moe_router cuda graph is not supported with moe/shared_experts recompute."
                )
            if "layernorm" in rec_modules and (
                "attn" in scopes and "mlp" in scopes and ("moe" in scopes or "moe_router" in scopes)
            ):
                bad = True
                logging.error(
                    "Constraint 17 failed: cuda graph is not supported with layernorm recompute across "
                    "attn+mlp+(moe|moe_router)."
                )
            constraint17 = not bad
        else:
            constraint17 = True
        return bool(
            constraint1
            and constraint2
            and constraint3
            and constraint4
            and constraint5
            and constraint6
            and constraint7
            and constraint8
            and constraint9
            and constraint10
            and constraint11
            and constraint12
            and constraint13
            and constraint14
            and constraint15
            and constraint16
            and constraint17
        )
© Copyright 2026, NVIDIA CORPORATION & AFFILIATES. Last updated on Mar 3, 2026