CloudAI Benchmark Framework v1.6.1

networking/display/cloudai161/_modules/cloudai/workloads/ai_dynamo/ai_dynamo.html

Source code for cloudai.workloads.ai_dynamo.ai_dynamo

# SPDX-FileCopyrightText: NVIDIA CORPORATION & AFFILIATES
# Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import logging
from pathlib import Path
from typing import Literal, Optional, cast

from pydantic import (
    AliasChoices,
    BaseModel,
    ConfigDict,
    Field,
    field_validator,
    model_validator,
)

from cloudai.core import (
    DockerImage,
    File,
    GitRepo,
    HFModel,
    Installable,
    JobStatusResult,
    System,
    TestRun,
)
from cloudai.models.workload import CmdArgs, TestDefinition
from cloudai.systems.slurm import SlurmSystem


class Args(BaseModel):
    """Free-form argument bag for custom workloads.

    Any key is accepted and kept (``extra="allow"``). Declared fields in
    subclasses may be populated by their Python name as well as by their alias.
    """

    model_config = ConfigDict(populate_by_name=True, extra="allow")


class Workload(BaseModel):
    """Configuration of a single benchmark workload.

    Unknown keys are rejected. Multi-word fields accept both the hyphenated and
    the underscored spelling on input. ``report_name`` defaults to
    ``"<name>_report.csv"`` when it is not given.
    """

    model_config = ConfigDict(populate_by_name=True, extra="forbid")

    name: str
    cmd: str
    script: File
    # A one-argument default_factory receives the mapping of already-validated fields,
    # which is why it can read ``name`` (declared above) to build the default.
    report_name: str = Field(
        default_factory=lambda data: f"{data['name']}_report.csv",
        validation_alias=AliasChoices("report-name", "report_name"),
        serialization_alias="report-name",
    )
    repo: Optional[GitRepo] = None
    args: Args = Field(default_factory=Args)
    extra_args: str | list[str] | None = Field(
        default=None,
        validation_alias=AliasChoices("extra-args", "extra_args"),
        serialization_alias="extra-args",
    )


class WorkerBaseArgs(Args):
    """Base arguments for Dynamo backend workers (vLLM and SGLang variants).

    Unknown keys are kept as extras. Every multi-word field accepts both the
    hyphenated and the underscored spelling on input and is serialized under its
    hyphenated alias.
    """

    model_config = ConfigDict(extra="allow", populate_by_name=True)

    # Used by VLLM backend.
    model: str | None = None

    # Used by SGLang/SGLang-DSR1 backends.
    # These previously had only a serialization alias. Input spelled "model-path" or
    # "served-model-name" then fell through to the extras (extra="allow") instead of
    # setting the field, unlike every other field here.
    model_path: str | None = Field(
        default=None,
        serialization_alias="model-path",
        validation_alias=AliasChoices("model-path", "model_path"),
    )
    served_model_name: str | None = Field(
        default=None,
        serialization_alias="served-model-name",
        validation_alias=AliasChoices("served-model-name", "served_model_name"),
    )

    gpu_memory_utilization: float | list[float] | None = Field(
        default=None,
        serialization_alias="gpu-memory-utilization",
        validation_alias=AliasChoices("gpu-memory-utilization", "gpu_memory_utilization"),
    )
    pipeline_parallel_size: int | list[int] = Field(
        default=1,
        serialization_alias="pipeline-parallel-size",
        validation_alias=AliasChoices("pipeline-parallel-size", "pipeline_parallel_size"),
    )
    tensor_parallel_size: int | list[int] = Field(
        default=1,
        serialization_alias="tensor-parallel-size",
        validation_alias=AliasChoices("tensor-parallel-size", "tensor_parallel_size"),
    )
    data_parallel_size: int | list[int] | None = Field(
        default=None,
        serialization_alias="data-parallel-size",
        validation_alias=AliasChoices("data-parallel-size", "data_parallel_size"),
    )


