Distributed training with MPI
MPI (Message Passing Interface) is a standard for parallel computing. It defines a message-passing model that lets processes in a distributed environment communicate and coordinate with one another.
Lepton supports MPI for distributed training. The example below runs a distributed MPI job with 2 workers on Lepton.
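If you have not used MPI before, here is a minimal sketch of the model this guide relies on (assuming Open MPI is installed, as the start command later in this guide does): mpirun starts one copy of a program per slot and exposes each copy's rank and the total process count through environment variables such as OMPI_COMM_WORLD_RANK and OMPI_COMM_WORLD_SIZE.
# Launch two processes; each reports its rank and the world size.
# When running as root inside a container you may need --allow-run-as-root, as in the start command below.
mpirun -np 2 bash -c 'echo "hello from rank $OMPI_COMM_WORLD_RANK of $OMPI_COMM_WORLD_SIZE"'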
Prepare the Python script for distributed training
As an example, this script implements distributed training of a convolutional neural network (CNN) on the MNIST dataset using PyTorch's DistributedDataParallel (DDP) to leverage multiple GPUs in parallel.
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import torch.distributed as dist
import os
from torchvision import datasets, transforms
from torch.nn.parallel import DistributedDataParallel as DDP
from torch.utils.data import DataLoader, DistributedSampler
from datasets import load_dataset


class MNISTModel(nn.Module):
    def __init__(self):
        super(MNISTModel, self).__init__()
        self.conv1 = nn.Conv2d(1, 32, 3, 1)
        self.conv2 = nn.Conv2d(32, 64, 3, 1)
        self.dropout1 = nn.Dropout(0.25)
        self.dropout2 = nn.Dropout(0.5)
        self.fc1 = nn.Linear(9216, 128)
        self.fc2 = nn.Linear(128, 10)

    def forward(self, x):
        x = self.conv1(x)
        x = F.relu(x)
        x = self.conv2(x)
        x = F.relu(x)
        x = F.max_pool2d(x, 2)
        x = self.dropout1(x)
        x = torch.flatten(x, 1)
        x = self.fc1(x)
        x = F.relu(x)
        x = self.dropout2(x)
        x = self.fc2(x)
        return F.log_softmax(x, dim=1)


def train():
    # Initialize process group
    dist.init_process_group(backend="nccl")

    # Get local rank from environment variable
    local_rank = int(os.environ["LOCAL_RANK"])
    rank = int(os.environ["RANK"])
    world_size = int(os.environ["WORLD_SIZE"])

    # Set device
    torch.cuda.set_device(local_rank)
    device = torch.device("cuda", local_rank)

    print(f"Running on rank {rank} (local_rank: {local_rank})")

    def transform(example):
        imgs = [transforms.ToTensor()(img) for img in example["image"]]
        imgs = [transforms.Normalize((0.1307,), (0.3081,))(img) for img in imgs]
        example["image"] = torch.stack(imgs)
        example["label"] = torch.tensor(example["label"])
        return example

    dataset = load_dataset("mnist", split="train")
    dataset = dataset.with_transform(transform)
    sampler = DistributedSampler(dataset, num_replicas=world_size, rank=rank)
    train_loader = DataLoader(dataset, batch_size=64, sampler=sampler)

    model = MNISTModel().to(device)
    model = DDP(model, device_ids=[local_rank])
    optimizer = optim.Adam(model.parameters(), lr=0.001)

    model.train()
    for epoch in range(1, 11):
        sampler.set_epoch(epoch)
        for batch_idx, batch_data in enumerate(train_loader):
            data, target = batch_data["image"].to(device), batch_data["label"].to(device)
            optimizer.zero_grad()
            output = model(data)
            loss = F.nll_loss(output, target)
            loss.backward()
            optimizer.step()
            if batch_idx % 10 == 0:
                print(f"Train Epoch: {epoch} [{batch_idx * len(data)}/{len(train_loader.dataset)} ({100. * batch_idx / len(train_loader):.0f}%)]\tLoss: {loss.item():.6f}")

    if rank == 0:
        torch.save(model.module.state_dict(), "mnist_model.pth")
        print("Model saved as mnist_model.pth")

    dist.destroy_process_group()


if __name__ == "__main__":
    train()
The complete script (main.py) is available in the leptonai/examples GitHub repository under advanced/pytorch-example.
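Before submitting the multi-node job, you can optionally smoke-test the script on a single machine with GPUs. The following is a hypothetical local run using torchrun's standalone mode; it assumes PyTorch, torchvision, and the datasets package are available on that machine.
# Install the Hugging Face datasets package used by main.py.
pip install datasets
# Launch one training process per local GPU, using a local rendezvous (no MASTER_ADDR needed).
torchrun --standalone --nproc_per_node=$(nvidia-smi --query-gpu=gpu_name --format=csv,noheader | wc -l) main.py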
Create Job through Dashboard
Head over to the Batch Jobs page, and follow the steps below to create a job.
Set up the job
Resource
In the resource section, first select the node group you want to use.
Then select the resource type, for example gpu.8xh100-sxm, and set the number of workers as needed.
In this guide we want 2 replicas, so we set the number of workers to 2.
Container
In the container section, use the default image and paste the following command as the start command to run the job:
export DEBIAN_FRONTEND=noninteractive
export DEBIAN_PRIORITY=critical
apt-get update && apt-get install -y libibverbs-dev infiniband-diags openmpi-bin openmpi-doc libopenmpi-dev net-tools openssh-server openssh-client
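# The packages above provide mpirun (Open MPI) plus the SSH server/client that mpirun uses to reach the other workers.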
# Setup SSH
cat << EOF > /etc/ssh/sshd_config.d/lep.conf
PermitRootLogin yes
PubkeyAuthentication yes
Port 2222
StrictModes no
EOF
cat << EOF > /etc/ssh/ssh_config.d/lep.conf
Port 2222
StrictHostKeyChecking no
UserKnownHostsFile /dev/null
EOF
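# The server config runs sshd on port 2222; the client config makes ssh (and therefore mpirun) use the same port without host-key prompts.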
# Prepare an SSH key pair for the root user. The key below is only a placeholder example; generate your own key pair and replace it here.
mkdir -p /root/.ssh
echo 'ssh-ed25519 AAAAC3NzaC1lZDI1NTE5AAAAIJJwTM8O5HYIHTFq6Lq7VfVnNySmy8m78tnf0W6nW7gm' >> /root/.ssh/authorized_keys
cat <<EOT > /root/.ssh/id_ed25519
-----BEGIN OPENSSH PRIVATE KEY-----
b3BlbnNzaC1rZXktdjEAAAAABG5vbmUAAAAEbm9uZQAAAAAAAAABAAAAMwAAAAtzc2gtZW
QyNTUxOQAAACCScEzPDuR2CB0xaui6u1X1ZzckpsvJu/LZ39Fup1u4JgAAAJhb0FvcW9Bb
3AAAAAtzc2gtZWQyNTUxOQAAACCScEzPDuR2CB0xaui6u1X1ZzckpsvJu/LZ39Fup1u4Jg
AAAEDT9ez2cl7eUmiGsJZdCfdBvZAUru8THA1KqLjXsPiPUpJwTM8O5HYIHTFq6Lq7VfVn
NySmy8m78tnf0W6nW7gmAAAAE2ppbmd4dXpAamluZ3h1ei1tbHQBAg==
-----END OPENSSH PRIVATE KEY-----
EOT
chmod 700 /root/.ssh
chmod 600 /root/.ssh/*
service ssh restart
########## setup ##############
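# run.sh below is executed on every node by mpirun; it clones the example repo and launches torchrun with one process per GPU.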
cat << 'EOF' > run.sh
cd /workspace
git clone https://github.com/leptonai/examples.git
cd examples/advanced/pytorch-example
source /opt/lepton/venv/bin/activate
pip install datasets
# Run the distributed training script.
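# mpirun launches one run.sh per node (ppr:1:node), so OMPI_COMM_WORLD_SIZE is the
# number of nodes and OMPI_COMM_WORLD_RANK is this node's rank; torchrun then starts
# one training process per local GPU.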
torchrun \
--nnodes=$OMPI_COMM_WORLD_SIZE \
--nproc_per_node=$(nvidia-smi --query-gpu=gpu_name --format=csv,noheader | wc -l) \
--node_rank=$OMPI_COMM_WORLD_RANK \
--master_addr=$MASTER_ADDR \
--master_port=$MASTER_PORT \
main.py
EOF
########## end setup ##########
# we assume rank 0 is master, and other workers will wait for rank 0 to finish
COMPLETE_FILE="/tmp/lepton-mpi-complete"
if [[ $LEPTON_JOB_WORKER_INDEX -ne 0 ]]; then
while true; do
[ ! -f "${COMPLETE_FILE}" ] || break
sleep 5
done
exit 0
fi
# continue setting up the environment (only worker 0 reaches this point)
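# Each worker is reachable at <SERVICE_PREFIX>-<index>.<SUBDOMAIN>; worker 0 serves as the torchrun master (MASTER_ADDR).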
SERVICE_PREFIX="${LEPTON_JOB_SERVICE_PREFIX:-$LEPTON_JOB_NAME}"
SUBDOMAIN="${LEPTON_SUBDOMAIN:-$LEPTON_JOB_NAME-job-svc}"
export MASTER_ADDR=${SERVICE_PREFIX}-0.${SUBDOMAIN}
export HOST_ADDRS=$(seq 1 $((LEPTON_JOB_TOTAL_WORKERS - 1)) | xargs -I {} echo ${SERVICE_PREFIX}-{}.${SUBDOMAIN} | paste -sd ',' -)
HOSTFILE=/tmp/hostfile.txt
rm -f $HOSTFILE
HOST_IPS=()
######### make sure all workers are ready
for i in $(seq 0 $((LEPTON_JOB_TOTAL_WORKERS - 1))); do
NODE_NAME=${SERVICE_PREFIX}-$i.${SUBDOMAIN}
NODE_IP=""
while [ -z "$NODE_IP" ]; do
NODE_IP=$(getent hosts -- $NODE_NAME | awk '{ print $1 }' || echo "")
if [ -z "$NODE_IP" ]; then
sleep 5
fi
done
WAIT_RETRY=60
while ! ssh $NODE_IP -- echo ok >/dev/null 2>&1; do
echo "waiting for server ping ..."
WAIT_RETRY=$((WAIT_RETRY-1))
if [ $WAIT_RETRY -eq 0 ]; then
echo "timed out waiting host $NODE_IP to be ready"
exit 1
fi
sleep 5
echo "retry ssh to $NODE_IP"
done
if [ "$i" -eq 0 ]; then
export MASTER_IP=$NODE_IP
fi
HOST_IPS+=($NODE_IP)
echo $NODE_IP >> $HOSTFILE
done
IFS=','; HOST_IPS="${HOST_IPS[*]}"; unset IFS
########## run ##############
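# Launch one copy of run.sh per node, forwarding the rendezvous address and port to every node with -x.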
mpirun --map-by ppr:1:node -hostfile $HOSTFILE --allow-run-as-root \
-x MASTER_ADDR=$MASTER_ADDR \
-x MASTER_PORT=6500 \
bash run.sh
########## end run ##########
err_exit=$?
# notify other workers that the job is done
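# Touching the sentinel file on every node releases the non-zero-rank workers from the wait loop above.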
mpirun --map-by ppr:8:node -hostfile $HOSTFILE --allow-run-as-root touch ${COMPLETE_FILE}
if [ $err_exit -ne 0 ]; then
echo "MPI job failure!"
exit 1
else
echo "MPI job completed!"
fi
Create and Monitor
Now click the Create button to create and run the job. You can then check the job logs and details to monitor its progress.
Within the job details page, you can see the status and logs of each worker. You can also use the Web Terminal to connect to a worker node and inspect it directly. Once the job is finished, it shows a "Completed" state.
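If you want to confirm that the checkpoint was written, a quick check from the Web Terminal on worker 0 (assuming the default paths used in run.sh above) could look like this:
# run.sh changes into the cloned example directory, so the checkpoint is saved there.
ls -lh /workspace/examples/advanced/pytorch-example/mnist_model.pth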