Run Ray Job on DGX Cloud Lepton

Running a Ray job on DGX Cloud Lepton requires two main steps:

  1. Configure a Ray cluster environment to combine multiple nodes
  2. Submit tasks to the Ray cluster for execution

Currently, DGX Cloud Lepton doesn't natively support Ray cluster jobs, but you can manually initialize a Ray cluster to achieve the same effect.

Prerequisites

  • A DGX Cloud Lepton workspace with an available node group for creating workloads
  • S3 credentials for distributed storage (for checkpoint persistence between nodes)
  • Basic understanding of Ray distributed computing framework

Steps

1. Set Up S3 Credentials

To use S3 storage, you need to provide the AWS_S3_KEY, AWS_S3_SECRET, and AWS_S3_ENDPOINT values. You can use Secrets to store them on the platform; refer to this guide for more details.

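As a quick, optional check before submitting any job, you can verify the credentials by listing one of your buckets with pyarrow. This is a minimal sketch; "your-bucket" is a placeholder for a bucket you own:

import os
import pyarrow.fs

# Build the same S3 filesystem object the training script will use
fs = pyarrow.fs.S3FileSystem(
    access_key=os.environ["AWS_S3_KEY"],
    secret_key=os.environ["AWS_S3_SECRET"],
    endpoint_override=os.environ["AWS_S3_ENDPOINT"],
)

# List the bucket contents; an authentication error here means the
# credentials or endpoint need to be fixed before submitting the job
for info in fs.get_file_info(pyarrow.fs.FileSelector("your-bucket", recursive=False)):
    print(info.path)
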
2. Create and Upload Ray Training Script

This example training script uses Ray to train a BERT model on the Yelp Review dataset:

import os
import pyarrow.fs

import numpy as np
import evaluate
from datasets import load_dataset
from transformers import (
    Trainer,
    TrainingArguments,
    AutoTokenizer,
    AutoModelForSequenceClassification,
)

import ray.train.huggingface.transformers
from ray.train import ScalingConfig
from ray.train.torch import TorchTrainer

# Connect to the Ray cluster started by the startup script (see step 3)
ray.init(address="localhost:6379")

# [1] Encapsulate data preprocessing, training, and evaluation logic
def train_func():
    # Datasets
    dataset = load_dataset("yelp_review_full")
    tokenizer = AutoTokenizer.from_pretrained("bert-base-cased")

    def tokenize_function(examples):
        return tokenizer(examples["text"], padding="max_length", truncation=True)

    small_train_dataset = (
        dataset["train"].select(range(100)).map(tokenize_function, batched=True)
    )
    small_eval_dataset = (
        dataset["test"].select(range(100)).map(tokenize_function, batched=True)
    )

    # Model
    model = AutoModelForSequenceClassification.from_pretrained(
        "bert-base-cased", num_labels=5
    )

    # Evaluation Metrics
    metric = evaluate.load("accuracy")

    def compute_metrics(eval_pred):
        logits, labels = eval_pred
        predictions = np.argmax(logits, axis=-1)
        return metric.compute(predictions=predictions, references=labels)

    # Hugging Face Trainer
    training_args = TrainingArguments(
        output_dir="test_trainer",
        eval_strategy="epoch",
        save_strategy="epoch",
        report_to="none",
    )

    trainer = Trainer(
        model=model,
        args=training_args,
        train_dataset=small_train_dataset,
        eval_dataset=small_eval_dataset,
        compute_metrics=compute_metrics,
    )

    # [2] Report Metrics and Checkpoints to Ray Train
    callback = ray.train.huggingface.transformers.RayTrainReportCallback()
    trainer.add_callback(callback)

    # [3] Prepare Transformers Trainer
    trainer = ray.train.huggingface.transformers.prepare_trainer(trainer)

    # Start Training
    trainer.train()

# [4] Define a Ray TorchTrainer to launch `train_func` on all workers
ray_trainer = TorchTrainer(
    train_func,
    scaling_config=ScalingConfig(num_workers=2, use_gpu=True),
    # [4a] For multi-node clusters, configure persistent storage accessible across all worker nodes
    run_config=ray.train.RunConfig(
        storage_path="data/ray_bert",
        storage_filesystem=pyarrow.fs.S3FileSystem(
            access_key=os.environ['AWS_S3_KEY'],
            secret_key=os.environ['AWS_S3_SECRET'],
            endpoint_override=os.environ['AWS_S3_ENDPOINT'],
        )
    )
)
result = ray_trainer.fit()

# [5] Load the trained model
with result.checkpoint.as_directory() as checkpoint_dir:
    checkpoint_path = os.path.join(
        checkpoint_dir,
        ray.train.huggingface.transformers.RayTrainReportCallback.CHECKPOINT_NAME,
    )
    model = AutoModelForSequenceClassification.from_pretrained(checkpoint_path)

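After step [5] restores the model, you could optionally append a quick sanity-check prediction to the script. This is a minimal sketch that reuses the bert-base-cased tokenizer with an arbitrary example review:

import torch
from transformers import AutoTokenizer

tokenizer = AutoTokenizer.from_pretrained("bert-base-cased")
inputs = tokenizer(
    "The food was great and the service was friendly.",  # arbitrary example review
    return_tensors="pt",
    truncation=True,
    padding=True,
)

model.eval()
with torch.no_grad():
    logits = model(**inputs).logits

# yelp_review_full labels 0-4 correspond to 1-5 star ratings
predicted_stars = int(logits.argmax(dim=-1)) + 1
print(f"Predicted rating: {predicted_stars} star(s)")
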
Assuming you manage your scripts in a Git repository, you can upload this script (saved as ray_bert.py, the name used by the startup script below) to the repository for later use.

3. Create Ray Cluster Startup Script

Create a startup script to initialize the Ray cluster. This script handles node configuration, environment setup, and Ray initialization:

export DEBIAN_FRONTEND=noninteractive
export DEBIAN_PRIORITY=critical
apt-get update && apt-get install -y libibverbs-dev infiniband-diags net-tools

# Set common environment variables
SERVICE_PREFIX="${LEPTON_JOB_SERVICE_PREFIX:-$LEPTON_JOB_NAME}"
SUBDOMAIN="${LEPTON_SUBDOMAIN:-$LEPTON_JOB_NAME-job-svc}"
export MASTER_ADDR=${SERVICE_PREFIX}-0.${SUBDOMAIN}
export THIS_ADDR=${SERVICE_PREFIX}-${LEPTON_JOB_WORKER_INDEX}.${SUBDOMAIN}
export WORLD_SIZE=${LEPTON_JOB_TOTAL_WORKERS}
export WORLD_RANK=${LEPTON_JOB_WORKER_INDEX}
export NGPUS=${LEPTON_RESOURCE_ACCELERATOR_NUM}

# Get master IP
NODE_IP=""
while [ -z "$NODE_IP" ]; do
    NODE_IP=$(getent hosts -- $MASTER_ADDR | awk '{ print $1 }' || echo "")
    if [ -z "$NODE_IP" ]; then
        sleep 5
    fi
done
export MASTER_IP=$NODE_IP

# Adjust environment variables
if [ "${NGPUS}" != "8" ]; then
    # There are no IB devices for this resource shape, need to unset NCCL_SOCKET_IFNAME
    export NCCL_SOCKET_IFNAME=
fi

# Install required packages
pip install "ray[train]" torch "accelerate==1.6.0" "transformers[torch]==4.51.3" datasets evaluate numpy scikit-learn

# Initialize Ray cluster
if [ "${WORLD_RANK}" == "0" ]; then
    ray start --head --port=6379
else
    ray start --address="${MASTER_IP}:6379" --block
fi

# Allow time for all nodes to join the cluster
sleep 30

# Clone your repository
git clone https://github.com/your-username/your-repo.git
cd your-repo

if [ "${WORLD_RANK}" == "0" ]; then
    python ray_bert.py
fi

ray stop

Note

Environment variables with the LEPTON_ prefix are automatically set by the Lepton platform.

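Before launching the training script, you can optionally confirm that every node has joined the cluster. A minimal sketch, run on the head node after the cluster has started:

import ray

# Connect to the Ray cluster started by `ray start --head`
ray.init(address="auto")

# Each node that successfully joined shows up here as alive
nodes = [n for n in ray.nodes() if n["Alive"]]
print(f"{len(nodes)} node(s) in the cluster")
for n in nodes:
    print(n["NodeManagerAddress"], n["Resources"].get("GPU", 0), "GPU(s)")
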
4. Create Ray Job

Navigate to the create job page to see the job configuration form. Refer to the create job guide for more details.

  • Job name: We can set it to ray-job.
  • Resource: This example uses H100 GPUs. Select H100-80GB-HBM3 x1 and set the worker count to 2.
  • Container image: The default image, lepton:photon-py3.11-runner-0.21.0, works fine.
  • Run command: Copy the startup script created in the previous step into the run command field.
  • Environment variables: Expand the "Advanced configuration" section and add the following environment variables to the job with the "Add Secret" button.
    • AWS_S3_KEY: The S3 key for the S3 credentials.
    • AWS_S3_SECRET: The S3 secret for the S3 credentials.
    • AWS_S3_ENDPOINT: The S3 endpoint for the S3 credentials.

Click Create to submit the job; you can then monitor the job status on the job detail page.

Note

You need a node group with H100 GPUs to create this job. Refer to the node group guide for more details.

5. Important Considerations

  • Adjust num_workers in the ScalingConfig based on your resource requirements. For example, setting num_workers=2 with use_gpu=True means the job will use 2 GPUs (one per worker); see the sketch after this list.
  • S3 storage is not the only option for distributed storage. You can choose any distributed storage solution to persistently save checkpoints and other data between nodes.
  • The example code is adapted from the Ray documentation.
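
For example, a minimal sketch of scaling the trainer to 4 GPU workers (assuming 4 GPUs are available across the cluster) looks like this; only the ScalingConfig changes, the rest of the script stays the same:

from ray.train import ScalingConfig

# Four workers, one GPU each: the job consumes 4 GPUs in total
scaling_config = ScalingConfig(
    num_workers=4,
    use_gpu=True,
    resources_per_worker={"GPU": 1},  # one GPU per worker (the default when use_gpu=True)
)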

Copyright © 2025, NVIDIA.