# SPDX-FileCopyrightText: NVIDIA CORPORATION & AFFILIATES
# Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
from pathlib import Path
from typing import Dict, List, Optional, Tuple
from pydantic import BaseModel, ConfigDict, Field, field_serializer
from cloudai.core import BaseJob, System
from cloudai.util import CommandShell
class LSFNodeObj(BaseModel):
    """
    Describe a single node of an LSF cluster.

    Instances are created while parsing scheduler output and carry the node
    name, its reported state string, and (when known) the user occupying it.
    """

    # Host name of the node.
    name: str
    # State string for the node, stored verbatim as provided by the caller.
    state: str
    # User associated with the node; None when no user is known.
    user: Optional[str] = None
class LSFGroup(BaseModel):
    """
    Describe a named group of nodes belonging to a queue.

    Unknown fields are rejected at validation time (``extra="forbid"``).
    """

    model_config = ConfigDict(extra="forbid")

    # Name of the group.
    name: str
    # Names of the nodes that make up the group.
    nodes: List[str]
class LSFQueue(BaseModel):
    """
    Describe a queue of the LSF system.

    Unknown fields are rejected at validation time (``extra="forbid"``).
    """

    model_config = ConfigDict(extra="forbid")

    # Name of the queue.
    name: str
    # Node groups configured for the queue; empty when none are given.
    # A factory is used (rather than a literal ``[]``) to match ``lsf_nodes``
    # and to make the per-instance default explicit.
    groups: List[LSFGroup] = Field(default_factory=list)
    # Node objects attached to the queue at runtime; excluded from
    # serialization via ``exclude=True``.
    lsf_nodes: List[LSFNodeObj] = Field(default_factory=list, exclude=True)
class LSFSystem(System):
    """
    Represent an LSF system.

    The class shells out to the LSF command-line tools (``bhosts``, ``bjobs``,
    ``bkill``) through ``cmd_shell`` to refresh node/queue state and to query
    or terminate jobs.
    """

    # Queues discovered on the system; rebuilt by ``parse_bhosts_output``.
    queues: List[LSFQueue] = Field(
        default_factory=list, description="A list of queues in the LSF system, filled in automatically"
    )
    account: Optional[str] = None
    scheduler: str = "lsf"
    project_name: Optional[str] = None
    default_queue: Optional[str] = None
    # NOTE(review): presumably a polling period in seconds - confirm against the consumer.
    monitor_interval: int = 60
    app: Optional[str] = None
    os_version: Optional[str] = None
    # Shell wrapper used to run every LSF command; excluded from serialization.
    cmd_shell: CommandShell = Field(default=CommandShell(), exclude=True)

    @field_serializer("install_path", "output_path")
    def _path_serializer(self, v: Path) -> str:
        """Serialize path-valued fields as plain strings."""
        return str(v)

    def update(self) -> None:
        """
        Update the system object for an LSF system.

        This method queries the current state of each node using the 'bhosts' and 'bjobs' commands.
        """
        bhosts_output, _ = self.fetch_command_output("bhosts")
        bjobs_output, _ = self.fetch_command_output("bjobs -u all")
        node_user_map = self.parse_bjobs_output(bjobs_output)
        self.parse_bhosts_output(bhosts_output, node_user_map)

    def _query_job_state(self, job: BaseJob) -> str:
        """
        Return the status string that ``bjobs`` reports for a job.

        Args:
            job (BaseJob): The job to query.

        Returns:
            str: The stripped stdout of ``bjobs -noheader -o stat <id>``; empty when
                the command printed nothing to stdout.
        """
        command = f"bjobs -noheader -o stat {job.id}"
        stdout, _ = self.cmd_shell.execute(command).communicate()
        return stdout.strip()

    def is_job_running(self, job: BaseJob) -> bool:
        """
        Check if a specified LSF job is currently running.

        Args:
            job (BaseJob): The job to check.

        Returns:
            bool: True if the job is running, False otherwise.
        """
        return self._query_job_state(job) == "RUN"

    def is_job_completed(self, job: BaseJob) -> bool:
        """
        Check if a specified LSF job is completed.

        Args:
            job (BaseJob): The job to check.

        Returns:
            bool: True if the reported status is ``DONE`` or ``EXIT``, False otherwise.
        """
        return self._query_job_state(job) in ("DONE", "EXIT")

    def kill(self, job: BaseJob) -> None:
        """
        Terminate an LSF job.

        The ``bkill`` command is launched without waiting for it to finish and
        without inspecting its output.

        Args:
            job (BaseJob): The job to terminate.
        """
        self.cmd_shell.execute(f"bkill {job.id}")

    def fetch_command_output(self, command: str) -> Tuple[str, str]:
        """
        Execute a system command and return its output.

        Anything written to stderr is logged as an error; no exception is raised.

        Args:
            command (str): The command to execute.

        Returns:
            Tuple[str, str]: The stdout and stderr from the command execution.
        """
        logging.debug(f"Executing command: {command}")
        stdout, stderr = self.cmd_shell.execute(command).communicate()
        if stderr:
            logging.error(f"Error executing command '{command}': {stderr}")
        return stdout, stderr

    def parse_bjobs_output(self, bjobs_output: str) -> Dict[str, str]:
        """
        Parse the output of the `bjobs` command to map nodes to users.

        Lines with fewer than six whitespace-separated columns and the column
        header row are ignored. When a node appears on several lines, the user
        from the first such line is kept.

        Args:
            bjobs_output (str): The output of the `bjobs -u all` command.

        Returns:
            Dict[str, str]: A dictionary mapping node names to user names.
        """
        node_user_map: Dict[str, str] = {}
        for line in bjobs_output.splitlines():
            parts = line.split()
            # Skip short lines and the "JOBID USER STAT ..." header, which would
            # otherwise be recorded as a bogus "EXEC_HOST" -> "USER" entry.
            if len(parts) < 6 or parts[0] == "JOBID":
                continue
            # NOTE(review): columns are positional (2nd = user, 6th = exec host);
            # rows lacking an exec host (e.g. pending jobs) would shift - confirm.
            user, exec_host = parts[1], parts[5]
            node_user_map.setdefault(exec_host, user)
        return node_user_map

    def parse_bhosts_output(self, bhosts_output: str, node_user_map: Dict[str, str]) -> None:
        """
        Parse the output of the `bhosts` command and update the system's node information.

        ``self.queues`` is replaced with queues rebuilt from the parsed lines. Lines
        with fewer than six whitespace-separated columns and the column header row
        are ignored.

        Args:
            bhosts_output (str): The output of the `bhosts` command.
            node_user_map (Dict[str, str]): A dictionary mapping node names to user names.
        """
        queue_map: Dict[str, LSFQueue] = {}
        for line in bhosts_output.splitlines():
            parts = line.split()
            # Skip short lines and the "HOST_NAME STATUS ..." header, which would
            # otherwise be recorded as a node named "HOST_NAME".
            if len(parts) < 6 or parts[0] == "HOST_NAME":
                continue
            # NOTE(review): the sixth column is taken as the queue name; in the
            # default `bhosts` layout that column appears to be RUN - confirm the
            # expected output format.
            node_name, status, queue_name = parts[0], parts[1], parts[5]
            queue = queue_map.get(queue_name)
            if queue is None:
                queue = LSFQueue(name=queue_name)
                queue_map[queue_name] = queue
            queue.lsf_nodes.append(LSFNodeObj(name=node_name, state=status, user=node_user_map.get(node_name)))
        self.queues = list(queue_map.values())