class WorkerConfig(BaseModel):
    """Launch settings for one class of worker (e.g. prefill or decode).

    Unknown keys are rejected. Every multi-word field accepts both the
    hyphenated and the underscored spelling on input and is serialized under
    its hyphenated alias.
    """

    model_config = ConfigDict(populate_by_name=True, extra="forbid")

    cmd: str
    # Required field: no default is supplied.
    worker_initialized_regex: str = Field(
        serialization_alias="worker-initialized-regex",
        validation_alias=AliasChoices("worker-initialized-regex", "worker_initialized_regex"),
    )
    multiple_workers_per_node: bool = Field(
        default=False,
        serialization_alias="multiple-workers-per-node",
        validation_alias=AliasChoices("multiple-workers-per-node", "multiple_workers_per_node"),
    )

    num_nodes: int | list[int] = Field(
        default=1,
        serialization_alias="num-nodes",
        validation_alias=AliasChoices("num-nodes", "num_nodes"),
    )
    nodes: str | None = None

    args: WorkerBaseArgs = Field(default_factory=WorkerBaseArgs)

    extra_args: str | list[str] | None = Field(
        default=None,
        serialization_alias="extra-args",
        validation_alias=AliasChoices("extra-args", "extra_args"),
    )


class AIDynamoArgs(BaseModel):
    """Arguments for AI Dynamo setup.

    After validation, ``model`` is copied into the prefill and decode worker
    args, using the argument names the selected ``backend`` expects (see
    ``populate_prefill_decode_args``).
    """

    model_config = ConfigDict(extra="forbid", populate_by_name=True)

    model: str = "Qwen/Qwen3-0.6B"
    backend: Literal["vllm", "sglang", "sglang_dsr1"] = "vllm"
    endpoint: str = Field(default="v1/chat/completions")
    connector: Optional[str | list[str]] = None

    @field_validator("connector", mode="before")
    @classmethod
    def validate_connector(cls, v: str | list[str] | None) -> str | list[str] | None:
        """Reject unknown connector names.

        Args:
            v: ``None``, a string holding one or more whitespace-separated
                connector names, or a list of such strings.

        Returns:
            ``v`` unchanged when every connector name is allowed.

        Raises:
            ValueError: if a name is not one of the allowed connectors, or if a
                non-``None`` value contains no connector name at all.
        """
        if v is None:
            return v
        allowed_connectors = ["kvbm", "lmcache", "nixl", "none"]

        # Connectors can be either a single string or a list, and any string may itself
        # hold several space-separated names. str.split() with no argument splits on runs
        # of whitespace and drops empty tokens. Input like "kvbm  nixl" or " nixl " is
        # therefore no longer rejected because of a phantom "" entry.
        joined = v if isinstance(v, str) else " ".join(v)
        connectors = joined.split()
        if not connectors:
            # Keep rejecting values that name no connector at all (e.g. "" or []).
            raise ValueError(f"Invalid connector: {v!r}. Available connectors: {allowed_connectors}")

        for connector in connectors:
            if connector not in allowed_connectors:
                raise ValueError(f"Invalid connector: {connector}. Available connectors: {allowed_connectors}")
        return v

    workspace_path: str = Field(
        default="/workspace",
        serialization_alias="workspace-path",
        validation_alias=AliasChoices("workspace-path", "workspace_path"),
    )
    ingress_cmd: str = Field(
        default="python -m dynamo.frontend --router-mode kv",
        serialization_alias="ingress-cmd",
        validation_alias=AliasChoices("ingress-cmd", "ingress_cmd"),
    )
    node_setup_cmd: str = Field(
        default="/usr/local/ucx/bin/ucx_info -d |grep Transport | sort -u;",
        serialization_alias="node-setup-cmd",
        validation_alias=AliasChoices("node-setup-cmd", "node_setup_cmd"),
    )
    port: int = Field(
        default=8000,
        description="Dynamo frontend HTTP API port",
    )
    etcd_cmd: str = Field(
        default="etcd --log-level info --data-dir /tmp/etcd",
        serialization_alias="etcd-cmd",
        validation_alias=AliasChoices("etcd-cmd", "etcd_cmd"),
    )
    etcd_port: int = Field(
        default=2379,
        serialization_alias="etcd-port",
        validation_alias=AliasChoices("etcd-port", "etcd_port"),
    )
    nats_cmd: str = Field(
        default="nats-server -js",
        serialization_alias="nats-cmd",
        validation_alias=AliasChoices("nats-cmd", "nats_cmd"),
    )
    nats_port: int = Field(
        default=4222,
        serialization_alias="nats-port",
        validation_alias=AliasChoices("nats-port", "nats_port"),
    )

    decode_worker: WorkerConfig = WorkerConfig(
        cmd="python3 -m dynamo.vllm",
        worker_initialized_regex="VllmWorker.*has.been.initialized",
    )
    prefill_worker: WorkerConfig = WorkerConfig(
        cmd="python3 -m dynamo.vllm --is-prefill-worker",
        worker_initialized_regex="VllmWorker.*has.been.initialized",
    )

    @model_validator(mode="after")
    def populate_prefill_decode_args(self) -> "AIDynamoArgs":
        """Copy ``model`` into both workers' args under the backend-specific names.

        For ``vllm`` this sets ``args.model``. For ``sglang``/``sglang_dsr1`` it
        sets ``args.model_path`` and ``args.served_model_name``. Any values
        already present in those worker args are overwritten.

        Raises:
            ValueError: for an unrecognised backend. This is defensive, since
                ``backend`` is already restricted by its ``Literal`` type.
        """
        if self.backend.lower() == "vllm":
            self.prefill_worker.args.model = self.model
            self.decode_worker.args.model = self.model
        elif self.backend.lower() in ["sglang", "sglang_dsr1"]:
            self.prefill_worker.args.model_path = self.model
            self.decode_worker.args.model_path = self.model
            self.prefill_worker.args.served_model_name = self.model
            self.decode_worker.args.served_model_name = self.model
        else:
            raise ValueError(f"Invalid backend: {self.backend}")

        return self


