# SPDX-FileCopyrightText: NVIDIA CORPORATION & AFFILIATES
# Copyright (c) 2024-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import annotations
import json
import logging
import subprocess
import time
from pathlib import Path
from typing import TYPE_CHECKING, Any, Dict, List, Optional, cast
if TYPE_CHECKING:
import kubernetes as k8s
from cloudai.core import BaseJob, System
from cloudai.util.lazy_imports import lazy
from .kubernetes_job import KubernetesJob
class KubernetesSystem(System):
    """
    Represents a Kubernetes system.

    Holds the cluster connection settings (kube config path, default namespace) and the Kubernetes API clients used
    to create, monitor, and delete MPIJobs, batch Jobs, and DynamoGraphDeployments.
    """

    kube_config_path: Path
    default_namespace: str
    scheduler: str = "kubernetes"
    monitor_interval: int = 1
    gpus_per_node: int = 1
    # Networking mode: True forces hostNetwork, False requires CNI, None tries CNI then falls back to hostNetwork
    # (see resolve_cni_networks).
    use_host_network: bool | None = None

    # API clients are created in model_post_init; __getstate__ excludes them and __deepcopy__ recreates them.
    _core_v1: Optional[k8s.client.CoreV1Api] = None
    _batch_v1: Optional[k8s.client.BatchV1Api] = None
    _custom_objects_api: Optional[k8s.client.CustomObjectsApi] = None
    # Set once genai-perf has run for a DynamoGraphDeployment; reset when that deployment is deleted.
    _genai_perf_completed: bool = False

    def __getstate__(self) -> dict[str, Any]:
        """Return the state for pickling, excluding non-picklable Kubernetes client objects."""
        state = self.model_dump(exclude={"_core_v1", "_batch_v1", "_custom_objects_api"})
        return state

    def __deepcopy__(self, _memo: dict[int, Any] | None = None) -> "KubernetesSystem":
        """
        Create a deep copy of the KubernetesSystem instance.

        Args:
            _memo: Dictionary to keep track of objects that have already been copied (unused).

        Returns:
            A new KubernetesSystem instance with reinitialized Kubernetes clients.
        """
        state = self.__getstate__()
        new_instance = KubernetesSystem(**state)
        # NOTE(review): if System is a pydantic v2 model, __init__ presumably already ran model_post_init, making
        # this second call redundant (clients are created twice). Kept to preserve existing behavior.
        new_instance.model_post_init(None)
        return new_instance

    def model_post_init(self, _context: Any = None) -> None:
        """
        Load the kube config and instantiate the Kubernetes API clients.

        ``kube_config_path`` is used when it points to an existing file; otherwise ``~/.kube/config`` is used.

        Raises:
            FileNotFoundError: If the resolved kube config file does not exist.
        """
        kube_config_path = self.kube_config_path
        if not kube_config_path.is_file():
            home_directory = Path.home()
            kube_config_path = home_directory / ".kube" / "config"
        else:
            kube_config_path = kube_config_path.resolve()

        if not kube_config_path.exists():
            error_message = (
                f"Kube config file '{kube_config_path}' not found. This file is required to configure the "
                f"Kubernetes environment. Please verify that the file exists at the specified path."
            )
            logging.error(error_message)
            raise FileNotFoundError(error_message)

        # Instantiate Kubernetes APIs
        logging.debug(f"Loading kube config from: {kube_config_path}")
        lazy.k8s.config.load_kube_config(config_file=str(kube_config_path))
        self._core_v1 = lazy.k8s.client.CoreV1Api()
        self._batch_v1 = lazy.k8s.client.BatchV1Api()
        self._custom_objects_api = lazy.k8s.client.CustomObjectsApi()

        logging.debug(f"{self.__class__.__name__} initialized")

    @property
    def core_v1(self) -> k8s.client.CoreV1Api:
        """Returns the Kubernetes Core V1 API client."""
        assert self._core_v1 is not None
        return self._core_v1

    @property
    def batch_v1(self) -> k8s.client.BatchV1Api:
        """Returns the Kubernetes Batch V1 API client."""
        assert self._batch_v1 is not None
        return self._batch_v1

    @property
    def custom_objects_api(self) -> k8s.client.CustomObjectsApi:
        """Returns the Kubernetes Custom Objects API client."""
        assert self._custom_objects_api is not None
        return self._custom_objects_api

    def __repr__(self) -> str:
        """
        Provide a structured string representation of the system.

        Returns:
            str: A string that contains the system name, scheduler type, kube config path, and default namespace.
        """
        return (
            f"System Name: {self.name}\n"
            f"Scheduler Type: {self.scheduler}\n"
            f"Kube Config Path: {self.kube_config_path}\n"
            f"Default Namespace: {self.default_namespace}"
        )

    def update(self) -> None:
        """
        Update the system object for a Kubernetes system.

        Currently not implemented for KubernetesSystem.
        """
        pass

    def get_network_attachment_definitions(self) -> list[str]:
        """
        Return all NetworkAttachmentDefinitions in the cluster as 'namespace/name' strings.

        This is best-effort: any failure (kubectl missing, non-zero exit, unparsable output) is logged at debug
        level and results in an empty list.
        """
        cmd = ["kubectl", "get", "network-attachment-definitions", "--all-namespaces", "-o", "json"]
        try:
            result = subprocess.run(cmd, capture_output=True, text=True, check=True)
        except subprocess.CalledProcessError as e:
            logging.debug("Failed to list NetworkAttachmentDefinitions: %s", e.stderr)
            return []
        except OSError as e:
            # e.g. FileNotFoundError when kubectl is not on PATH; honor the "return [] on failure" contract.
            logging.debug("Failed to run kubectl to list NetworkAttachmentDefinitions: %s", e)
            return []

        try:
            data = json.loads(result.stdout)
        except json.JSONDecodeError as e:
            logging.debug("Failed to parse NetworkAttachmentDefinitions output: %s", e)
            return []

        return [f"{item['metadata']['namespace']}/{item['metadata']['name']}" for item in data.get("items", [])]

    def resolve_cni_networks(self) -> list[str] | None:
        """
        Determine which CNI networks to use, or None if host networking should be used.

        - use_host_network=True: always use hostNetwork, skip CNI discovery.
        - use_host_network=False: require CNI; raise if none are found.
        - use_host_network=None: try CNI, fall back to hostNetwork if none found.

        Raises:
            RuntimeError: If use_host_network is False and no NetworkAttachmentDefinitions were found.
        """
        if self.use_host_network is True:
            return None

        if networks := self.get_network_attachment_definitions():
            return networks

        if self.use_host_network is False:
            raise RuntimeError(
                "use_host_network=False but no NetworkAttachmentDefinitions were found in the cluster. "
                "Ensure the CNI operator is installed and net-attach-defs are configured."
            )

        return None  # auto mode: fall back to hostNetwork

    def is_job_running(self, job: BaseJob) -> bool:
        """Return True if the given Kubernetes job is still considered running."""
        k_job: KubernetesJob = cast(KubernetesJob, job)
        return self._is_job_running(k_job)

    def is_job_completed(self, job: BaseJob) -> bool:
        """Return True if the given Kubernetes job is no longer considered running."""
        k_job: KubernetesJob = cast(KubernetesJob, job)
        return not self._is_job_running(k_job)

    def _is_job_running(self, job: KubernetesJob) -> bool:
        """
        Dispatch the running check based on the job kind.

        Raises:
            ValueError: If the job kind is not supported.
        """
        logging.debug(f"Checking for job '{job.name}' of kind '{job.kind}' to determine if it is running.")

        # "mpijob" must be tested before the generic "job" substring, which it also contains.
        if "mpijob" in job.kind.lower():
            return self._is_mpijob_running(job)
        elif "job" in job.kind.lower():
            return self._is_batch_job_running(job.name)
        elif "dynamographdeployment" in job.kind.lower():
            return self._is_dynamo_graph_deployment_running(job)
        else:
            error_message = f"Unsupported job kind: '{job.kind}'."
            logging.error(error_message)
            raise ValueError(error_message)

    def _is_mpijob_running(self, job: KubernetesJob) -> bool:
        """
        Check an MPIJob's status conditions; also refreshes the stored pod logs on every call.

        Returns False when the MPIJob succeeded, failed, or no longer exists (404).
        """
        try:
            mpijob = self.custom_objects_api.get_namespaced_custom_object(
                group="kubeflow.org",
                version="v2beta1",
                namespace=self.default_namespace,
                plural="mpijobs",
                name=job.name,
            )

            assert isinstance(mpijob, dict)
            status: dict = cast(dict, mpijob.get("status", {}))
            conditions = status.get("conditions", [])
            logging.debug(f"MPIJob '{job.name}': {conditions=} {status=}")
            self.store_logs_for_job(job.name, job.test_run.output_path)

            # Consider an empty conditions list as running
            if not conditions:
                return True

            for condition in conditions:
                if condition["type"] == "Succeeded" and condition["status"] == "True":
                    return False
                if condition["type"] == "Failed" and condition["status"] == "True":
                    return False

            # If the job has been created but is neither succeeded nor failed, it is considered running
            return any(condition["type"] == "Created" and condition["status"] == "True" for condition in conditions)

        except lazy.k8s.client.ApiException as e:
            if e.status == 404:
                logging.debug(f"MPIJob '{job.name}' not found. It may have completed and been removed from the system.")
                return False
            else:
                error_message = (
                    f"Error occurred while retrieving status for MPIJob '{job.name}'. "
                    f"Error code: {e.status}. Message: {e.reason}. Please check the job name, namespace, and "
                    "Kubernetes API server."
                )
                logging.error(error_message)
                raise

    def _is_batch_job_running(self, job_name: str) -> bool:
        """
        Check a batch Job's status conditions.

        Returns False when the job completed, failed, lacks status attributes, or no longer exists (404).
        """
        try:
            k8s_job: Any = self.batch_v1.read_namespaced_job_status(name=job_name, namespace=self.default_namespace)

            if not (hasattr(k8s_job, "status") and hasattr(k8s_job.status, "conditions")):
                logging.debug(f"Job '{job_name}' does not have expected status attributes.")
                return False

            conditions = k8s_job.status.conditions or []

            # Consider an empty conditions list as running
            if not conditions:
                return True

            for condition in conditions:
                if condition.type == "Complete" and condition.status == "True":
                    return False
                if condition.type == "Failed" and condition.status == "True":
                    return False

            # NOTE(review): batch/v1 Job condition types do not appear to include "Created", so a non-empty
            # condition list without Complete/Failed likely reports "not running" here — confirm this is intended.
            return any(condition.type == "Created" and condition.status == "True" for condition in conditions)

        except lazy.k8s.client.ApiException as e:
            if e.status == 404:
                logging.debug(
                    f"Batch job '{job_name}' not found. It may have completed and been removed from the system."
                )
                return False
            else:
                logging.error(
                    f"Error occurred while retrieving status for batch job '{job_name}'. "
                    f"Error code: {e.status}. Message: {e.reason}. Please check the job name and Kubernetes API server."
                )
                raise

    def are_vllm_pods_ready(self, job: KubernetesJob) -> bool:
        """
        Return True only if every pod whose name contains the job name is Running and fully ready.

        Parses the tabular output of ``kubectl get pods``. Returns False if kubectl fails, no matching pods exist,
        any matching pod is Terminating, or a READY column cannot be parsed.
        """
        cmd = ["kubectl", "get", "pods", "-n", self.default_namespace]
        try:
            result = subprocess.run(cmd, capture_output=True, text=True, check=True)
        except subprocess.CalledProcessError as e:
            logging.error(f"Failed to get pods: {e}")
            return False

        all_ready = True
        vllm_pods_found = False
        for line in result.stdout.splitlines():
            # Skip the header row.
            if line.startswith("NAME"):
                continue

            columns = line.split()
            if len(columns) < 3:
                continue

            pod_name = columns[0]
            if job.name not in pod_name:
                continue

            vllm_pods_found = True
            ready_status = columns[1]  # e.g. "1/2"
            pod_status = columns[2]

            if pod_status == "Terminating":
                logging.debug(f"Pod {pod_name} is terminating")
                return False

            try:
                ready_count, total_count = map(int, ready_status.split("/"))
            except (ValueError, IndexError) as e:
                logging.error(f"Failed to parse ready status '{ready_status}' for pod {pod_name}: {e}")
                return False

            if pod_status == "Running" and ready_count == total_count:
                logging.debug(f"Pod {pod_name} is running and ready ({ready_status})")
            else:
                logging.debug(f"Pod {pod_name} is {pod_status} but not fully ready ({ready_status})")
                all_ready = False

        if not vllm_pods_found:
            logging.debug("No vLLM pods found")
            return False

        return all_ready

    def _get_dynamo_pod_by_role(self, role: str) -> str:
        """
        Return the name of the first pod in the default namespace labeled with the given Dynamo role.

        Raises:
            RuntimeError: If no pod carries the role label.
        """
        for pod in self.core_v1.list_namespaced_pod(namespace=self.default_namespace).items:
            labels = pod.metadata.labels
            logging.debug(f"Found pod: {pod.metadata.name} with labels: {labels}")
            if labels and str(labels.get("nvidia.com/dynamo-component", "")).lower() == role.lower():  # v0.6.x
                return pod.metadata.name
            if labels and str(labels.get("nvidia.com/dynamo-component-type", "")).lower() == role.lower():  # v0.7.x
                return pod.metadata.name

        raise RuntimeError(f"No pod found for the role '{role}'")

    def _run_genai_perf(self, job: KubernetesJob) -> None:
        """
        Run the genai-perf wrapper script inside the frontend pod and collect its output.

        Copies the wrapper script into the pod, makes it executable, executes it, writes stdout/stderr to
        ``genai_perf.log`` in the job output directory, then copies the results back.

        Raises:
            TypeError: If the test definition is not an AIDynamoTestDefinition.
            subprocess.CalledProcessError: If copying the wrapper script into the pod fails.
        """
        from cloudai.workloads.ai_dynamo.ai_dynamo import AIDynamoTestDefinition

        if not isinstance(job.test_run.test, AIDynamoTestDefinition):
            raise TypeError("Test definition must be an instance of AIDynamoTestDefinition")

        tdef = cast(AIDynamoTestDefinition, job.test_run.test)
        genai_perf_results_path = "/tmp/cloudai/genai-perf"
        frontend_pod = self._get_dynamo_pod_by_role(role="frontend")

        wrapper_script_path = tdef.cmd_args.genai_perf.script.installed_path
        pod_wrapper_path = "/tmp/genai_perf.sh"

        logging.debug(f"Copying wrapper script {wrapper_script_path} to pod {frontend_pod}")
        cp_wrapper_cmd = [
            "kubectl",
            "cp",
            str(wrapper_script_path),
            f"{self.default_namespace}/{frontend_pod}:{pod_wrapper_path}",
        ]
        subprocess.run(cp_wrapper_cmd, capture_output=True, text=True, check=True)

        chmod_cmd = ["chmod", "+x", pod_wrapper_path]
        kubectl_exec_cmd = ["kubectl", "exec", "-n", self.default_namespace, frontend_pod, "--", *chmod_cmd]
        logging.debug(f"Making wrapper script executable in pod={frontend_pod}")
        try:
            result = subprocess.run(kubectl_exec_cmd, capture_output=True, text=True, timeout=60 * 10, check=True)
            logging.debug(f"chmod exited {result.returncode}: {result.stdout} {result.stderr}")
        except Exception as e:
            # Best-effort: the script is later invoked via /bin/bash, so a failed chmod is not fatal.
            logging.debug(f"Error making wrapper script executable in pod '{frontend_pod}': {e}")

        genai_perf_config: list[str] = [
            "--cmd",
            tdef.cmd_args.genai_perf.cmd,
            "--report-name",
            tdef.cmd_args.genai_perf.report_name,
        ]
        extra_args = tdef.cmd_args.genai_perf.extra_args
        if isinstance(extra_args, list):
            extra_args = " ".join(extra_args)
        if extra_args:
            genai_perf_config.extend(["--extra-args", extra_args])

        # Build genai-perf arguments as --key value pairs for parse_genai_perf_args
        genai_perf_cmd_parts: list[str] = []
        if tdef.cmd_args.genai_perf.args:
            for k, v in tdef.cmd_args.genai_perf.args.model_dump(exclude_none=True).items():
                genai_perf_cmd_parts.extend([f"--{k}", str(v)])

        wrapper_cmd = [
            "/bin/bash",
            pod_wrapper_path,
            "--result-dir",
            genai_perf_results_path,
            "--gpus-per-node",
            str(self.gpus_per_node),
            "--model",
            tdef.cmd_args.dynamo.model,
            "--url",
            "http://localhost",
            "--port",
            str(tdef.cmd_args.dynamo.port),
            "--endpoint",
            tdef.cmd_args.dynamo.endpoint,
            *genai_perf_config,
            "--",
            *genai_perf_cmd_parts,
        ]

        kubectl_exec_cmd = ["kubectl", "exec", "-n", self.default_namespace, frontend_pod, "--", *wrapper_cmd]
        logging.debug(f"Executing genai-perf in pod={frontend_pod} cmd={kubectl_exec_cmd}")
        try:
            result = subprocess.run(kubectl_exec_cmd, capture_output=True, text=True, timeout=60 * 10)
            logging.debug(f"genai-perf exited with code {result.returncode}")
            with (job.test_run.output_path / "genai_perf.log").open("w", encoding="utf-8") as f:
                f.write(result.stdout)
                if result.stderr:
                    f.write("\nSTDERR:\n")
                    f.write(result.stderr)
        except Exception as e:
            logging.debug(f"Error executing genai-perf command in pod '{frontend_pod}': {e}")

        self._copy_genai_perf_results(job, frontend_pod, genai_perf_results_path)

    def _copy_genai_perf_results(self, job: KubernetesJob, frontend_pod: str, genai_perf_results_path: str) -> None:
        """
        Copy genai-perf results out of the frontend pod and touch the success marker if the report exists.

        Failures are logged as errors and leave the success marker untouched.
        """
        from cloudai.workloads.ai_dynamo.ai_dynamo import AIDynamoTestDefinition

        tdef = cast(AIDynamoTestDefinition, job.test_run.test)
        assert isinstance(tdef, AIDynamoTestDefinition)

        cmd = [
            "kubectl",
            "cp",
            f"{self.default_namespace}/{frontend_pod}:{genai_perf_results_path}",
            str(job.test_run.output_path),
        ]
        logging.debug(f"Copying results with command: {' '.join(cmd)}")
        result = subprocess.run(cmd, capture_output=True, text=True)
        if result.returncode != 0:
            logging.error(f"Error copying results with command: {' '.join(cmd)}: {result.stderr}")
            return

        report_path = job.test_run.output_path / tdef.cmd_args.genai_perf.report_name
        if not report_path.exists():
            logging.error(f"Genai-perf report not found at {report_path}")
            return

        (job.test_run.output_path / tdef.success_marker).touch()
        logging.debug(f"Success marker touched at {job.test_run.output_path / tdef.success_marker}")

    def _check_deployment_conditions(self, conditions: list) -> bool:
        """
        Interpret DynamoGraphDeployment conditions as a running flag.

        Returns True for an empty list, True at the first Ready=True condition, False at the first Failed=True
        condition, and True otherwise.
        """
        logging.debug(f"Checking deployment conditions: {conditions}")
        if not conditions:
            return True

        for condition in conditions:
            if condition["type"] == "Ready" and condition["status"] == "True":
                return True
            if condition["type"] == "Failed" and condition["status"] == "True":
                return False

        return True

    def _is_dynamo_graph_deployment_running(self, job: KubernetesJob) -> bool:
        """
        Check a DynamoGraphDeployment; once its pods are ready, run genai-perf and report it as finished.

        After genai-perf has run, logs for the decode, prefill, and frontend pods are saved best-effort and the
        deployment is reported as not running on this and all later calls until the deployment is deleted.
        """
        if self._genai_perf_completed:
            return False

        if self.are_vllm_pods_ready(job):
            self._run_genai_perf(job)
            self._genai_perf_completed = True

            # A tuple (not a set) keeps the log-collection order deterministic across runs.
            for pod_role in ("decode", "prefill", "frontend"):
                try:
                    pod_name = self._get_dynamo_pod_by_role(pod_role)
                    logging.debug(f"Fetching logs for {pod_role=} {pod_name=}")
                    logs = self.core_v1.read_namespaced_pod_log(name=pod_name, namespace=self.default_namespace)
                    with (job.test_run.output_path / f"{pod_role}_pod.log").open("w", encoding="utf-8") as f:
                        f.write(logs)
                except Exception as e:
                    logging.debug(f"Error fetching logs for role '{pod_role}': {e}")

            return False

        deployment = cast(
            dict,
            self.custom_objects_api.get_namespaced_custom_object(
                group="nvidia.com",
                version="v1alpha1",
                namespace=self.default_namespace,
                plural="dynamographdeployments",
                name=job.name,
            ),
        )
        status: dict = cast(dict, deployment.get("status", {}))
        return self._check_deployment_conditions(status.get("conditions", []))

    def kill(self, job: BaseJob) -> None:
        """
        Terminate a Kubernetes job.

        Stores the job's pod logs before deleting it.

        Args:
            job (BaseJob): The job to be terminated.
        """
        k_job: KubernetesJob = cast(KubernetesJob, job)
        self.store_logs_for_job(k_job.name, k_job.test_run.output_path)
        self.delete_job(k_job.name, k_job.kind)

    def delete_job(self, job_name: str, job_kind: str) -> None:
        """
        Delete a job of the given kind from the default namespace.

        Raises:
            ValueError: If the job kind is not supported.
        """
        # "mpijob" must be tested before the generic "job" substring, which it also contains.
        if "mpijob" in job_kind.lower():
            self._delete_mpi_job(job_name)
        elif "job" in job_kind.lower():
            self._delete_batch_job(job_name)
        elif "dynamographdeployment" in job_kind.lower():
            self._delete_dynamo_graph_deployment(job_name)
        else:
            error_message = f"Unsupported job kind: '{job_kind}'."
            logging.error(error_message)
            raise ValueError(error_message)

    def _delete_mpi_job(self, job_name: str) -> None:
        """Delete an MPIJob with foreground propagation; a missing job (404) is ignored."""
        logging.debug(f"Deleting MPIJob '{job_name}'")
        try:
            self.custom_objects_api.delete_namespaced_custom_object(
                group="kubeflow.org",
                version="v2beta1",
                namespace=self.default_namespace,
                plural="mpijobs",
                name=job_name,
                body=lazy.k8s.client.V1DeleteOptions(propagation_policy="Foreground", grace_period_seconds=5),
            )
            logging.debug(f"MPIJob '{job_name}' deleted successfully")
        except lazy.k8s.client.ApiException as e:
            if e.status == 404:
                logging.debug(f"MPIJob '{job_name}' not found. It may have already been deleted.")
            else:
                logging.error(
                    f"An error occurred while attempting to delete MPIJob '{job_name}'. "
                    f"Error code: {e.status}. Message: {e.reason}. "
                    "Please verify the job name and Kubernetes API server."
                )
                raise

    def _delete_batch_job(self, job_name: str) -> None:
        """Delete a batch Job with foreground propagation; a missing job (404) is ignored."""
        logging.debug(f"Deleting batch job '{job_name}'")
        try:
            api_response = self.batch_v1.delete_namespaced_job(
                name=job_name,
                namespace=self.default_namespace,
                body=lazy.k8s.client.V1DeleteOptions(propagation_policy="Foreground", grace_period_seconds=5),
            )
        except lazy.k8s.client.ApiException as e:
            if e.status == 404:
                logging.debug(f"Batch job '{job_name}' not found. It may have already been deleted.")
                return
            logging.error(
                f"An error occurred while attempting to delete batch job '{job_name}'. "
                f"Error code: {e.status}. Message: {e.reason}. "
                "Please verify the job name and Kubernetes API server."
            )
            raise

        api_response = cast("k8s.client.V1Status", api_response)
        logging.debug(f"Batch job '{job_name}' deleted with status: {api_response.status}")

    def _delete_dynamo_graph_deployment(self, job_name: str) -> None:
        """
        Delete a DynamoGraphDeployment via kubectl (best-effort) and reset the genai-perf completion flag.

        Failures, including kubectl not being available, are only logged at debug level.
        """
        logging.debug(f"Deleting DynamoGraphDeployment '{job_name}'")
        # Argument list without a shell, so job/namespace names are never interpreted by a shell.
        cmd = ["kubectl", "delete", "dgd", job_name, "-n", self.default_namespace]
        try:
            result = subprocess.run(cmd, capture_output=True, text=True)
        except OSError as e:
            logging.debug(f"Failed to delete DynamoGraphDeployment: {e}")
        else:
            if result.returncode != 0:
                logging.debug(f"Failed to delete DynamoGraphDeployment: {result.stderr}")
        self._genai_perf_completed = False

    def create_job(self, job_spec: Dict[Any, Any], timeout: int = 60, interval: int = 1) -> str:
        """
        Create a job in the Kubernetes system in a blocking manner.

        Args:
            job_spec (Dict[Any, Any]): The job specification.
            timeout (int): The maximum time to wait for the job to be created and observable.
            interval (int): The time to wait between checks, in seconds.

        Returns:
            str: The job name.

        Raises:
            ValueError: If the job specification does not contain a valid 'kind' field.
            TimeoutError: If the job is not observable within the timeout period.
        """
        logging.debug(f"Creating job with spec: {job_spec}")
        job_name = self._create_job(job_spec)

        # Wait for the job to be observable by Kubernetes
        start_time = time.time()
        while time.time() - start_time < timeout:
            if self._is_job_observable(job_name, job_spec.get("kind", "")):
                logging.debug(f"Job '{job_name}' is now observable.")
                return job_name
            logging.debug(f"Waiting for job '{job_name}' to become observable...")
            time.sleep(interval)

        raise TimeoutError(f"Job '{job_name}' was not observable within {timeout} seconds.")

    def _create_job(self, job_spec: Dict[Any, Any]) -> str:
        """
        Create the job described by ``job_spec``, dispatching on its apiVersion and kind.

        Raises:
            ValueError: If the kind is not supported.
        """
        api_version = job_spec.get("apiVersion", "")
        kind = job_spec.get("kind", "").lower()

        if "mpijob" in kind:
            return self._create_mpi_job(job_spec)
        elif ("batch" in api_version) and ("job" in kind):
            return self._create_batch_job(job_spec)
        elif "dynamographdeployment" in kind:
            return self._create_dynamo_graph_deployment(job_spec)
        else:
            error_message = (
                f"Unsupported job kind: '{job_spec.get('kind')}'.\n"
                "Please review the job specification generation logic to ensure that the 'kind' field is set "
                "correctly.\n"
            )
            logging.error(error_message)
            raise ValueError(error_message)

    def _create_batch_job(self, job_spec: Dict[Any, Any]) -> str:
        """
        Create a batch Job and return its name.

        Raises:
            ValueError: If the API response is not a V1Job with metadata.
        """
        api_response = self.batch_v1.create_namespaced_job(body=job_spec, namespace=self.default_namespace)

        if not isinstance(api_response, lazy.k8s.client.V1Job) or api_response.metadata is None:
            raise ValueError("Job creation failed or returned an unexpected type")

        job_name: str = api_response.metadata.name
        logging.debug(f"Job '{job_name}' created with status: {api_response.status}")
        return job_name

    def _create_mpi_job(self, job_spec: Dict[Any, Any]) -> str:
        """Create an MPIJob custom object and return its name."""
        api_response = self.custom_objects_api.create_namespaced_custom_object(
            group="kubeflow.org",
            version="v2beta1",
            namespace=self.default_namespace,
            plural="mpijobs",
            body=job_spec,
        )

        job_name: str = api_response["metadata"]["name"]
        logging.debug(f"MPIJob '{job_name}' created with status: {api_response.get('status')}")
        return job_name

    def _create_dynamo_graph_deployment(self, job_spec: Dict[Any, Any]) -> str:
        """
        Create a DynamoGraphDeployment, deleting any existing one with the same name first.

        If creation fails with an API error, a cleanup delete is attempted before the error is re-raised.
        """
        logging.debug(f"Attempting to delete existing job='{job_spec['metadata']['name']}' before creation.")
        self._delete_dynamo_graph_deployment(job_spec["metadata"]["name"])

        logging.debug("Creating DynamoGraphDeployment with spec")
        try:
            api_response = self.custom_objects_api.create_namespaced_custom_object(
                group="nvidia.com",
                version="v1alpha1",
                namespace=self.default_namespace,
                plural="dynamographdeployments",
                body=job_spec,
            )
        except lazy.k8s.client.ApiException as e:
            logging.error(f"An error occurred while creating DynamoGraphDeployment: {e.reason}")
            self._delete_dynamo_graph_deployment(job_spec["metadata"]["name"])
            raise

        job_name = str(api_response["metadata"]["name"])
        logging.debug(f"DynamoGraphDeployment '{job_name}' created with status: {api_response.get('status')}")
        return job_name

    def _is_job_observable(self, job_name: str, job_kind: str) -> bool:
        """Return True if the API server already reports the job; unsupported kinds return False."""
        logging.debug(f"Checking if job '{job_name}' of kind '{job_kind}' is observable.")

        # "mpijob" must be tested before the generic "job" substring, which it also contains.
        if "mpijob" in job_kind.lower():
            return self._is_mpijob_observable(job_name)
        elif "job" in job_kind.lower():
            return self._is_batch_job_observable(job_name)
        elif "dynamographdeployment" in job_kind.lower():
            return self._is_dynamo_graph_deployment_observable(job_name)
        else:
            logging.error(f"Unsupported job kind: '{job_kind}'")
            return False

    def _is_mpijob_observable(self, job_name: str) -> bool:
        """Return True if the MPIJob exists; False on 404. Other API errors are re-raised."""
        logging.debug(f"Attempting to observe MPIJob '{job_name}'.")
        try:
            api_instance = self.custom_objects_api
            mpijob = api_instance.get_namespaced_custom_object(
                group="kubeflow.org",
                version="v2beta1",
                namespace=self.default_namespace,
                plural="mpijobs",
                name=job_name,
            )

            if mpijob:
                logging.debug(f"MPIJob '{job_name}' found with details: {mpijob}.")
                return True
            else:
                logging.debug(f"MPIJob '{job_name}' is not yet observable.")
                return False
        except lazy.k8s.client.ApiException as e:
            if e.status == 404:
                logging.debug(f"MPIJob '{job_name}' not found.")
                return False
            else:
                logging.error(
                    f"An error occurred while checking if MPIJob '{job_name}' is observable: {e.reason}. "
                    f"Please check the job name, namespace, and Kubernetes API server."
                )
                raise

    def _is_batch_job_observable(self, job_name: str) -> bool:
        """Return True if the batch Job exists; False on 404. Other API errors are re-raised."""
        logging.debug(f"Attempting to observe batch job '{job_name}'.")
        try:
            return self.batch_v1.read_namespaced_job_status(name=job_name, namespace=self.default_namespace) is not None
        except lazy.k8s.client.ApiException as e:
            if e.status == 404:
                logging.debug(f"Batch job '{job_name}' not found.")
                return False
            else:
                logging.error(
                    f"An error occurred while checking if batch job '{job_name}' is observable: {e.reason}. "
                    f"Please check the job name, namespace, and Kubernetes API server."
                )
                raise

    def _is_dynamo_graph_deployment_observable(self, job_name: str) -> bool:
        """Return True if the DynamoGraphDeployment exists; False on 404. Other API errors are re-raised."""
        logging.debug(f"Attempting to observe DynamoGraphDeployment '{job_name}'.")
        try:
            api_instance = self.custom_objects_api
            deployment = api_instance.get_namespaced_custom_object(
                group="nvidia.com",
                version="v1alpha1",
                namespace=self.default_namespace,
                plural="dynamographdeployments",
                name=job_name,
            )
            if deployment:
                logging.debug(f"DynamoGraphDeployment '{job_name}' found with details: {deployment}.")
                return True
            else:
                logging.debug(f"DynamoGraphDeployment '{job_name}' is not yet observable.")
                return False
        except lazy.k8s.client.ApiException as e:
            if e.status == 404:
                logging.debug(f"DynamoGraphDeployment '{job_name}' not found.")
                return False
            else:
                logging.error(
                    f"An error occurred while checking if DynamoGraphDeployment '{job_name}' "
                    f"is observable: {e.reason}. Please check the job name, namespace, and "
                    "Kubernetes API server."
                )
                raise

    def list_jobs(self) -> List[Any]:
        """
        List all jobs in the Kubernetes system's default namespace.

        Returns:
            List[Any]: A list of jobs in the namespace.
        """
        logging.debug(f"Listing jobs in namespace '{self.default_namespace}'")
        return self.batch_v1.list_namespaced_job(namespace=self.default_namespace).items

    def create_node_group(self, name: str, node_list: List[str]) -> None:
        """
        Create a node group in the Kubernetes system.

        Each node is patched with the label ``cloudai/node-group=<name>``.

        Args:
            name (str): The name of the node group.
            node_list (List[str]): List of node names to be included in the group.
        """
        logging.debug(f"Creating node group '{name}' with nodes: {node_list}")
        for node in node_list:
            body = {"metadata": {"labels": {"cloudai/node-group": name}}}
            logging.debug(f"Labeling node '{node}' with group '{name}'")
            self.core_v1.patch_node(node, body)

    def store_logs_for_job(self, job_name: str, output_dir: Path) -> None:
        """
        Retrieve and store logs for all pods associated with a given job.

        Each pod's log is written to ``<pod_name>.txt``, and all logs are concatenated into ``stdout.txt``.

        Args:
            job_name (str): The name of the job.
            output_dir (Path): The directory where logs will be saved.
        """
        pod_names = self.get_pod_names_for_job(job_name)
        if not pod_names:
            logging.debug(f"No pods found for job '{job_name}'")
            return

        output_dir.mkdir(parents=True, exist_ok=True)
        stdout_file_path = output_dir / "stdout.txt"

        with stdout_file_path.open("w", encoding="utf-8") as stdout_file:
            for pod_name in pod_names:
                try:
                    logs = self.core_v1.read_namespaced_pod_log(name=pod_name, namespace=self.default_namespace)

                    log_file_path = output_dir / f"{pod_name}.txt"
                    with log_file_path.open("w", encoding="utf-8") as log_file:
                        log_file.write(logs)
                    logging.debug(f"Logs for pod '{pod_name}' saved to '{log_file_path}'")

                    stdout_file.write(logs + "\n")

                except lazy.k8s.client.ApiException as e:
                    logging.debug(f"Error retrieving logs for pod '{pod_name}': {e}")

        logging.debug(f"All logs concatenated and saved to '{stdout_file_path}'")

    def get_pod_names_for_job(self, job_name: str) -> List[str]:
        """
        Retrieve pod names associated with a given job.

        Pods are matched by the ``training.kubeflow.org/job-name`` label; API errors are logged and yield an
        empty (or partial) list.

        Args:
            job_name (str): The name of the job.

        Returns:
            List[str]: A list of pod names associated with the job.
        """
        pod_names = []
        try:
            pods = self.core_v1.list_namespaced_pod(namespace=self.default_namespace)
            for pod in pods.items:
                if pod.metadata.labels and pod.metadata.labels.get("training.kubeflow.org/job-name") == job_name:
                    pod_names.append(pod.metadata.name)
        except lazy.k8s.client.ApiException as e:
            logging.error(f"Error retrieving pods for job '{job_name}': {e}")
        return pod_names