# 5.2.1. Scheduling Workload Integration Use Case

## Overview
This use case demonstrates workload scheduling integration with DPS. Integrators can test their scheduling solutions using the simulator environment to verify operation, observe power utilization compliance, and identify job scheduling issues.
## Goal
Test workload scheduling capabilities with DPS using a simulator environment for validation independent of specific hardware configurations.
## Persona: Workload Scheduling Integrator

- Role: Integration engineer developing solutions that integrate workload schedulers with the DPS power management system.
- Objective: Integrate a scheduling solution with DPS for power-aware workload management, using the DPS simulator for validation.
## Use Case Workflow

Scenario: A Workload Scheduling Integrator plans to integrate their scheduling solution with DPS.

Workflow Steps:
- Create Integration Components - Create SLURM prolog/epilog scripts to utilize DPS interfaces
- Transform Simulated Node List - Transform the simulated node list in the DPS simulator into scheduler resource pool
- Enable Integration - Enable integration and verify operation
- Observe Job Scheduling Failures - Observe DPS simulator dashboards for indications of job scheduling failures
- Observe Power Utilization Compliance - Observe DPS simulator dashboards for power utilization and compliance status
## Theory of Operation

### Scheduler Integration Architecture

```mermaid
graph TB
A[Job Scheduler] --> B[Prolog Script]
B --> C[DPS Resource Group Creation]
C --> D[Power Policy Application]
D --> E[Job Execution]
E --> F[Power Monitoring]
F --> G[DPS Dashboards]
E --> H[Job Completion]
H --> I[Epilog Script]
I --> J[Resource Group Cleanup]
B --> K[Node List Transform]
K --> L[Policy Selection]
L --> M[Resource Allocation]
```

### Resource Group Lifecycle
Scheduler integration implements a resource group lifecycle; a condensed `dpsctl` sketch of this flow follows the list:
- Job Submission - Scheduler receives job and allocates nodes
- Prolog Execution - Create DPS resource group with allocated nodes and appropriate power policy
- Job Activation - Activate resource group to apply power policies to hardware
- Job Monitoring - Monitor power consumption and compliance during job execution
- Job Completion - Execute epilog to clean up resource group and restore default policies
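
The following minimal sketch condenses this lifecycle into the `dpsctl` calls that the prolog and epilog scripts in Step 1 wrap; the job ID, group name, policy, and node names are illustrative placeholders.

```bash
# Condensed resource group lifecycle (illustrative values)
JOB_ID=12345
GROUP="slurm-job-$JOB_ID"

# Prolog: create a resource group for the allocated nodes and attach a power policy
dpsctl resource-group create \
  --group-name="$GROUP" \
  --external-id="$JOB_ID" \
  --policy="GB200-High" \
  --nodes=gb200-r01-0001 --nodes=gb200-r01-0002

# Job activation: apply the power policy to the hardware (or simulator)
dpsctl resource-group activate "$GROUP"

# Job monitoring: check the group state and power compliance during execution
dpsctl resource-group status "$GROUP"

# Epilog: clean up and let the nodes fall back to the default policy
dpsctl resource-group delete "$GROUP"
```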
### Simulator Configuration

Configure your DPS environment with the following (a command sketch follows the list):
- Topology file defining your datacenter nodes and power distribution
- BMC simulator configuration for your hardware
- Power policies appropriate for your workload types
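
As a minimal sketch of loading this configuration into the simulator, the commands below reuse the same `dpsctl` calls shown in the Environment Setup Commands later in this guide; the credentials, file name, and topology name are placeholders.

```bash
# Authenticate against the DPS API (simulator endpoint)
dpsctl --host api.dps --port 443 --insecure-tls-skip-verify \
  login --username <username> --password <password>

# Import and activate a topology describing the simulated nodes and power distribution
dpsctl --host api.dps --port 443 --insecure-tls-skip-verify \
  topology import --filename <topology-file.json>
dpsctl --host api.dps --port 443 --insecure-tls-skip-verify \
  topology activate --topology <topology-name>
```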
## Step-by-Step Integration Guide

### Step 1: Create Integration Components

Create SLURM prolog and epilog scripts that interface with DPS APIs.

#### Prolog Script Implementation

```bash
#!/bin/bash
# slurm_dps_prolog.sh
# SLURM Prolog script for DPS integration
set -euo pipefail
# Configuration
DPS_API_ENDPOINT="${DPS_API_ENDPOINT:-api.dps:443}"
DPS_USERNAME="${DPS_USERNAME:-slurm-integrator}"
DPS_PASSWORD="${DPS_PASSWORD:-slurm-password}"
DPSCTL_PATH="${DPSCTL_PATH:-dpsctl}"
# SLURM job environment variables (automatically set by SLURM)
JOB_ID="${SLURM_JOB_ID}"
JOB_NAME="${SLURM_JOB_NAME:-job-$JOB_ID}"
NODE_LIST="${SLURM_JOB_NODELIST}"
JOB_PARTITION="${SLURM_JOB_PARTITION}"
JOB_QOS="${SLURM_JOB_QOS:-normal}"
# Log file for debugging
LOG_FILE="/var/log/slurm/dps_prolog.log"
log() {
echo "$(date '+%Y-%m-%d %H:%M:%S') - PROLOG - JOB $JOB_ID - $1" >> "$LOG_FILE"
}
log "Starting DPS integration for job $JOB_ID"
log "Job details: name=$JOB_NAME, partition=$JOB_PARTITION, qos=$JOB_QOS"
log "Allocated nodes: $NODE_LIST"
# Function to determine power policy based on job characteristics
determine_power_policy() {
local job_name="$1"
local partition="$2"
local qos="$3"
# Policy selection logic
case "$partition" in
"gpu-high"|"training")
if [[ "$job_name" =~ ml-training|training|llm ]]; then
echo "ML-Training-High"
else
echo "Node-High"
fi
;;
"gpu-standard"|"inference")
echo "Inference-Medium"
;;
"gpu-dev"|"development"|"debug")
echo "Development-Low"
;;
*)
# Default policy based on QOS
case "$qos" in
"high"|"urgent")
echo "Node-High"
;;
"low"|"preempt")
echo "Development-Low"
;;
*)
echo "Inference-Medium"
;;
esac
;;
esac
}
# Function to expand SLURM node list format
expand_node_list() {
local node_list="$1"
# Use SLURM's scontrol to expand compressed node list
if command -v scontrol >/dev/null 2>&1; then
scontrol show hostnames "$node_list" | tr '\n' ',' | sed 's/,$//'
else
# Fallback: assume simple comma-separated list
echo "$node_list" | tr ' ' ','
fi
}
# Function to create DPS resource group
create_dps_resource_group() {
local job_id="$1"
local policy="$2"
local nodes="$3"
local group_name="slurm-job-$job_id"
log "Creating DPS resource group: $group_name"
log "Policy: $policy, Nodes: $nodes"
# Build dpsctl command
local cmd=(
"$DPSCTL_PATH" "resource-group" "create"
"--group-name=$group_name"
"--external-id=$job_id"
"--policy=$policy"
)
# Add nodes to command
IFS=',' read -ra NODE_ARRAY <<< "$nodes"
for node in "${NODE_ARRAY[@]}"; do
cmd+=("--nodes=$node")
done
# Execute resource group creation
if "${cmd[@]}" 2>&1 | tee -a "$LOG_FILE"; then
log "Resource group $group_name created successfully"
# Activate the resource group
if $DPSCTL_PATH resource-group activate "$group_name" 2>&1 | tee -a "$LOG_FILE"; then
log "Resource group $group_name activated successfully"
return 0
else
log "ERROR: Failed to activate resource group $group_name"
return 1
fi
else
log "ERROR: Failed to create resource group $group_name"
return 1
fi
}
# Function to verify resource group activation
verify_resource_group() {
local job_id="$1"
local group_name="slurm-job-$job_id"
log "Verifying resource group activation: $group_name"
if $DPSCTL_PATH resource-group status "$group_name" 2>&1 | tee -a "$LOG_FILE" | grep -q "active.*true"; then
log "Resource group $group_name verification successful"
return 0
else
log "WARNING: Resource group $group_name verification failed"
return 1
fi
}
# Main execution
main() {
# Validate required environment
if [[ -z "$JOB_ID" || -z "$NODE_LIST" ]]; then
log "ERROR: Missing required SLURM environment variables"
exit 1
fi
# Check dpsctl availability
if ! command -v "$DPSCTL_PATH" >/dev/null 2>&1; then
log "ERROR: dpsctl not found at $DPSCTL_PATH"
exit 1
fi
# Determine appropriate power policy
POWER_POLICY=$(determine_power_policy "$JOB_NAME" "$JOB_PARTITION" "$JOB_QOS")
log "Selected power policy: $POWER_POLICY"
# Expand node list
EXPANDED_NODES=$(expand_node_list "$NODE_LIST")
log "Expanded node list: $EXPANDED_NODES"
# Create and activate DPS resource group
if create_dps_resource_group "$JOB_ID" "$POWER_POLICY" "$EXPANDED_NODES"; then
# Verify activation (optional, for debugging)
verify_resource_group "$JOB_ID"
log "DPS integration completed successfully for job $JOB_ID"
exit 0
else
log "ERROR: DPS integration failed for job $JOB_ID"
exit 1
fi
}
# Execute main function
main "$@"Epilog Script Implementation
#!/bin/bash
# slurm_dps_epilog.sh
# SLURM Epilog script for DPS cleanup
set -euo pipefail
# Configuration
DPSCTL_PATH="${DPSCTL_PATH:-dpsctl}"
# SLURM job environment variables
JOB_ID="${SLURM_JOB_ID}"
JOB_NAME="${SLURM_JOB_NAME:-job-$JOB_ID}"
# Log file for debugging
LOG_FILE="/var/log/slurm/dps_epilog.log"
log() {
echo "$(date '+%Y-%m-%d %H:%M:%S') - EPILOG - JOB $JOB_ID - $1" >> "$LOG_FILE"
}
log "Starting DPS cleanup for job $JOB_ID"
# Function to delete DPS resource group
cleanup_dps_resource_group() {
local job_id="$1"
local group_name="slurm-job-$job_id"
log "Cleaning up DPS resource group: $group_name"
# Check if resource group exists
if $DPSCTL_PATH resource-group list 2>/dev/null | grep -q "$group_name"; then
# Delete the resource group
if $DPSCTL_PATH resource-group delete "$group_name" 2>&1 | tee -a "$LOG_FILE"; then
log "Resource group $group_name deleted successfully"
return 0
else
log "WARNING: Failed to delete resource group $group_name"
return 1
fi
else
log "Resource group $group_name not found (may have already been cleaned up)"
return 0
fi
}
# Main execution
main() {
# Validate required environment
if [[ -z "$JOB_ID" ]]; then
log "ERROR: Missing required SLURM_JOB_ID environment variable"
exit 1
fi
# Check dpsctl availability
if ! command -v "$DPSCTL_PATH" >/dev/null 2>&1; then
log "ERROR: dpsctl not found at $DPSCTL_PATH"
exit 1
fi
# Cleanup DPS resource group
if cleanup_dps_resource_group "$JOB_ID"; then
log "DPS cleanup completed successfully for job $JOB_ID"
exit 0
else
log "WARNING: DPS cleanup had issues for job $JOB_ID (continuing anyway)"
exit 0 # Do not fail the job due to cleanup issues
fi
}
# Execute main function
main "$@"Integration Helper (Python)
#!/usr/bin/env python3
"""
SLURM-DPS Integration Helper
Integration functions for SLURM scheduler with DPS
"""
import os
import json
import logging
import subprocess
import time
from typing import Dict, List, Optional
from dpsapi.api import DpsApi
class SLURMDPSIntegrator:
def __init__(self,
dps_endpoint: str = "api.dps:443",
dpsctl_path: str = "dpsctl"):
self.logger = logging.getLogger(__name__)
self.dpsctl_path = dpsctl_path
# Initialize DPS API connection
endpoint_parts = dps_endpoint.split(':')
self.dps_api = (DpsApi(endpoint_parts[0], int(endpoint_parts[1]))
.with_username("slurm-integrator")
.with_password("slurm-password")
.with_insecure_tls_skip_verify(True))
# Job type to policy mapping
self.policy_mapping = {
'ml-training': 'GB200-High',
'inference': 'GB200-Med',
'development': 'GB200-Low',
'simulation': 'GB200-High',
'data-processing': 'GB200-Med'
}
def analyze_job_requirements(self, job_info: Dict) -> Dict:
"""Analyze job requirements to determine power policy"""
job_name = job_info.get('name', '').lower()
partition = job_info.get('partition', '').lower()
qos = job_info.get('qos', 'normal').lower()
time_limit = job_info.get('time_limit', '01:00:00')
# Analyze job characteristics
analysis = {
'job_id': job_info.get('job_id'),
'job_name': job_name,
'partition': partition,
'qos': qos,
'time_limit': time_limit,
'nodes': job_info.get('nodes', []),
'estimated_duration_hours': self._parse_time_limit(time_limit)
}
# Determine job type from name and partition
job_type = 'development' # default
if any(keyword in job_name for keyword in ['training', 'train', 'llm', 'gpt']):
job_type = 'ml-training'
elif any(keyword in job_name for keyword in ['inference', 'infer', 'serving']):
job_type = 'inference'
elif any(keyword in job_name for keyword in ['simulation', 'sim', 'hpc']):
job_type = 'simulation'
elif any(keyword in job_name for keyword in ['data', 'process', 'etl']):
job_type = 'data-processing'
elif partition in ['gpu-high', 'training'] or qos in ['high', 'urgent']:
job_type = 'ml-training'
analysis['job_type'] = job_type
analysis['recommended_policy'] = self.policy_mapping.get(job_type, 'GB200-Med')
# Adjust policy based on duration for long-running jobs
if analysis['estimated_duration_hours'] > 12:
if job_type in ['ml-training', 'simulation']:
analysis['recommended_policy'] = 'GB200-High'
else:
analysis['recommended_policy'] = 'GB200-Med'
self.logger.info(f"Job analysis complete: {analysis['job_type']} -> {analysis['recommended_policy']}")
return analysis
def _parse_time_limit(self, time_limit: str) -> float:
"""Parse SLURM time limit format to hours"""
try:
# Handle formats like "01:00:00", "2-12:00:00", "60"
if '-' in time_limit:
days, time_part = time_limit.split('-')
hours_from_days = int(days) * 24
else:
hours_from_days = 0
time_part = time_limit
if ':' in time_part:
parts = time_part.split(':')
if len(parts) == 3: # HH:MM:SS
hours = int(parts[0]) + int(parts[1])/60 + int(parts[2])/3600
elif len(parts) == 2: # HH:MM
hours = int(parts[0]) + int(parts[1])/60
else:
hours = int(parts[0])
else:
# Assume minutes if no colons
hours = int(time_part) / 60
return hours_from_days + hours
        except (ValueError, IndexError):
            return 1.0  # default to 1 hour if parsing fails
def create_resource_group_for_job(self, job_analysis: Dict) -> bool:
"""Create resource group for job"""
job_id = job_analysis['job_id']
policy = job_analysis['recommended_policy']
nodes = job_analysis['nodes']
group_name = f"slurm-job-{job_id}"
try:
self.logger.info(f"Creating resource group: {group_name}")
# Build dpsctl command
cmd = [
self.dpsctl_path, "resource-group", "create",
f"--group-name={group_name}",
f"--external-id={job_id}",
f"--policy={policy}"
]
# Add nodes
for node in nodes:
cmd.extend([f"--nodes={node}"])
# Execute command
result = subprocess.run(cmd, capture_output=True, text=True)
if result.returncode == 0:
self.logger.info(f"Resource group {group_name} created successfully")
# Activate with monitoring
return self._activate_with_monitoring(group_name, job_analysis)
else:
self.logger.error(f"Failed to create resource group: {result.stderr}")
return False
except Exception as e:
self.logger.error(f"Error creating resource group: {e}")
return False
def _activate_with_monitoring(self, group_name: str, job_analysis: Dict) -> bool:
"""Activate resource group with power monitoring"""
try:
# Activate resource group
cmd = [self.dpsctl_path, "resource-group", "activate", group_name]
result = subprocess.run(cmd, capture_output=True, text=True)
if result.returncode != 0:
self.logger.error(f"Failed to activate resource group: {result.stderr}")
return False
self.logger.info(f"Resource group {group_name} activated successfully")
# Monitor initial power consumption
time.sleep(5) # Allow activation to take effect
power_metrics = self._get_initial_power_metrics(job_analysis['nodes'])
if power_metrics:
self.logger.info(f"Initial power consumption: {power_metrics['total_power']:.1f}W "
f"across {len(job_analysis['nodes'])} GB200 nodes")
return True
except Exception as e:
self.logger.error(f"Error during activation monitoring: {e}")
return False
def _get_initial_power_metrics(self, nodes: List[str]) -> Optional[Dict]:
"""Get initial power metrics for monitoring"""
try:
gpu_list = []
for node in nodes:
for gpu_id in range(4): # DGX_GB200 has 4 GPUs
gpu_list.append([node, gpu_id])
metrics_response = self.dps_api.request_metrics(requested_gpus=gpu_list)
if 'metrics' in metrics_response:
total_power = sum(metric['usage'] for metric in metrics_response['metrics'])
return {
'total_power': total_power,
'node_count': len(nodes),
'gpu_count': len(gpu_list),
'metrics': metrics_response['metrics']
}
except Exception as e:
self.logger.warning(f"Could not retrieve initial power metrics: {e}")
return None
def monitor_job_power_usage(self, job_id: str, nodes: List[str],
duration_minutes: int = 60) -> Dict:
"""Monitor power usage during job execution"""
self.logger.info(f"Starting power monitoring for job {job_id} ({duration_minutes} minutes)")
gpu_list = []
for node in nodes:
for gpu_id in range(4): # DGX_GB200 has 4 GPUs
gpu_list.append([node, gpu_id])
monitoring_data = {
'job_id': job_id,
'start_time': time.time(),
'samples': [],
'duration_minutes': duration_minutes
}
end_time = time.time() + (duration_minutes * 60)
sample_interval = 30 # 30 second intervals
while time.time() < end_time:
try:
metrics_response = self.dps_api.request_metrics(requested_gpus=gpu_list)
if 'metrics' in metrics_response:
total_power = sum(metric['usage'] for metric in metrics_response['metrics'])
sample = {
'timestamp': time.time(),
'total_power': total_power,
'node_metrics': {}
}
# Organize by node
for metric in metrics_response['metrics']:
node = metric['node']
if node not in sample['node_metrics']:
sample['node_metrics'][node] = []
sample['node_metrics'][node].append(metric['usage'])
monitoring_data['samples'].append(sample)
self.logger.info(f"Power sample: {total_power:.1f}W total")
time.sleep(sample_interval)
except Exception as e:
self.logger.error(f"Error during power monitoring: {e}")
time.sleep(sample_interval)
# Calculate summary statistics
if monitoring_data['samples']:
power_values = [s['total_power'] for s in monitoring_data['samples']]
monitoring_data['summary'] = {
'average_power': sum(power_values) / len(power_values),
'min_power': min(power_values),
'max_power': max(power_values),
'sample_count': len(power_values)
}
self.logger.info(f"Monitoring complete - Avg: {monitoring_data['summary']['average_power']:.1f}W, "
f"Min: {monitoring_data['summary']['min_power']:.1f}W, "
f"Max: {monitoring_data['summary']['max_power']:.1f}W")
return monitoring_data
def cleanup_with_reporting(self, job_id: str) -> Dict:
"""Cleanup resource group with final power reporting"""
group_name = f"slurm-job-{job_id}"
cleanup_report = {
'job_id': job_id,
'cleanup_time': time.time(),
'success': False,
'final_metrics': None
}
try:
# Get final power metrics before cleanup
resource_groups = self.dps_api.list_resource_groups()
for rg in resource_groups.get('resource_groups', []):
if rg.get('group_name') == group_name:
nodes = rg.get('resource_names', [])
if nodes:
cleanup_report['final_metrics'] = self._get_initial_power_metrics(nodes)
break
# Delete resource group
cmd = [self.dpsctl_path, "resource-group", "delete", group_name]
result = subprocess.run(cmd, capture_output=True, text=True)
if result.returncode == 0:
cleanup_report['success'] = True
self.logger.info(f"Resource group {group_name} cleaned up successfully")
else:
self.logger.warning(f"Resource group cleanup had issues: {result.stderr}")
cleanup_report['error'] = result.stderr
except Exception as e:
self.logger.error(f"Error during cleanup with reporting: {e}")
cleanup_report['error'] = str(e)
        return cleanup_report
```

### Step 2: Transform Simulated Node List into Scheduler Resource Pool
Transform the simulated node list in the DPS simulator into your scheduler resource pool.
#### Node Transformation (Python)
```python
#!/usr/bin/env python3
"""
Transform DPS Simulator Nodes into Scheduler Resource Pool
"""
import subprocess
import json
from typing import List, Dict
class DPSSimulatorNodeTransformer:
def __init__(self):
# DPS simulator GB200 nodes
self.simulator_nodes = []
for rack in range(1, 9): # Racks 01-08
for node in range(1, 13): # Nodes 0001-0012
self.simulator_nodes.append(f"gb200-r{rack:02d}-{node:04d}")
# Node characteristics
self.node_specs = {
"DGX_GB200": {
"gpus": 4, # GB200 has 4 GPUs per node
"cpus": 2, # Grace CPUs
"min_power": 900, # Watts (from pkg/device/specs/devices.yaml)
"max_power": 5640, # Watts
"gpu_model": "GB200",
"cpu_model": "Grace"
}
}
def get_dps_simulator_topology(self) -> Dict:
"""Retrieve current topology from DPS simulator"""
try:
result = subprocess.run(
["dpsctl", "topology", "list-entities", "dps-simulator"],
capture_output=True, text=True
)
if result.returncode == 0:
# Parse dpsctl output to get node information
lines = result.stdout.strip().split('\n')
nodes = []
for line in lines:
if 'ComputerSystem' in line and any(node in line for node in self.simulator_nodes):
# Extract node name from dpsctl output
for node in self.simulator_nodes:
if node in line:
nodes.append({
"name": node,
"type": "ComputerSystem",
"model": "DGX_GB200",
"status": "available",
"power_policy": self._extract_policy_from_line(line)
})
break
return {"nodes": nodes, "total_count": len(nodes)}
else:
print(f"Failed to get topology: {result.stderr}")
return {"nodes": [], "total_count": 0}
except Exception as e:
print(f"Error retrieving DPS topology: {e}")
return {"nodes": [], "total_count": 0}
def _extract_policy_from_line(self, line: str) -> str:
"""Extract policy name from dpsctl output line"""
# Extract policy names
if "GB200-High" in line:
return "GB200-High"
elif "GB200-Med" in line:
return "GB200-Med"
elif "GB200-Low" in line:
return "GB200-Low"
else:
return "Default"
def transform_to_slurm_nodes_conf(self) -> str:
"""Transform DPS simulator nodes into SLURM nodes.conf format"""
topology = self.get_dps_simulator_topology()
slurm_config = []
for node in topology["nodes"]:
node_name = node["name"]
model = node.get("model", "DGX_GB200")
spec = self.node_specs.get(model, self.node_specs["DGX_GB200"])
# Generate SLURM node configuration for GB200
slurm_node_config = (
f"NodeName={node_name} "
f"CPUs=144 " # Grace CPU configuration for GB200
f"Gres=gpu:gb200:{spec['gpus']} " # 4 GB200 GPUs per node
f"State=IDLE "
f"Features=dps_managed,power_aware,gb200 "
f"# DPS Power: {spec['min_power']}-{spec['max_power']}W"
)
slurm_config.append(slurm_node_config)
# Add partition configuration
node_list = ",".join([node["name"] for node in topology["nodes"]])
partition_config = f"PartitionName=dps_simulator Nodes={node_list} Default=YES MaxTime=INFINITE State=UP"
full_config = "\n".join(slurm_config + ["", partition_config])
print("Generated SLURM nodes.conf configuration:")
print(full_config)
return full_config
def create_scheduler_resource_pool(self) -> Dict:
"""Create a generic scheduler resource pool from DPS simulator nodes"""
topology = self.get_dps_simulator_topology()
resource_pool = {
"pool_name": "dps_simulator_pool",
"total_nodes": topology["total_count"],
"available_nodes": topology["total_count"],
"resources": []
}
for node in topology["nodes"]:
spec = self.node_specs.get(node.get("model", "DGX_GB200"), self.node_specs["DGX_GB200"])
resource = {
"node_id": node["name"],
"node_type": "gpu_compute",
"capabilities": {
"gpu_count": spec["gpus"], # 4 GB200 GPUs
"cpu_count": spec["cpus"], # 2 Grace CPUs
"gpu_model": spec["gpu_model"], # GB200
"cpu_model": spec["cpu_model"], # Grace
"power_management": True,
"power_range": f"{spec['min_power']}-{spec['max_power']}W",
"dps_managed": True,
"architecture": "grace_blackwell"
},
"current_policy": node.get("power_policy", "Default"),
"status": "available",
"scheduler_metadata": {
"partition": "dps_simulator",
"priority": "normal",
"exclusive": False,
"rack_id": node["name"][:10] # Extract rack ID (e.g., "gb200-r01")
}
}
resource_pool["resources"].append(resource)
print(f"Created scheduler resource pool with {len(resource_pool['resources'])} nodes")
return resource_pool
# Usage example
def transform_simulator_nodes_example():
"""Example of transforming DPS simulator nodes"""
transformer = DPSSimulatorNodeTransformer()
print("=== Step 2: Transform Simulated Node List ===")
# Get current DPS simulator topology
print("1. Retrieving DPS simulator topology...")
topology = transformer.get_dps_simulator_topology()
print(f" Found {topology['total_count']} nodes in simulator")
# Transform to SLURM format
print("\n2. Transforming to SLURM configuration...")
slurm_config = transformer.transform_to_slurm_nodes_conf()
# Create generic resource pool
print("\n3. Creating scheduler resource pool...")
resource_pool = transformer.create_scheduler_resource_pool()
return resource_pool
if __name__ == "__main__":
    transform_simulator_nodes_example()
```

### Step 3: Enable Integration and Verify Operation
Deploy and test the integration components.
#### Integration Deployment and Testing

```bash
#!/bin/bash
# enable_integration.sh
# Enable scheduler integration with DPS and test functionality
set -euo pipefail
echo "=== Step 3: Enable Integration and Observe Functionality ==="
# 1. Deploy integration scripts
echo "1. Deploying integration scripts..."
sudo cp slurm_dps_prolog.sh /usr/local/bin/
sudo cp slurm_dps_epilog.sh /usr/local/bin/
sudo chmod +x /usr/local/bin/slurm_dps_*.sh
# 2. Update SLURM configuration
echo "2. Updating SLURM configuration..."
sudo tee -a /etc/slurm/slurm.conf << 'EOF'
# DPS Integration Configuration
Prolog=/usr/local/bin/slurm_dps_prolog.sh
Epilog=/usr/local/bin/slurm_dps_epilog.sh
PrologFlags=Alloc
# DPS Simulator Partition
PartitionName=dps_simulator Nodes=gb200-r01-[0001-0012],gb200-r02-[0001-0012] Default=YES MaxTime=INFINITE State=UP
EOF
# 3. Restart SLURM services
echo "3. Restarting SLURM services..."
sudo systemctl restart slurmctld
sudo systemctl restart slurmd
# 4. Verify integration functionality
echo "4. Testing integration functionality..."
# Test job submission
echo " Submitting test job..."
TEST_JOB_ID=$(sbatch --parsable --job-name=dps-integration-test --partition=dps_simulator --nodes=2 --time=5:00 << 'EOF'
#!/bin/bash
echo "DPS Integration Test Job"
echo "Job ID: $SLURM_JOB_ID"
echo "Nodes: $SLURM_JOB_NODELIST"
sleep 60
echo "Job completed"
EOF
)
echo " Test job submitted with ID: $TEST_JOB_ID"
# Monitor job and DPS integration
echo " Monitoring job execution..."
sleep 10
# Check if resource group was created
echo " Checking DPS resource group creation..."
if dpsctl resource-group list | grep -q "slurm-job-$TEST_JOB_ID"; then
echo " [SUCCESS] Resource group created successfully"
else
echo " [FAILED] Resource group creation failed"
fi
# Wait for job completion
echo " Waiting for job completion..."
while squeue -j "$TEST_JOB_ID" | grep -q "$TEST_JOB_ID"; do
sleep 5
done
# Check if resource group was cleaned up
echo " Checking DPS resource group cleanup..."
sleep 5
if dpsctl resource-group list | grep -q "slurm-job-$TEST_JOB_ID"; then
echo " [FAILED] Resource group cleanup failed"
else
echo " [SUCCESS] Resource group cleaned up successfully"
fi
echo "=== Integration functionality test complete ==="Step 4: Observe Job Scheduling Failures
Observe DPS simulator dashboards for indications of job scheduling failures.
#### Dashboard Monitoring (Python)

```python
#!/usr/bin/env python3
"""
Monitor DPS Simulator Dashboards for Job Scheduling Failures
"""
import time
import json
import subprocess
from typing import Dict, List
from dpsapi.api import DpsApi
class JobSchedulingFailureMonitor:
def __init__(self):
self.dps_api = (DpsApi("api.dps", 443)
.with_username("monitoring")
.with_password("monitor-password")
.with_insecure_tls_skip_verify(True))
self.failure_indicators = []
def monitor_scheduling_failures(self, duration_minutes: int = 30):
"""Monitor DPS dashboards for job scheduling failure indicators"""
print(f"=== Step 4: Monitor Job Scheduling Failures ({duration_minutes} minutes) ===")
end_time = time.time() + (duration_minutes * 60)
failure_count = 0
while time.time() < end_time:
try:
# Check for common scheduling failure indicators
failures = self._check_failure_indicators()
if failures:
failure_count += len(failures)
print(f"[WARNING] Detected {len(failures)} scheduling issues:")
for failure in failures:
print(f" - {failure['type']}: {failure['message']}")
print(f" Time: {time.strftime('%H:%M:%S', time.localtime(failure['timestamp']))}")
print(f" Affected: {failure.get('affected_resources', 'Unknown')}")
time.sleep(30) # Check every 30 seconds
except Exception as e:
print(f"Error monitoring failures: {e}")
time.sleep(30)
print(f"\n=== Monitoring Summary ===")
print(f"Total scheduling issues detected: {failure_count}")
print(f"Monitoring duration: {duration_minutes} minutes")
if failure_count == 0:
print("[SUCCESS] No scheduling failures detected - Integration working correctly")
else:
print("[WARNING] Scheduling issues detected - Review failure details above")
return self.failure_indicators
def _check_failure_indicators(self) -> List[Dict]:
"""Check various indicators of job scheduling failures"""
current_failures = []
timestamp = time.time()
# 1. Check for resource groups in error state
try:
resource_groups = self.dps_api.list_resource_groups()
for rg in resource_groups.get('resource_groups', []):
if not rg.get('is_active', False) and rg.get('group_name', '').startswith('slurm-job-'):
current_failures.append({
'type': 'resource_group_inactive',
'message': f"Resource group {rg['group_name']} is not active",
'timestamp': timestamp,
'affected_resources': rg.get('resource_names', [])
})
except Exception as e:
current_failures.append({
'type': 'api_error',
'message': f"Failed to query resource groups: {e}",
'timestamp': timestamp,
'affected_resources': 'API'
})
# 2. Check for nodes with failed policy application
try:
nodes = ["gb200-r01-0001", "gb200-r01-0002", "gb200-r01-0003", "gb200-r01-0004"] # Sample nodes
gpu_list = [[node, gpu_id] for node in nodes for gpu_id in range(4)] # DGX_GB200 has 4 GPUs
metrics_response = self.dps_api.request_metrics(requested_gpus=gpu_list)
if 'metrics' not in metrics_response:
current_failures.append({
'type': 'metrics_unavailable',
'message': "GPU metrics not available from DPS",
'timestamp': timestamp,
'affected_resources': nodes
})
else:
# Check for nodes with zero power (indicating potential failure)
node_powers = {}
for metric in metrics_response['metrics']:
node = metric['node']
if node not in node_powers:
node_powers[node] = 0
node_powers[node] += metric['usage']
for node, power in node_powers.items():
if power < 100: # Low power might indicate issues
current_failures.append({
'type': 'low_power_node',
'message': f"Node {node} showing unexpectedly low power: {power:.1f}W",
'timestamp': timestamp,
'affected_resources': [node]
})
except Exception as e:
current_failures.append({
'type': 'power_monitoring_error',
'message': f"Failed to monitor node power: {e}",
'timestamp': timestamp,
'affected_resources': 'Power monitoring'
})
# 3. Check SLURM job queue for stuck jobs
try:
result = subprocess.run(
["squeue", "--format=%i,%j,%t,%r", "--noheader"],
capture_output=True, text=True, timeout=10
)
if result.returncode == 0:
for line in result.stdout.strip().split('\n'):
if line:
                        job_id, job_name, state, reason = line.split(',', 3)  # cap splits so commas in the reason field stay intact
# Look for problematic job states
if state in ['PD'] and 'Power' in reason:
current_failures.append({
'type': 'job_power_pending',
'message': f"Job {job_id} pending due to power constraint: {reason}",
'timestamp': timestamp,
'affected_resources': [job_id]
})
elif state in ['F', 'NF', 'TO']:
current_failures.append({
'type': 'job_failed',
'message': f"Job {job_id} ({job_name}) failed with state {state}",
'timestamp': timestamp,
'affected_resources': [job_id]
})
except Exception as e:
# SLURM might not be available in all environments
pass
# Store failures for reporting
self.failure_indicators.extend(current_failures)
return current_failures
# Usage example
def monitor_job_failures_example():
"""Example monitoring for job scheduling failures"""
monitor = JobSchedulingFailureMonitor()
failures = monitor.monitor_scheduling_failures(duration_minutes=10)
return failures
if __name__ == "__main__":
    monitor_job_failures_example()
```

### Step 5: Observe Power Utilization Compliance
Observe DPS simulator dashboards for power utilization and successful compliance integration.
#### Power Utilization Monitoring (Python)

```python
#!/usr/bin/env python3
"""
Monitor DPS Simulator Dashboards for Power Utilization and Compliance
"""
import time
import numpy as np
from typing import Dict, List, Tuple
from dpsapi.api import DpsApi
class PowerUtilizationComplianceMonitor:
def __init__(self):
self.dps_api = (DpsApi("api.dps", 443)
.with_username("compliance-monitor")
.with_password("monitor-password")
.with_insecure_tls_skip_verify(True))
# Compliance thresholds
self.compliance_config = {
'power_variance_threshold': 0.15, # 15% variance allowed
'policy_compliance_threshold': 0.95, # 95% compliance required
'monitoring_interval': 30 # seconds
}
self.compliance_data = []
def monitor_power_utilization_compliance(self, duration_minutes: int = 30):
"""Monitor power utilization and compliance through DPS dashboards"""
print(f"=== Step 5: Monitor Power Utilization Compliance ({duration_minutes} minutes) ===")
end_time = time.time() + (duration_minutes * 60)
sample_count = 0
compliance_violations = []
while time.time() < end_time:
try:
# Collect utilization and compliance data
utilization_data = self._collect_utilization_data()
compliance_status = self._assess_compliance(utilization_data)
sample_count += 1
self.compliance_data.append({
'timestamp': time.time(),
'utilization': utilization_data,
'compliance': compliance_status
})
# Report current status
print(f"\nSample {sample_count} - {time.strftime('%H:%M:%S')}")
self._report_current_status(utilization_data, compliance_status)
# Check for violations
violations = self._identify_violations(compliance_status)
if violations:
compliance_violations.extend(violations)
print("[WARNING] Compliance violations detected:")
for violation in violations:
print(f" - {violation['type']}: {violation['message']}")
time.sleep(self.compliance_config['monitoring_interval'])
except Exception as e:
print(f"Error monitoring compliance: {e}")
time.sleep(self.compliance_config['monitoring_interval'])
# Generate compliance summary
self._generate_compliance_summary(sample_count, compliance_violations)
return self.compliance_data
def _collect_utilization_data(self) -> Dict:
"""Collect power utilization data"""
# Get sample nodes
nodes = ["gb200-r01-0001", "gb200-r01-0002", "gb200-r01-0003", "gb200-r01-0004",
"gb200-r02-0001", "gb200-r02-0002"] # Sample nodes
# Collect GPU metrics
gpu_list = [[node, gpu_id] for node in nodes for gpu_id in range(4)] # DGX_GB200 has 4 GPUs
metrics_response = self.dps_api.request_metrics(requested_gpus=gpu_list)
if 'metrics' not in metrics_response:
raise Exception("No power metrics available")
# Process metrics
node_power = {}
total_power = 0
for metric in metrics_response['metrics']:
node = metric['node']
power = metric['usage']
if node not in node_power:
node_power[node] = {'gpus': [], 'total': 0}
node_power[node]['gpus'].append(power)
node_power[node]['total'] += power
total_power += power
# Calculate utilization statistics
node_powers = [data['total'] for data in node_power.values()]
gpu_powers = [metric['usage'] for metric in metrics_response['metrics']]
utilization_data = {
'total_power': total_power,
'node_count': len(nodes),
'gpu_count': len(gpu_list),
'node_power_distribution': node_power,
'statistics': {
'average_node_power': np.mean(node_powers) if node_powers else 0,
'node_power_std': np.std(node_powers) if len(node_powers) > 1 else 0,
'average_gpu_power': np.mean(gpu_powers) if gpu_powers else 0,
'gpu_power_std': np.std(gpu_powers) if len(gpu_powers) > 1 else 0,
'power_distribution_variance': np.var(node_powers) if len(node_powers) > 1 else 0
}
}
return utilization_data
def _assess_compliance(self, utilization_data: Dict) -> Dict:
"""Assess power utilization compliance against policies"""
compliance_status = {
'overall_compliant': True,
'compliance_score': 1.0,
'assessments': {}
}
# 1. Power Distribution Compliance
stats = utilization_data['statistics']
avg_node_power = stats['average_node_power']
node_power_std = stats['node_power_std']
if avg_node_power > 0:
power_variance = node_power_std / avg_node_power
power_distribution_compliant = power_variance <= self.compliance_config['power_variance_threshold']
else:
power_variance = 0
power_distribution_compliant = True
compliance_status['assessments']['power_distribution'] = {
'compliant': power_distribution_compliant,
'variance': power_variance,
'threshold': self.compliance_config['power_variance_threshold'],
'score': max(0, 1.0 - (power_variance / self.compliance_config['power_variance_threshold']))
}
# 2. Policy Limit Compliance
policy_violations = 0
total_gpus = 0
for node, data in utilization_data['node_power_distribution'].items():
for gpu_power in data['gpus']:
total_gpus += 1
# GB200-High policy: 4800W total / 4 GPUs = 1200W per GPU
if gpu_power > 1250: # 50W tolerance over 1200W per GPU
policy_violations += 1
policy_compliance_rate = (total_gpus - policy_violations) / total_gpus if total_gpus > 0 else 1.0
policy_compliant = policy_compliance_rate >= self.compliance_config['policy_compliance_threshold']
compliance_status['assessments']['policy_limits'] = {
'compliant': policy_compliant,
'compliance_rate': policy_compliance_rate,
'violations': policy_violations,
'total_gpus': total_gpus,
'threshold': self.compliance_config['policy_compliance_threshold']
}
# Calculate overall compliance
assessment_scores = [assessment['score'] for assessment in compliance_status['assessments'].values()
if 'score' in assessment]
compliance_status['compliance_score'] = np.mean(assessment_scores) if assessment_scores else 0
compliance_status['overall_compliant'] = all(
assessment['compliant'] for assessment in compliance_status['assessments'].values()
)
return compliance_status
def _report_current_status(self, utilization_data: Dict, compliance_status: Dict):
"""Report current power utilization and compliance status"""
total_power = utilization_data['total_power']
avg_node_power = utilization_data['statistics']['average_node_power']
compliance_score = compliance_status['compliance_score']
status_icon = "[OK]" if compliance_status['overall_compliant'] else "[WARN]"
print(f" {status_icon} Total Power: {total_power:.1f}W | Avg Node: {avg_node_power:.1f}W | Compliance: {compliance_score:.2f}")
# Report assessment details
for assessment_name, assessment in compliance_status['assessments'].items():
icon = "[OK]" if assessment['compliant'] else "[FAIL]"
if assessment_name == 'power_distribution':
print(f" {icon} Power Distribution: {assessment['variance']:.3f} variance (limit: {assessment['threshold']:.3f})")
elif assessment_name == 'policy_limits':
print(f" {icon} Policy Compliance: {assessment['compliance_rate']:.2f} rate ({assessment['violations']} violations)")
def _identify_violations(self, compliance_status: Dict) -> List[Dict]:
"""Identify specific compliance violations"""
violations = []
for assessment_name, assessment in compliance_status['assessments'].items():
if not assessment['compliant']:
if assessment_name == 'power_distribution':
violations.append({
'type': 'power_distribution_violation',
'message': f"Power distribution variance {assessment['variance']:.3f} exceeds threshold {assessment['threshold']:.3f}"
})
elif assessment_name == 'policy_limits':
violations.append({
'type': 'policy_limit_violation',
'message': f"{assessment['violations']} GPUs exceeding policy limits ({assessment['compliance_rate']:.2f} compliance rate)"
})
return violations
def _generate_compliance_summary(self, sample_count: int, violations: List[Dict]):
"""Generate compliance monitoring summary"""
print(f"\n=== Power Utilization Compliance Summary ===")
print(f"Monitoring Duration: {len(self.compliance_data)} samples over {sample_count} intervals")
if self.compliance_data:
# Calculate summary statistics
compliance_scores = [data['compliance']['compliance_score'] for data in self.compliance_data]
avg_compliance = np.mean(compliance_scores)
min_compliance = np.min(compliance_scores)
total_powers = [data['utilization']['total_power'] for data in self.compliance_data]
avg_power = np.mean(total_powers)
power_trend = "increasing" if total_powers[-1] > total_powers[0] else "decreasing"
print(f"Average Compliance Score: {avg_compliance:.3f}")
print(f"Minimum Compliance Score: {min_compliance:.3f}")
print(f"Average Total Power: {avg_power:.1f}W")
print(f"Power Trend: {power_trend}")
# Violation summary
violation_types = {}
for violation in violations:
violation_type = violation['type']
violation_types[violation_type] = violation_types.get(violation_type, 0) + 1
if violations:
print(f"\nCompliance Violations ({len(violations)} total):")
for violation_type, count in violation_types.items():
print(f" - {violation_type}: {count} occurrences")
else:
print("\n[SUCCESS] No compliance violations detected")
# Final assessment
if avg_compliance >= 0.95 and not violations:
print(f"\n[SUCCESS] Scheduling integration compliance score: {avg_compliance:.2f}")
elif avg_compliance >= 0.85:
print(f"\n[OK] Scheduling integration compliance score: {avg_compliance:.2f}")
elif avg_compliance >= 0.70:
print(f"\n[WARNING] Scheduling integration compliance score: {avg_compliance:.2f} - some issues detected")
else:
print(f"\n[FAILED] Scheduling integration compliance score: {avg_compliance:.2f} - issues detected")
# Usage example
def monitor_compliance_example():
"""Example monitoring for power utilization compliance"""
monitor = PowerUtilizationComplianceMonitor()
compliance_data = monitor.monitor_power_utilization_compliance(duration_minutes=15)
return compliance_data
```

The prolog/epilog entry points below are the command-line wrappers referenced by the usage message at the end of this listing (`python3 slurm_dps_integration.py [prolog|epilog]`). They assume they live in the same module as the `SLURMDPSIntegrator` class from the Step 1 Integration Helper, so that class and its `os` and `logging` imports are in scope.

```python
# Example prolog/epilog entry points (assumes SLURMDPSIntegrator and the
# Integration Helper's module-level imports are available here)
def prolog_integration():
"""Prolog integration with job analysis"""
# Get SLURM job information
job_info = {
'job_id': os.environ.get('SLURM_JOB_ID'),
'name': os.environ.get('SLURM_JOB_NAME', ''),
'partition': os.environ.get('SLURM_JOB_PARTITION', ''),
'qos': os.environ.get('SLURM_JOB_QOS', 'normal'),
'time_limit': os.environ.get('SLURM_TIME_LIMIT', '01:00:00'),
'nodes': os.environ.get('SLURM_JOB_NODELIST', '').split(',')
}
# Initialize integrator
integrator = SLURMDPSIntegrator()
# Analyze job requirements
analysis = integrator.analyze_job_requirements(job_info)
# Create resource group
success = integrator.create_resource_group_for_job(analysis)
if success:
print(f"DPS integration successful for job {job_info['job_id']}")
return 0
else:
print(f"DPS integration failed for job {job_info['job_id']}")
return 1
def epilog_integration():
"""Epilog integration with reporting"""
job_id = os.environ.get('SLURM_JOB_ID')
# Initialize integrator
integrator = SLURMDPSIntegrator()
# Cleanup with reporting
report = integrator.cleanup_with_reporting(job_id)
if report['success']:
print(f"DPS cleanup successful for job {job_id}")
if report['final_metrics']:
print(f"Final power consumption: {report['final_metrics']['total_power']:.1f}W")
else:
print(f"DPS cleanup completed with issues for job {job_id}")
return 0 # Do not fail jobs due to cleanup issues
if __name__ == "__main__":
import sys
logging.basicConfig(level=logging.INFO)
if len(sys.argv) > 1:
if sys.argv[1] == "prolog":
sys.exit(prolog_integration())
elif sys.argv[1] == "epilog":
sys.exit(epilog_integration())
print("Usage: python3 slurm_dps_integration.py [prolog|epilog]")
    sys.exit(1)
```

## Configuration and Setup
### SLURM Configuration

Add the following to `/etc/slurm/slurm.conf`:

```
# DPS Integration Configuration
Prolog=/usr/local/bin/slurm_dps_prolog.sh
Epilog=/usr/local/bin/slurm_dps_epilog.sh
# Enable prolog/epilog for all partitions
PrologFlags=Alloc
```

### Environment Setup Commands

```bash
# Deploy DPS development environment
task dev:up
task dev:deploy
# Install integration scripts
sudo cp slurm_dps_prolog.sh /usr/local/bin/
sudo cp slurm_dps_epilog.sh /usr/local/bin/
sudo chmod +x /usr/local/bin/slurm_dps_*.sh
# Login to DPS
dpsctl --host api.dps --port 443 --insecure-tls-skip-verify login --username <username> --password <password>
# Import your topology
dpsctl --host api.dps --port 443 --insecure-tls-skip-verify topology import --filename <topology-file.json>
dpsctl --host api.dps --port 443 --insecure-tls-skip-verify topology activate --topology <topology-name>
# Restart SLURM to pick up new configuration
sudo systemctl restart slurmctld
sudo systemctl restart slurmd
```

### Testing Commands

```bash
# Submit test job to verify integration
sbatch --job-name=test-job --partition=dps_simulator --nodes=2 test_job.sh
# Monitor resource groups
dpsctl --host api.dps --port 443 --insecure-tls-skip-verify resource-group list
watch 'dpsctl --host api.dps --port 443 --insecure-tls-skip-verify resource-group list'
# Check power consumption
dpsctl --host api.dps --port 443 --insecure-tls-skip-verify check metrics --node=<node-name>
# View integration logs
tail -f /var/log/slurm/dps_prolog.log
tail -f /var/log/slurm/dps_epilog.log
```

## Observable Metrics
### Scheduler Integration Performance

Monitor these metrics during scheduler integration (a log-based measurement sketch follows the list):
- Resource Group Creation Time: Time from job submission to DPS resource group activation
- Policy Application Success: Success rate for resource group creation and activation
- Job Startup Overhead: Additional time due to DPS integration
- Cleanup Reliability: Success rate for resource group cleanup in epilog
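
As a rough way to quantify the first two metrics and the startup overhead, the sketch below greps the prolog/epilog logs produced by the Step 1 scripts; the log paths are the defaults used there and the job ID is a placeholder.

```bash
#!/usr/bin/env bash
# Rough integration-health figures from the prolog/epilog logs written by the
# Step 1 scripts (paths are the defaults above; JOB_ID is a placeholder).
PROLOG_LOG=/var/log/slurm/dps_prolog.log
EPILOG_LOG=/var/log/slurm/dps_epilog.log
JOB_ID=12345

# Policy application success: completed vs. failed prolog runs
ok=$(grep -c "DPS integration completed successfully" "$PROLOG_LOG")
failed=$(grep -c "ERROR: DPS integration failed" "$PROLOG_LOG")
echo "Prolog: $ok successful, $failed failed"

# Cleanup reliability: successful deletions vs. warnings in the epilog log
deleted=$(grep -c "deleted successfully" "$EPILOG_LOG")
warnings=$(grep -c "WARNING" "$EPILOG_LOG")
echo "Epilog: $deleted cleanups, $warnings warnings"

# Job startup overhead for one job: seconds between prolog start and completion
start_ts=$(grep "JOB $JOB_ID - Starting DPS integration" "$PROLOG_LOG" | head -1 | cut -d' ' -f1-2)
end_ts=$(grep "JOB $JOB_ID - DPS integration completed" "$PROLOG_LOG" | head -1 | cut -d' ' -f1-2)
echo "Prolog overhead: $(( $(date -d "$end_ts" +%s) - $(date -d "$start_ts" +%s) )) seconds"
```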
### Power Management
- Policy Compliance: Applied power policies reflected in actual power consumption
- Power Monitoring: Real-time power consumption metrics (see the polling sketch after this list)
- Policy Selection: Power policies selected based on job characteristics
- Resource Utilization: Datacenter power capacity usage across scheduled workloads
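
To watch policy compliance and real-time power draw in the simulator, a minimal polling loop over the `dpsctl` metrics command from the Testing Commands section can be used; the node name is a placeholder from the simulated topology.

```bash
# Poll per-node power every 30 s to compare observed draw against the applied policy
NODE=gb200-r01-0001   # placeholder simulated node
while true; do
  dpsctl --host api.dps --port 443 --insecure-tls-skip-verify \
    check metrics --node="$NODE"
  sleep 30
done
```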
### Integration Health
- Script Execution: Prolog/epilog script execution success
- API Reliability: API call success during operations
- Monitoring Coverage: Power monitoring data availability for scheduled jobs
- Error Recovery: Handling of DPS service unavailability
This scheduling workload integration enables workload schedulers to manage datacenter power policies and to validate power-aware scheduling behavior in the DPS simulator environment, independent of specific hardware configurations.