class LMCacheArgs(BaseModel):
    """Tunable LMCache settings; unknown keys are accepted and kept as extras."""

    model_config = ConfigDict(extra="allow")

    chunk_size: int = 256
    local_cpu: bool = False
    # 10 * 1024**3 == 10737418240 (10 GiB, assuming the value is in bytes -- TODO confirm unit).
    nixl_buffer_size: int = 10 * 1024**3
    nixl_buffer_device: str = "cuda"
    # NOTE(review): presumably forwarded to LMCache as ``extra_config`` entries -- confirm against the launcher.
    extra_config_enable_nixl_storage: bool = True
    extra_config_nixl_backend: str = "GDS_MT"
    extra_config_nixl_file_pool_size: int = 64

    # Controller and distributed-worker endpoints.
    enable_controller: bool = True
    lmcache_instance_id: str = "lmcache_default_instance"
    controller_url: str = "localhost:9001"
    lmcache_worker_port: int = 8788
    distributed_url: str = "localhost:8789"


class LMCache(BaseModel):
    """LMCache configuration: controller command, pinned source repo and arguments.

    Unknown keys are rejected.
    """

    model_config = ConfigDict(extra="forbid")

    controller_cmd: str = "lmcache_controller --host localhost --port 9000 --monitor-port 9001"
    # LMCache sources pinned to a fixed commit.
    repo: GitRepo = GitRepo(
        commit="ab8530993992db873869ba882320953582d94309",
        url="https://github.com/LMCache/LMCache.git",
    )

    args: LMCacheArgs = Field(default_factory=LMCacheArgs)
    extra_args: str | list[str] | None = Field(
        default=None,
        validation_alias=AliasChoices("extra-args", "extra_args"),
        serialization_alias="extra-args",
    )

    @property
    def installables(self) -> list[Installable]:
        """Items that must be installed for LMCache: only its git repository."""
        source_repo: Installable = self.repo
        return [source_repo]


