CloudAI Benchmark Framework v1.6.1

networking/display/cloudai161/_modules/cloudai/systems/kubernetes/kubernetes_system.html

Source code for cloudai.systems.kubernetes.kubernetes_system

# SPDX-FileCopyrightText: NVIDIA CORPORATION & AFFILIATES
# Copyright (c) 2024-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from __future__ import annotations

import json
import logging
import subprocess
import time
from pathlib import Path
from typing import TYPE_CHECKING, Any, Dict, List, Optional, cast

if TYPE_CHECKING:
    import kubernetes as k8s

from cloudai.core import BaseJob, System
from cloudai.util.lazy_imports import lazy

from .kubernetes_job import KubernetesJob


class KubernetesSystem(System):
    """
    Represents a Kubernetes system.

    Jobs are created, monitored and deleted through the Kubernetes Python client (Core V1, Batch V1 and
    Custom Objects APIs) and, for a few operations, through the ``kubectl`` command-line tool. Supported job
    kinds are batch Jobs, MPIJobs (``kubeflow.org/v2beta1``) and DynamoGraphDeployments
    (``nvidia.com/v1alpha1``).
    """

    kube_config_path: Path
    default_namespace: str
    scheduler: str = "kubernetes"
    monitor_interval: int = 1
    gpus_per_node: int = 1
    # Networking mode, see resolve_cni_networks(): True forces hostNetwork, False requires CNI networks,
    # None tries CNI first and falls back to hostNetwork.
    use_host_network: bool | None = None

    # Kubernetes API clients; created in model_post_init() and excluded from the pickled state.
    _core_v1: Optional[k8s.client.CoreV1Api] = None
    _batch_v1: Optional[k8s.client.BatchV1Api] = None
    _custom_objects_api: Optional[k8s.client.CustomObjectsApi] = None
    # Set after genai-perf has run for the current DynamoGraphDeployment; reset when the deployment is deleted.
    _genai_perf_completed: bool = False

    def __getstate__(self) -> dict[str, Any]:
        """Return the state for pickling, excluding non-picklable Kubernetes client objects."""
        state = self.model_dump(exclude={"_core_v1", "_batch_v1", "_custom_objects_api"})
        return state

    def __deepcopy__(self, _memo: dict[int, Any] | None = None) -> "KubernetesSystem":
        """
        Create a deep copy of the KubernetesSystem instance.

        The copy is rebuilt from the serializable state returned by ``__getstate__`` so that the Kubernetes
        clients are re-created rather than shared with the original.

        Args:
            _memo: Dictionary to keep track of objects that have already been copied (unused).

        Returns:
            A new instance of the same class with reinitialized Kubernetes clients.
        """
        state = self.__getstate__()
        # type(self) keeps subclasses intact instead of downgrading the copy to KubernetesSystem.
        new_instance = type(self)(**state)
        new_instance.model_post_init(None)
        return new_instance

    def model_post_init(self, _context: Any = None) -> None:
        """
        Initialize the KubernetesSystem instance.

        Resolves the kube config file, loads it and creates the Kubernetes API clients. When
        ``kube_config_path`` is not an existing file, ``~/.kube/config`` is used instead.

        Raises:
            FileNotFoundError: If the resolved kube config file does not exist.
        """
        kube_config_path = self.kube_config_path
        if not kube_config_path.is_file():
            # Fall back to the conventional default kube config location.
            home_directory = Path.home()
            kube_config_path = home_directory / ".kube" / "config"
        else:
            kube_config_path = kube_config_path.resolve()

        if not kube_config_path.exists():
            error_message = (
                f"Kube config file '{kube_config_path}' not found. This file is required to configure the "
                f"Kubernetes environment. Please verify that the file exists at the specified path."
            )
            logging.error(error_message)
            raise FileNotFoundError(error_message)

        # Instantiate Kubernetes APIs
        logging.debug(f"Loading kube config from: {kube_config_path}")
        lazy.k8s.config.load_kube_config(config_file=str(kube_config_path))
        self._core_v1 = lazy.k8s.client.CoreV1Api()
        self._batch_v1 = lazy.k8s.client.BatchV1Api()
        self._custom_objects_api = lazy.k8s.client.CustomObjectsApi()

        logging.debug(f"{self.__class__.__name__} initialized")

    @property
    def core_v1(self) -> k8s.client.CoreV1Api:
        """Returns the Kubernetes Core V1 API client (set by model_post_init)."""
        assert self._core_v1 is not None
        return self._core_v1

    @property
    def batch_v1(self) -> k8s.client.BatchV1Api:
        """Returns the Kubernetes Batch V1 API client (set by model_post_init)."""
        assert self._batch_v1 is not None
        return self._batch_v1

    @property
    def custom_objects_api(self) -> k8s.client.CustomObjectsApi:
        """Returns the Kubernetes Custom Objects API client (set by model_post_init)."""
        assert self._custom_objects_api is not None
        return self._custom_objects_api

    def __repr__(self) -> str:
        """
        Provide a structured string representation of the system.

        Returns:
            str: A multi-line string with the system name, scheduler type, kube config path, and default
            namespace.
        """
        return (
            f"System Name: {self.name}\n"
            f"Scheduler Type: {self.scheduler}\n"
            f"Kube Config Path: {self.kube_config_path}\n"
            f"Default Namespace: {self.default_namespace}"
        )

    def update(self) -> None:
        """
        Update the system object for a Kubernetes system.

        Currently not implemented for KubernetesSystem.
        """
        pass

    def get_network_attachment_definitions(self) -> list[str]:
        """
        Return all NetworkAttachmentDefinitions in the cluster as 'namespace/name' strings.

        Discovery is best-effort: if ``kubectl`` fails, is not installed, or its output cannot be parsed,
        an empty list is returned and the reason is logged at debug level.
        """
        cmd = ["kubectl", "get", "network-attachment-definitions", "--all-namespaces", "-o", "json"]
        try:
            result = subprocess.run(cmd, capture_output=True, text=True, check=True)
        except subprocess.CalledProcessError as e:
            logging.debug("Failed to list NetworkAttachmentDefinitions: %s", e.stderr)
            return []
        except FileNotFoundError as e:
            # kubectl is not on PATH; treat as "no networks found" so auto mode can fall back to hostNetwork.
            logging.debug("Failed to run kubectl to list NetworkAttachmentDefinitions: %s", e)
            return []

        try:
            data = json.loads(result.stdout)
        except json.JSONDecodeError as e:
            logging.debug("Failed to parse NetworkAttachmentDefinitions output: %s", e)
            return []

        return [f"{item['metadata']['namespace']}/{item['metadata']['name']}" for item in data.get("items", [])]

    def resolve_cni_networks(self) -> list[str] | None:
        """
        Determine which CNI networks to use, or None if host networking should be used.

        - use_host_network=True: always use hostNetwork, skip CNI discovery.
        - use_host_network=False: require CNI; raise if none are found.
        - use_host_network=None: try CNI, fall back to hostNetwork if none found.

        Raises:
            RuntimeError: If use_host_network is False and no NetworkAttachmentDefinitions were found.
        """
        if self.use_host_network is True:
            return None

        if networks := self.get_network_attachment_definitions():
            return networks

        if self.use_host_network is False:
            raise RuntimeError(
                "use_host_network=False but no NetworkAttachmentDefinitions were found in the cluster. "
                "Ensure the CNI operator is installed and net-attach-defs are configured."
            )

        return None  # auto mode: fall back to hostNetwork

    def is_job_running(self, job: BaseJob) -> bool:
        """Return True if the given job (expected to be a KubernetesJob) is still running."""
        k_job: KubernetesJob = cast(KubernetesJob, job)
        return self._is_job_running(k_job)

    def is_job_completed(self, job: BaseJob) -> bool:
        """Return True if the given job (expected to be a KubernetesJob) is no longer running."""
        k_job: KubernetesJob = cast(KubernetesJob, job)
        return not self._is_job_running(k_job)

    def _is_job_running(self, job: KubernetesJob) -> bool:
        """
        Dispatch the running check according to the job kind.

        Raises:
            ValueError: If the job kind is not supported.
        """
        logging.debug(f"Checking for job '{job.name}' of kind '{job.kind}' to determine if it is running.")

        # "mpijob" must be tested before "job" because it also contains the substring "job".
        if "mpijob" in job.kind.lower():
            return self._is_mpijob_running(job)
        elif "job" in job.kind.lower():
            return self._is_batch_job_running(job.name)
        elif "dynamographdeployment" in job.kind.lower():
            return self._is_dynamo_graph_deployment_running(job)
        else:
            error_message = f"Unsupported job kind: '{job.kind}'."
            logging.error(error_message)
            raise ValueError(error_message)

    def _is_mpijob_running(self, job: KubernetesJob) -> bool:
        """
        Check whether an MPIJob is still running, storing its pod logs on every call.

        Returns:
            bool: True for an empty condition list or a True "Created" condition without a True
            "Succeeded"/"Failed" condition; False otherwise or when the MPIJob is not found (404).

        Raises:
            kubernetes.client.ApiException: For API errors other than 404.
        """
        try:
            mpijob = self.custom_objects_api.get_namespaced_custom_object(
                group="kubeflow.org",
                version="v2beta1",
                namespace=self.default_namespace,
                plural="mpijobs",
                name=job.name,
            )

            assert isinstance(mpijob, dict)
            status: dict = cast(dict, mpijob.get("status", {}))
            conditions = status.get("conditions", [])
            logging.debug(f"MPIJob '{job.name}': {conditions=} {status=}")
            self.store_logs_for_job(job.name, job.test_run.output_path)

            # Consider an empty conditions list as running
            if not conditions:
                return True

            for condition in conditions:
                if condition["type"] == "Succeeded" and condition["status"] == "True":
                    return False
                if condition["type"] == "Failed" and condition["status"] == "True":
                    return False

            # If the job has been created but is neither succeeded nor failed, it is considered running
            return any(condition["type"] == "Created" and condition["status"] == "True" for condition in conditions)

        except lazy.k8s.client.ApiException as e:
            if e.status == 404:
                logging.debug(f"MPIJob '{job.name}' not found. It may have completed and been removed from the system.")
                return False
            else:
                error_message = (
                    f"Error occurred while retrieving status for MPIJob '{job.name}'. "
                    f"Error code: {e.status}. Message: {e.reason}. Please check the job name, namespace, and "
                    "Kubernetes API server."
                )
                logging.error(error_message)
                raise

    def _is_batch_job_running(self, job_name: str) -> bool:
        """
        Check whether a batch Job is still running.

        Returns:
            bool: True for an empty condition list or a True "Created" condition without a True
            "Complete"/"Failed" condition; False otherwise, when status attributes are missing, or when the
            Job is not found (404).

        Raises:
            kubernetes.client.ApiException: For API errors other than 404.
        """
        try:
            k8s_job: Any = self.batch_v1.read_namespaced_job_status(name=job_name, namespace=self.default_namespace)

            if not (hasattr(k8s_job, "status") and hasattr(k8s_job.status, "conditions")):
                logging.debug(f"Job '{job_name}' does not have expected status attributes.")
                return False

            conditions = k8s_job.status.conditions or []

            # Consider an empty conditions list as running
            if not conditions:
                return True

            for condition in conditions:
                if condition.type == "Complete" and condition.status == "True":
                    return False

                if condition.type == "Failed" and condition.status == "True":
                    return False

            # NOTE(review): the upstream batch/v1 Job condition types are Complete, Failed, Suspended,
            # FailureTarget and SuccessCriteriaMet; a "Created" condition may never appear, in which case any
            # non-empty, non-terminal condition list is reported as not running — confirm this is intended.
            return any(condition.type == "Created" and condition.status == "True" for condition in conditions)

        except lazy.k8s.client.ApiException as e:
            if e.status == 404:
                logging.debug(
                    f"Batch job '{job_name}' not found. It may have completed and been removed from the system."
                )
                return False
            else:
                logging.error(
                    f"Error occurred while retrieving status for batch job '{job_name}'. "
                    f"Error code: {e.status}. Message: {e.reason}. Please check the job name and Kubernetes API server."
                )
                raise

    def are_vllm_pods_ready(self, job: KubernetesJob) -> bool:
        """
        Check via ``kubectl get pods`` whether every pod whose name contains the job name is Running and ready.

        Returns:
            bool: False if listing pods fails, no matching pod exists, a matching pod is Terminating, a READY
            column cannot be parsed, or any matching pod is not Running with all containers ready.
        """
        cmd = ["kubectl", "get", "pods", "-n", self.default_namespace]

        try:
            result = subprocess.run(cmd, capture_output=True, text=True, check=True)
        except subprocess.CalledProcessError as e:
            logging.error(f"Failed to get pods: {e}")
            return False

        all_ready = True
        vllm_pods_found = False
        for line in result.stdout.splitlines():
            # Skip the table header row.
            if line.startswith("NAME"):
                continue

            columns = line.split()
            if len(columns) < 3:
                continue

            pod_name = columns[0]
            if job.name not in pod_name:
                continue

            vllm_pods_found = True
            ready_status = columns[1]
            pod_status = columns[2]

            if pod_status == "Terminating":
                logging.debug(f"Pod {pod_name} is terminating")
                return False

            try:
                # READY column has the form "<ready>/<total>".
                ready_count, total_count = map(int, ready_status.split("/"))
            except (ValueError, IndexError) as e:
                logging.error(f"Failed to parse ready status '{ready_status}' for pod {pod_name}: {e}")
                return False

            if pod_status == "Running" and ready_count == total_count:
                logging.debug(f"Pod {pod_name} is running and ready ({ready_status})")
            else:
                logging.debug(f"Pod {pod_name} is {pod_status} but not fully ready ({ready_status})")
                all_ready = False

        if not vllm_pods_found:
            logging.debug("No vLLM pods found")
            return False

        return all_ready

    def _get_dynamo_pod_by_role(self, role: str) -> str:
        """
        Return the name of the first pod in the namespace whose Dynamo component label matches ``role``.

        Raises:
            RuntimeError: If no pod carries a matching label.
        """
        for pod in self.core_v1.list_namespaced_pod(namespace=self.default_namespace).items:
            labels = pod.metadata.labels
            logging.debug(f"Found pod: {pod.metadata.name} with labels: {labels}")
            if labels and str(labels.get("nvidia.com/dynamo-component", "")).lower() == role.lower():  # v0.6.x
                return pod.metadata.name
            if labels and str(labels.get("nvidia.com/dynamo-component-type", "")).lower() == role.lower():  # v0.7.x
                return pod.metadata.name
        raise RuntimeError(f"No pod found for the role '{role}'")

    def _run_genai_perf(self, job: KubernetesJob) -> None:
        """
        Run the genai-perf wrapper script inside the Dynamo frontend pod and collect its results.

        The wrapper script is copied into the pod, made executable (best-effort), executed with a 10-minute
        timeout, and its stdout/stderr are written to ``genai_perf.log`` in the test run output directory.

        Raises:
            TypeError: If the job's test definition is not an AIDynamoTestDefinition.
            subprocess.CalledProcessError: If copying the wrapper script into the pod fails.
        """
        from cloudai.workloads.ai_dynamo.ai_dynamo import AIDynamoTestDefinition

        if not isinstance(job.test_run.test, AIDynamoTestDefinition):
            raise TypeError("Test definition must be an instance of AIDynamoTestDefinition")

        tdef = cast(AIDynamoTestDefinition, job.test_run.test)

        genai_perf_results_path = "/tmp/cloudai/genai-perf"

        frontend_pod = self._get_dynamo_pod_by_role(role="frontend")

        wrapper_script_path = tdef.cmd_args.genai_perf.script.installed_path
        pod_wrapper_path = "/tmp/genai_perf.sh"

        logging.debug(f"Copying wrapper script {wrapper_script_path} to pod {frontend_pod}")
        cp_wrapper_cmd = [
            "kubectl",
            "cp",
            str(wrapper_script_path),
            f"{self.default_namespace}/{frontend_pod}:{pod_wrapper_path}",
        ]
        subprocess.run(cp_wrapper_cmd, capture_output=True, text=True, check=True)

        chmod_cmd = ["chmod", "+x", pod_wrapper_path]
        kubectl_exec_cmd = ["kubectl", "exec", "-n", self.default_namespace, frontend_pod, "--", *chmod_cmd]
        logging.debug(f"Making wrapper script executable in pod={frontend_pod}")
        try:
            result = subprocess.run(kubectl_exec_cmd, capture_output=True, text=True, timeout=60 * 10, check=True)
            logging.debug(f"chmod exited {result.returncode}: {result.stdout} {result.stderr}")
        except Exception as e:
            # Best-effort: the script is later invoked through /bin/bash, so a failed chmod is not fatal.
            logging.debug(f"Error making wrapper script executable in pod '{frontend_pod}': {e}")

        genai_perf_config: list[str] = [
            "--cmd",
            tdef.cmd_args.genai_perf.cmd,
            "--report-name",
            tdef.cmd_args.genai_perf.report_name,
        ]
        extra_args = tdef.cmd_args.genai_perf.extra_args
        if isinstance(extra_args, list):
            extra_args = " ".join(extra_args)
        if extra_args:
            genai_perf_config.extend(["--extra-args", extra_args])

        # Build genai-perf arguments as --key value pairs for parse_genai_perf_args
        genai_perf_cmd_parts: list[str] = []
        if tdef.cmd_args.genai_perf.args:
            for k, v in tdef.cmd_args.genai_perf.args.model_dump(exclude_none=True).items():
                genai_perf_cmd_parts.extend([f"--{k}", str(v)])

        wrapper_cmd = [
            "/bin/bash",
            pod_wrapper_path,
            "--result-dir",
            genai_perf_results_path,
            "--gpus-per-node",
            str(self.gpus_per_node),
            "--model",
            tdef.cmd_args.dynamo.model,
            "--url",
            "http://localhost",
            "--port",
            str(tdef.cmd_args.dynamo.port),
            "--endpoint",
            tdef.cmd_args.dynamo.endpoint,
            *genai_perf_config,
            "--",
            *genai_perf_cmd_parts,
        ]

        kubectl_exec_cmd = ["kubectl", "exec", "-n", self.default_namespace, frontend_pod, "--", *wrapper_cmd]
        logging.debug(f"Executing genai-perf in pod={frontend_pod} cmd={kubectl_exec_cmd}")
        try:
            result = subprocess.run(kubectl_exec_cmd, capture_output=True, text=True, timeout=60 * 10)
            logging.debug(f"genai-perf exited with code {result.returncode}")
            with (job.test_run.output_path / "genai_perf.log").open("w") as f:
                f.write(result.stdout)
                if result.stderr:
                    f.write("\nSTDERR:\n")
                    f.write(result.stderr)
        except Exception as e:
            logging.debug(f"Error executing genai-perf command in pod '{frontend_pod}': {e}")

        self._copy_genai_perf_results(job, frontend_pod, genai_perf_results_path)

    def _copy_genai_perf_results(self, job: KubernetesJob, frontend_pod: str, genai_perf_results_path: str) -> None:
        """
        Copy genai-perf results out of the frontend pod and touch the success marker if the report exists.

        Failures (copy error or missing report) are logged and the success marker is not created.
        """
        from cloudai.workloads.ai_dynamo.ai_dynamo import AIDynamoTestDefinition

        tdef = cast(AIDynamoTestDefinition, job.test_run.test)
        assert isinstance(tdef, AIDynamoTestDefinition)

        cmd = [
            "kubectl",
            "cp",
            f"{self.default_namespace}/{frontend_pod}:{genai_perf_results_path}",
            str(job.test_run.output_path),
        ]
        logging.debug(f"Copying results with command: {' '.join(cmd)}")
        result = subprocess.run(cmd, capture_output=True, text=True)
        if result.returncode != 0:
            logging.error(f"Error copying results with command: {' '.join(cmd)}: {result.stderr}")
            return

        report_path = job.test_run.output_path / tdef.cmd_args.genai_perf.report_name
        if not report_path.exists():
            logging.error(f"Genai-perf report not found at {report_path}")
            return

        (job.test_run.output_path / tdef.success_marker).touch()
        logging.debug(f"Success marker touched at {job.test_run.output_path / tdef.success_marker}")

    def _check_deployment_conditions(self, conditions: list) -> bool:
        """
        Interpret DynamoGraphDeployment conditions as "running".

        Returns True for an empty list, on the first True "Ready" condition, or when no True "Failed"
        condition is seen; returns False on the first True "Failed" condition.
        """
        logging.debug(f"Checking deployment conditions: {conditions}")
        if not conditions:
            return True

        for condition in conditions:
            if condition["type"] == "Ready" and condition["status"] == "True":
                return True
            if condition["type"] == "Failed" and condition["status"] == "True":
                return False

        return True

    def _is_dynamo_graph_deployment_running(self, job: KubernetesJob) -> bool:
        """
        Check whether a DynamoGraphDeployment is still running.

        Once the job's pods are ready, genai-perf is run exactly once, per-role pod logs are saved, and the
        job is reported as no longer running. Before that, the deployment's conditions decide.
        """
        if self._genai_perf_completed:
            return False

        if self.are_vllm_pods_ready(job):
            self._run_genai_perf(job)
            self._genai_perf_completed = True

            # A tuple (not a set) keeps log collection in a deterministic order.
            for pod_role in ("decode", "prefill", "frontend"):
                try:
                    pod_name = self._get_dynamo_pod_by_role(pod_role)
                    logging.debug(f"Fetching logs for {pod_role=} {pod_name=}")
                    logs = self.core_v1.read_namespaced_pod_log(name=pod_name, namespace=self.default_namespace)
                    with (job.test_run.output_path / f"{pod_role}_pod.log").open("w") as f:
                        f.write(logs)
                except Exception as e:
                    logging.debug(f"Error fetching logs for role '{pod_role}': {e}")

            return False

        deployment = cast(
            dict,
            self.custom_objects_api.get_namespaced_custom_object(
                group="nvidia.com",
                version="v1alpha1",
                namespace=self.default_namespace,
                plural="dynamographdeployments",
                name=job.name,
            ),
        )
        status: dict = cast(dict, deployment.get("status", {}))
        return self._check_deployment_conditions(status.get("conditions", []))

    def kill(self, job: BaseJob) -> None:
        """
        Terminate a Kubernetes job.

        Pod logs are stored in the job's output directory before the job is deleted.

        Args:
            job (BaseJob): The job to be terminated.
        """
        k_job: KubernetesJob = cast(KubernetesJob, job)
        self.store_logs_for_job(k_job.name, k_job.test_run.output_path)
        self.delete_job(k_job.name, k_job.kind)

    def delete_job(self, job_name: str, job_kind: str) -> None:
        """
        Delete a job of the given kind from the default namespace.

        Raises:
            ValueError: If the job kind is not supported.
        """
        # "mpijob" must be tested before "job" because it also contains the substring "job".
        if "mpijob" in job_kind.lower():
            self._delete_mpi_job(job_name)
        elif "job" in job_kind.lower():
            self._delete_batch_job(job_name)
        elif "dynamographdeployment" in job_kind.lower():
            self._delete_dynamo_graph_deployment(job_name)
        else:
            error_message = f"Unsupported job kind: '{job_kind}'."
            logging.error(error_message)
            raise ValueError(error_message)

    def _delete_mpi_job(self, job_name: str) -> None:
        """Delete an MPIJob with foreground propagation; a missing MPIJob (404) is ignored."""
        logging.debug(f"Deleting MPIJob '{job_name}'")
        try:
            self.custom_objects_api.delete_namespaced_custom_object(
                group="kubeflow.org",
                version="v2beta1",
                namespace=self.default_namespace,
                plural="mpijobs",
                name=job_name,
                body=lazy.k8s.client.V1DeleteOptions(propagation_policy="Foreground", grace_period_seconds=5),
            )
            logging.debug(f"MPIJob '{job_name}' deleted successfully")
        except lazy.k8s.client.ApiException as e:
            if e.status == 404:
                logging.debug(f"MPIJob '{job_name}' not found. It may have already been deleted.")
            else:
                logging.error(
                    f"An error occurred while attempting to delete MPIJob '{job_name}'. "
                    f"Error code: {e.status}. Message: {e.reason}. "
                    "Please verify the job name and Kubernetes API server."
                )
                raise

    def _delete_batch_job(self, job_name: str) -> None:
        """Delete a batch Job with foreground propagation; a missing Job (404) is ignored."""
        logging.debug(f"Deleting batch job '{job_name}'")
        try:
            api_response = self.batch_v1.delete_namespaced_job(
                name=job_name,
                namespace=self.default_namespace,
                body=lazy.k8s.client.V1DeleteOptions(propagation_policy="Foreground", grace_period_seconds=5),
            )
        except lazy.k8s.client.ApiException as e:
            if e.status == 404:
                logging.debug(f"Batch job '{job_name}' not found. It may have already been deleted.")
                return
            logging.error(
                f"An error occurred while attempting to delete batch job '{job_name}'. "
                f"Error code: {e.status}. Message: {e.reason}. "
                "Please verify the job name and Kubernetes API server."
            )
            raise

        api_response = cast("k8s.client.V1Status", api_response)
        logging.debug(f"Batch job '{job_name}' deleted with status: {api_response.status}")

    def _delete_dynamo_graph_deployment(self, job_name: str) -> None:
        """
        Delete a DynamoGraphDeployment via ``kubectl`` (best-effort) and reset the genai-perf state.

        Failures, including a missing ``kubectl`` binary, are only logged at debug level because this is also
        used as a cleanup step before creating a deployment.
        """
        logging.debug(f"Deleting DynamoGraphDeployment '{job_name}'")
        # An argument list (no shell) ensures job and namespace names are never interpreted by a shell.
        cmd = ["kubectl", "delete", "dgd", job_name, "-n", self.default_namespace]
        try:
            result = subprocess.run(cmd, capture_output=True, text=True)
        except FileNotFoundError as e:
            logging.debug(f"Failed to delete DynamoGraphDeployment: {e}")
        else:
            if result.returncode != 0:
                logging.debug(f"Failed to delete DynamoGraphDeployment: {result.stderr}")
        self._genai_perf_completed = False

    def create_job(self, job_spec: Dict[Any, Any], timeout: int = 60, interval: int = 1) -> str:
        """
        Create a job in the Kubernetes system in a blocking manner.

        Args:
            job_spec (Dict[Any, Any]): The job specification.
            timeout (int): The maximum time to wait for the job to be created and observable.
            interval (int): The time to wait between checks, in seconds.

        Returns:
            str: The job name.

        Raises:
            ValueError: If the job specification does not contain a valid 'kind' field.
            TimeoutError: If the job is not observable within the timeout period.
        """
        logging.debug(f"Creating job with spec: {job_spec}")
        job_name = self._create_job(job_spec)

        # Wait for the job to be observable by Kubernetes. A monotonic clock is immune to wall-clock jumps.
        start_time = time.monotonic()
        while time.monotonic() - start_time < timeout:
            if self._is_job_observable(job_name, job_spec.get("kind", "")):
                logging.debug(f"Job '{job_name}' is now observable.")
                return job_name
            logging.debug(f"Waiting for job '{job_name}' to become observable...")
            time.sleep(interval)

        raise TimeoutError(f"Job '{job_name}' was not observable within {timeout} seconds.")

    def _create_job(self, job_spec: Dict[Any, Any]) -> str:
        """
        Create the job described by ``job_spec`` and return its name.

        Raises:
            ValueError: If the spec's kind/apiVersion combination is not supported.
        """
        api_version = job_spec.get("apiVersion", "")
        kind = job_spec.get("kind", "").lower()

        if "mpijob" in kind:
            return self._create_mpi_job(job_spec)
        elif ("batch" in api_version) and ("job" in kind):
            return self._create_batch_job(job_spec)
        elif "dynamographdeployment" in kind:
            return self._create_dynamo_graph_deployment(job_spec)
        else:
            error_message = (
                f"Unsupported job kind: '{job_spec.get('kind')}'.\n"
                "Please review the job specification generation logic to ensure that the 'kind' field is set "
                "correctly.\n"
            )
            logging.error(error_message)
            raise ValueError(error_message)

    def _create_batch_job(self, job_spec: Dict[Any, Any]) -> str:
        """
        Create a batch Job and return its name.

        Raises:
            ValueError: If the API response is not a V1Job with metadata.
        """
        api_response = self.batch_v1.create_namespaced_job(body=job_spec, namespace=self.default_namespace)

        if not isinstance(api_response, lazy.k8s.client.V1Job) or api_response.metadata is None:
            raise ValueError("Job creation failed or returned an unexpected type")

        job_name: str = api_response.metadata.name
        logging.debug(f"Job '{job_name}' created with status: {api_response.status}")
        return job_name

    def _create_mpi_job(self, job_spec: Dict[Any, Any]) -> str:
        """Create an MPIJob custom object and return its name."""
        api_response = self.custom_objects_api.create_namespaced_custom_object(
            group="kubeflow.org",
            version="v2beta1",
            namespace=self.default_namespace,
            plural="mpijobs",
            body=job_spec,
        )

        job_name: str = api_response["metadata"]["name"]
        logging.debug(f"MPIJob '{job_name}' created with status: {api_response.get('status')}")
        return job_name

    def _create_dynamo_graph_deployment(self, job_spec: Dict[Any, Any]) -> str:
        """
        Create a DynamoGraphDeployment and return its name.

        Any existing deployment with the same name is deleted first. If creation fails with an API error,
        a cleanup delete is attempted and the error is re-raised.
        """
        logging.debug(f"Attempting to delete existing job='{job_spec['metadata']['name']}' before creation.")
        self._delete_dynamo_graph_deployment(job_spec["metadata"]["name"])

        logging.debug("Creating DynamoGraphDeployment with spec")
        try:
            api_response = self.custom_objects_api.create_namespaced_custom_object(
                group="nvidia.com",
                version="v1alpha1",
                namespace=self.default_namespace,
                plural="dynamographdeployments",
                body=job_spec,
            )
        except lazy.k8s.client.ApiException as e:
            logging.error(f"An error occurred while creating DynamoGraphDeployment: {e.reason}")
            self._delete_dynamo_graph_deployment(job_spec["metadata"]["name"])
            raise

        job_name = str(api_response["metadata"]["name"])
        logging.debug(f"DynamoGraphDeployment '{job_name}' created with status: {api_response.get('status')}")
        return job_name

    def _is_job_observable(self, job_name: str, job_kind: str) -> bool:
        """Return True if the API server already reports the job; unsupported kinds return False."""
        logging.debug(f"Checking if job '{job_name}' of kind '{job_kind}' is observable.")

        # "mpijob" must be tested before "job" because it also contains the substring "job".
        if "mpijob" in job_kind.lower():
            return self._is_mpijob_observable(job_name)
        elif "job" in job_kind.lower():
            return self._is_batch_job_observable(job_name)
        elif "dynamographdeployment" in job_kind.lower():
            return self._is_dynamo_graph_deployment_observable(job_name)
        else:
            logging.error(f"Unsupported job kind: '{job_kind}'")
            return False

    def _is_mpijob_observable(self, job_name: str) -> bool:
        """Return True if the MPIJob can be fetched; False on an empty result or 404, re-raise other errors."""
        logging.debug(f"Attempting to observe MPIJob '{job_name}'.")
        try:
            api_instance = self.custom_objects_api
            mpijob = api_instance.get_namespaced_custom_object(
                group="kubeflow.org",
                version="v2beta1",
                namespace=self.default_namespace,
                plural="mpijobs",
                name=job_name,
            )
            if mpijob:
                logging.debug(f"MPIJob '{job_name}' found with details: {mpijob}.")
                return True
            else:
                logging.debug(f"MPIJob '{job_name}' is not yet observable.")
                return False
        except lazy.k8s.client.ApiException as e:
            if e.status == 404:
                logging.debug(f"MPIJob '{job_name}' not found.")
                return False
            else:
                logging.error(
                    f"An error occurred while checking if MPIJob '{job_name}' is observable: {e.reason}. "
                    f"Please check the job name, namespace, and Kubernetes API server."
                )
                raise

    def _is_batch_job_observable(self, job_name: str) -> bool:
        """Return True if the batch Job status can be read; False on 404, re-raise other errors."""
        logging.debug(f"Attempting to observe batch job '{job_name}'.")
        try:
            return self.batch_v1.read_namespaced_job_status(name=job_name, namespace=self.default_namespace) is not None
        except lazy.k8s.client.ApiException as e:
            if e.status == 404:
                logging.debug(f"Batch job '{job_name}' not found.")
                return False
            else:
                logging.error(
                    f"An error occurred while checking if batch job '{job_name}' is observable: {e.reason}. "
                    f"Please check the job name, namespace, and Kubernetes API server."
                )
                raise

    def _is_dynamo_graph_deployment_observable(self, job_name: str) -> bool:
        """Return True if the DynamoGraphDeployment can be fetched; False on empty/404, re-raise other errors."""
        logging.debug(f"Attempting to observe DynamoGraphDeployment '{job_name}'.")
        try:
            api_instance = self.custom_objects_api
            deployment = api_instance.get_namespaced_custom_object(
                group="nvidia.com",
                version="v1alpha1",
                namespace=self.default_namespace,
                plural="dynamographdeployments",
                name=job_name,
            )
            if deployment:
                logging.debug(f"DynamoGraphDeployment '{job_name}' found with details: {deployment}.")
                return True
            else:
                logging.debug(f"DynamoGraphDeployment '{job_name}' is not yet observable.")
                return False
        except lazy.k8s.client.ApiException as e:
            if e.status == 404:
                logging.debug(f"DynamoGraphDeployment '{job_name}' not found.")
                return False
            else:
                logging.error(
                    f"An error occurred while checking if DynamoGraphDeployment '{job_name}' "
                    f"is observable: {e.reason}. Please check the job name, namespace, and "
                    "Kubernetes API server."
                )
                raise

    def list_jobs(self) -> List[Any]:
        """
        List all jobs in the Kubernetes system's default namespace.

        Returns:
            List[Any]: A list of batch jobs in the namespace.
        """
        logging.debug(f"Listing jobs in namespace '{self.default_namespace}'")
        return self.batch_v1.list_namespaced_job(namespace=self.default_namespace).items

    def create_node_group(self, name: str, node_list: List[str]) -> None:
        """
        Create a node group in the Kubernetes system by labeling nodes with ``cloudai/node-group=<name>``.

        Args:
            name (str): The name of the node group.
            node_list (List[str]): List of node names to be included in the group.
        """
        logging.debug(f"Creating node group '{name}' with nodes: {node_list}")
        for node in node_list:
            body = {"metadata": {"labels": {"cloudai/node-group": name}}}
            logging.debug(f"Labeling node '{node}' with group '{name}'")
            self.core_v1.patch_node(node, body)

    def store_logs_for_job(self, job_name: str, output_dir: Path) -> None:
        """
        Retrieve and store logs for all pods associated with a given job.

        Each pod's log is written to ``<output_dir>/<pod_name>.txt`` and all logs are concatenated into
        ``<output_dir>/stdout.txt``. Per-pod API errors are logged and skipped.

        Args:
            job_name (str): The name of the job.
            output_dir (Path): The directory where logs will be saved.
        """
        pod_names = self.get_pod_names_for_job(job_name)
        if not pod_names:
            logging.debug(f"No pods found for job '{job_name}'")
            return

        output_dir.mkdir(parents=True, exist_ok=True)

        stdout_file_path = output_dir / "stdout.txt"
        with stdout_file_path.open("w") as stdout_file:
            for pod_name in pod_names:
                try:
                    logs = self.core_v1.read_namespaced_pod_log(name=pod_name, namespace=self.default_namespace)

                    log_file_path = output_dir / f"{pod_name}.txt"
                    with log_file_path.open("w") as log_file:
                        log_file.write(logs)
                    logging.debug(f"Logs for pod '{pod_name}' saved to '{log_file_path}'")

                    stdout_file.write(logs + "\n")

                except lazy.k8s.client.ApiException as e:
                    logging.debug(f"Error retrieving logs for pod '{pod_name}': {e}")

        logging.debug(f"All logs concatenated and saved to '{stdout_file_path}'")

    def get_pod_names_for_job(self, job_name: str) -> List[str]:
        """
        Retrieve pod names associated with a given job.

        Pods are matched by the ``training.kubeflow.org/job-name`` label. API errors are logged and whatever
        was collected so far is returned.

        Args:
            job_name (str): The name of the job.

        Returns:
            List[str]: A list of pod names associated with the job.
        """
        pod_names = []
        try:
            pods = self.core_v1.list_namespaced_pod(namespace=self.default_namespace)
            for pod in pods.items:
                if pod.metadata.labels and pod.metadata.labels.get("training.kubeflow.org/job-name") == job_name:
                    pod_names.append(pod.metadata.name)
        except lazy.k8s.client.ApiException as e:
            logging.error(f"Error retrieving pods for job '{job_name}': {e}")
        return pod_names
© Copyright 2026, NVIDIA CORPORATION & AFFILIATES. Last updated on Jun 3, 2026