class GenAIPerf(Workload):
    """GenAI-Perf profiling workload.

    Relaxes the base ``Workload`` config to accept extra keys, and supplies
    defaults for ``name``, ``cmd`` and ``script``.
    """

    model_config = ConfigDict(extra="allow")

    name: str = "genai_perf"
    cmd: str = "genai-perf profile"
    # <parent of this module's directory>/ai_dynamo/genai_perf.sh. This module lives in the
    # ai_dynamo package, so that is the script shipped alongside it.
    script: File = File(Path(__file__).parent.parent / "ai_dynamo/genai_perf.sh")

    @property
    def installables(self) -> list[Installable]:
        """Items that must be installed for this workload: only its driver script."""
        driver_script: Installable = self.script
        return [driver_script]


class Constraints(BaseModel):
    """Switches for the validity checks applied to AI Dynamo configurations during DSE.

    Unknown keys are rejected.
    """

    model_config = ConfigDict(extra="forbid")

    # When enabled, require prefill tensor-parallel size <= decode tensor-parallel size.
    prefill_tp_le_decode_tp: bool = True
    # When enabled, require TP * PP of each worker to fit within the GPUs of one node.
    tp_times_pp_le_gpus_per_node: bool = True


class AIDynamoCmdArgs(CmdArgs):
    """Command arguments for the AI Dynamo workload.

    Unknown keys are rejected. ``workloads`` is a comma-separated list of
    workload script names.
    """

    model_config = ConfigDict(extra="forbid")

    docker_image_url: str
    storage_cache_dir: Optional[str | list[str]] = Field(default="/tmp", serialization_alias="storage_cache_dir")
    dynamo: AIDynamoArgs
    lmcache: LMCache = Field(default_factory=LMCache)
    genai_perf: GenAIPerf = Field(default_factory=GenAIPerf)
    workloads: str = "genai_perf.sh"

    @field_validator("workloads", mode="before")
    @classmethod
    def validate_workloads(cls, v: str) -> str:
        """Check each comma-separated workload name and return them re-joined without surrounding spaces.

        Raises:
            ValueError: on the first entry that is not an allowed workload.
        """
        allowed_workloads = ["genai_perf.sh"]
        cleaned = [entry.strip() for entry in v.split(",")]
        first_unknown = next((entry for entry in cleaned if entry not in allowed_workloads), None)
        if first_unknown is not None:
            raise ValueError(f"Invalid workload: {first_unknown}. Available workloads: {allowed_workloads}")
        return ",".join(cleaned)

    @property
    def workloads_list(self) -> list[str]:
        """The configured workloads as a list of stripped names."""
        return list(map(str.strip, self.workloads.split(",")))

    @property
    def installables(self) -> list[Installable]:
        """Installables contributed by LMCache followed by those of GenAI-Perf."""
        return self.lmcache.installables + self.genai_perf.installables


class AIDynamoTestDefinition(TestDefinition):
    """Test definition for AI Dynamo."""

    model_config = ConfigDict(extra="forbid")

    cmd_args: AIDynamoCmdArgs
    _docker_image: Optional[DockerImage] = None
    script: File = File(Path(__file__).parent.parent / "ai_dynamo/ai_dynamo.sh")
    repo: GitRepo = GitRepo(
        url="https://github.com/ai-dynamo/dynamo.git", commit="f7e468c7e8ff0d1426db987564e60572167e8464"
    )
    _hf_model: HFModel | None = None
    constraints: Constraints = Constraints()
    success_marker: str = "success-marker.txt"
    failure_marker: str = "failure-marker.txt"

    @model_validator(mode="after")
    def workload_scripts(self) -> "AIDynamoTestDefinition":
        """Check that every requested workload has an entry in the workload map.

        Raises:
            ValueError: if ``cmd_args.workloads`` names a script that
                ``get_workload_map`` does not know.
        """
        workload_map = self.get_workload_map()
        for workload in self.cmd_args.workloads_list:
            if workload not in workload_map:
                raise ValueError(f"Invalid workload: {workload}. Available workloads: {list(workload_map.keys())}")
        return self

    def get_workload_map(self) -> dict[str, Workload]:
        """Get a map of workload script file names to workload objects."""
        return {
            self.cmd_args.genai_perf.script.src.name: self.cmd_args.genai_perf,
        }

    @property
    def docker_image(self) -> DockerImage:
        """Docker image for ``cmd_args.docker_image_url``, created lazily and cached."""
        if self._docker_image is None:
            self._docker_image = DockerImage(url=self.cmd_args.docker_image_url)
        return self._docker_image

    @property
    def hf_model(self) -> HFModel:
        """Hugging Face model for ``cmd_args.dynamo.model``, created lazily and cached."""
        if self._hf_model is None:
            logging.info(f"Creating HFModel for: {self.cmd_args.dynamo.model}")
            self._hf_model = HFModel(model_name=self.cmd_args.dynamo.model)
        return self._hf_model

    @property
    def installables(self) -> list[Installable]:
        """Get all installables for this test definition."""
        return [
            self.docker_image,
            self.repo,
            self.script,
            self.hf_model,
            *self.cmd_args.installables,
        ]

    def was_run_successful(self, tr: TestRun) -> JobStatusResult:
        """Judge a run by the marker files and workload reports in its output directory.

        The run fails if the failure marker exists (its contents are returned in
        the error message), if the success marker is missing, or if any
        requested workload has no report file.
        """
        output_path = tr.output_path
        failure_marker = output_path / self.failure_marker
        success_marker = output_path / self.success_marker

        if failure_marker.exists():
            contents = failure_marker.read_text()
            return JobStatusResult(False, error_message=f"Failure marker found:\n{contents}")

        if not success_marker.exists():
            return JobStatusResult(False, error_message=f"Success marker file not found: {success_marker.absolute()}")

        # Check every workload rather than stopping at the first miss, so all problems get logged.
        result = True
        workload_map = self.get_workload_map()
        for workload in self.cmd_args.workloads_list:
            if workload not in workload_map:
                logging.info(f"Workload {workload} not found in workload map")
                result = False
                continue

            report_name = workload_map[workload].report_name
            if report_name is None:
                logging.warning(f"Workload {workload} has no report_name configured")
                result = False
                continue

            workload_csv_file = output_path / report_name
            if not workload_csv_file.exists():
                logging.info(f"Result file ({workload_csv_file.absolute()}) not found for workload: {workload}")
                result = False
            else:
                logging.info(f"Result file ({workload_csv_file.absolute()}) exists for {workload}")

        return JobStatusResult(result)

    def constraint_check(self, tr: TestRun, system: Optional[System]) -> bool:
        """Return False when the run's worker parallelism violates an enabled constraint.

        Checks ``prefill_tp_le_decode_tp`` and, when the system reports a
        positive GPUs-per-node count, ``tp_times_pp_le_gpus_per_node``.
        """
        prefill_worker = tr.test.cmd_args.dynamo.prefill_worker
        decode_worker = tr.test.cmd_args.dynamo.decode_worker
        prefill_tp = prefill_worker.args.tensor_parallel_size
        prefill_pp = prefill_worker.args.pipeline_parallel_size
        decode_tp = decode_worker.args.tensor_parallel_size
        decode_pp = decode_worker.args.pipeline_parallel_size

        if self.constraints.prefill_tp_le_decode_tp and prefill_tp > decode_tp:
            logging.info("constraint_check failed for: prefill_tp_le_decode_tp")
            return False
        logging.info("constraint_check passed for: prefill_tp_le_decode_tp")

        # ``system`` may be None or a non-Slurm System. The cast only informs the type checker,
        # so read gpus_per_node defensively: a missing or unset value (0) skips the per-node
        # check instead of raising AttributeError.
        slurm_system = cast(SlurmSystem, system)
        gpus_per_node = getattr(slurm_system, "gpus_per_node", None) or 0

        if (
            gpus_per_node > 0
            and self.constraints.tp_times_pp_le_gpus_per_node
            and (prefill_tp * prefill_pp > gpus_per_node or decode_tp * decode_pp > gpus_per_node)
        ):
            logging.info("constraint_check failed for: tp_times_pp_le_gpus_per_node")
            return False
        logging.info("constraint_check passed for: tp_times_pp_le_gpus_per_node")

        return True

© Copyright 2026, NVIDIA CORPORATION & AFFILIATES. Last updated on Jun 3, 2026