nemo_curator.core.client
Module Contents
Classes
Functions
API
This class is used to set up the Ray cluster and configure metrics integration.
If a specified port is already in use, the next available port is found and used instead.
Parameters:
The port number of the Ray GCS.
The port number of the Ray dashboard.
The temporary directory to use for Ray.
Whether to include dashboard integration. If True, adds Ray metrics service discovery.
The port number of the Ray metrics.
The host of the Ray dashboard.
The number of GPUs to use.
The number of CPUs to use.
The amount of memory to use for the object store.
Whether to enable object spilling.
The file to capture stdout/stderr to.
The directory for Prometheus/Grafana metrics data. If None, uses the per-user default.
Start the Ray cluster if not already started, optionally capturing stdout/stderr to a file.
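A minimal usage sketch is shown below; the no-argument constructor and the stop() call are assumptions based on the docstrings above, not a verified signature::

    # Sketch: bring up a local Ray cluster with default settings, run work,
    # then shut it down.
    from nemo_curator.core.client import RayClient

    client = RayClient()  # defaults; falls back to the next free port if busy
    client.start()        # starts Ray if it is not already running
    try:
        ...               # run the curation pipeline here
    finally:
        client.stop()     # assumed teardown call; adjust to the real API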
Bases: RayClient
RayClient extended for multi-node SLURM jobs.
On single-node SLURM jobs (or when not running under SLURM at all),
behaves identically to :class:`RayClient`.
On multi-node jobs, the script must be launched on every node
(e.g. via srun --ntasks-per-node=1). Each process calls
SlurmRayClient, which inspects SLURM_NODEID to determine
its role:
- Head (SLURM_NODEID=0): starts the Ray head, waits for all
  workers to connect, then returns from :meth:`start` so the
  pipeline can run.
- Workers (SLURM_NODEID>0): start a Ray worker that connects
  to the head and block until the cluster is torn down. When
  the head stops Ray (after the pipeline finishes), the worker
  process exits cleanly with sys.exit(0).
This is analogous to how torchrun works: the same script is
launched on every node and each process discovers its role from the
environment.
Example sbatch script::
    #!/bin/bash
    #SBATCH --nodes=4
    #SBATCH --ntasks-per-node=1
    #SBATCH --gpus-per-node=8

    srun --ntasks-per-node=1 \
        --container-image=nvcr.io/nvidia/nemo-curator:26.02 \
        --container-mounts="/lustre:/lustre" \
        bash -c "source .venv/bin/activate && python my_pipeline.py"
For bare-metal (no container) setups, the same pattern works::
    #!/bin/bash
    #SBATCH --nodes=4
    #SBATCH --ntasks-per-node=1
    #SBATCH --gpus-per-node=8

    srun python my_pipeline.py
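The my_pipeline.py referenced by these scripts might look like the following sketch; the pipeline body is a placeholder, and the two constructor arguments shown are the documented parameters listed below::

    # Sketch of my_pipeline.py: the same script runs on every node, and
    # SlurmRayClient infers each process's role from SLURM_NODEID.
    from nemo_curator.core.client import SlurmRayClient

    client = SlurmRayClient(
        worker_connect_timeout_s=600,  # wait up to 10 minutes for all nodes
        cleanup_on_start=True,         # clear stale Ray processes first
    )
    client.start()     # worker processes block here and exit on teardown
    try:
        ...            # head only: build and run the curation pipeline
    finally:
        client.stop()  # tears down the cluster; workers then exit cleanly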
If RAY_ADDRESS is set before :meth:`start` is called,
SlurmRayClient connects to the existing cluster without
starting or stopping anything.
Parameters
worker_connect_timeout_s:
Maximum seconds to wait for all worker nodes to join after the
head is up. Raises TimeoutError if exceeded.
cleanup_on_start:
If True, run ray stop --force on the local node before
starting Ray. Helps clear stale processes from previous runs.
Stop any stale Ray processes on the local node.
Auto-detect per-node CPU/GPU counts from SLURM env vars when not set explicitly.
Return path to the shared port-broadcast file for this job.
Must be on a filesystem visible to ALL nodes (Lustre, not /tmp).
Uses env var RAY_PORT_BROADCAST_DIR if set, otherwise falls back to
/tmp (works on single-node or when /tmp is shared, e.g. via NFS).
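For example, a job script can point the broadcast directory at a shared path before starting Ray (the path below is illustrative)::

    import os

    # Must be visible from every node in the allocation (e.g. Lustre).
    os.environ["RAY_PORT_BROADCAST_DIR"] = "/lustre/my-project/ray-broadcast"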
Call ray.init(address=...) with a SIGALRM-based timeout.
ray.init can hang indefinitely if the GCS is slow or unstable
after a multi-job start. We use SIGALRM (Linux/macOS only) to raise
a TimeoutError if the call blocks longer than timeout_s seconds.
Falls back to an unguarded ray.init when called from a non-main
thread, where SIGALRM is unavailable.
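The guard can be sketched as follows; this illustrates the mechanism described above, not the library's exact code::

    import signal
    import threading

    import ray

    def init_with_timeout(address: str, timeout_s: int = 60):
        def _on_alarm(signum, frame):
            raise TimeoutError(f"ray.init() did not return within {timeout_s}s")

        if threading.current_thread() is not threading.main_thread():
            # SIGALRM handlers can only be installed on the main thread.
            return ray.init(address=address)

        old_handler = signal.signal(signal.SIGALRM, _on_alarm)
        signal.alarm(timeout_s)  # deliver SIGALRM after timeout_s seconds
        try:
            return ray.init(address=address)
        finally:
            signal.alarm(0)      # cancel any pending alarm
            signal.signal(signal.SIGALRM, old_handler)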
Wait for the head to write its port file and return the port number.
Start a Ray worker that connects to head_ip and block until the cluster is torn down.
Returns the exit code of ray start --block so the caller can pass it to sys.exit.
Exit code 0 means the cluster was torn down cleanly; non-zero indicates an error.
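A rough equivalent of the worker-side call, assuming the head address and GCS port are already known (function name and arguments are illustrative)::

    import subprocess

    def run_worker(head_ip: str, gcs_port: int) -> int:
        # Blocks until the head tears the cluster down or an error occurs.
        proc = subprocess.run(
            ["ray", "start", "--block", f"--address={head_ip}:{gcs_port}"]
        )
        return proc.returncode  # 0 means a clean teardown; pass to sys.exit()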
Block until every allocated node is alive in the Ray cluster.
Raises TimeoutError (after tearing everything down) if not
all nodes join within worker_connect_timeout_s.
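The waiting logic can be approximated as a poll over ray.nodes(); the sleep interval below is an assumption::

    import time

    import ray

    def wait_for_nodes(expected: int, timeout_s: float) -> None:
        deadline = time.monotonic() + timeout_s
        alive = 0
        while time.monotonic() < deadline:
            alive = sum(1 for node in ray.nodes() if node.get("Alive"))
            if alive >= expected:
                return
            time.sleep(2)
        raise TimeoutError(f"only {alive}/{expected} nodes joined within {timeout_s}s")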
Write the actual Ray GCS port to a shared file so workers can read it.
Uses an atomic write-then-rename so workers never observe an empty or partially-written file (important on Lustre / NFS where open() truncates before write() completes).
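The pattern looks roughly like this (file names are illustrative)::

    import os

    def publish_port(port: int, broadcast_path: str) -> None:
        tmp_path = broadcast_path + ".tmp"
        with open(tmp_path, "w") as f:
            f.write(str(port))
            f.flush()
            os.fsync(f.fileno())  # push the bytes to the shared filesystem
        # The file only appears under its final name once it is complete.
        os.replace(tmp_path, broadcast_path)  # atomic rename on POSIX filesystems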
Start the Ray cluster, with role detection on multi-node SLURM jobs.
If RAY_ADDRESS is already set, connects to the existing
cluster without starting a new head or launching workers.
On multi-node jobs, worker processes (SLURM_NODEID > 0)
block here until the cluster is torn down, then exit with
sys.exit(0). Only the head (SLURM_NODEID = 0) returns
from this method.
Stop the Ray head. Workers detect the head’s death and exit on their own.
Safe to call multiple times. Does not stop an externally
managed cluster (one discovered via RAY_ADDRESS).
Expand a SLURM node-list expression into individual hostnames.
Tries scontrol show hostnames first, then falls back to a
pure-Python parser that handles common compact formats like
prefix-[01,03-05] and node1,node2.
Locate the ray CLI in the active Python environment.
Pure-Python parser for SLURM compact nodelist notation.
Handles formats like::

    node1,node2,node3
    prefix-[01,03,05]
    prefix-[01-05]
    prefix-[01-03,07,10-12]
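A parser along these lines can handle the formats above (a sketch, not the library's exact implementation)::

    import re

    def expand_nodelist(nodelist: str) -> list[str]:
        hosts: list[str] = []
        # Split on commas that are not inside a [...] range expression.
        for part in re.split(r",(?![^\[]*\])", nodelist):
            match = re.match(r"^(.*)\[(.*)\]$", part)
            if not match:
                hosts.append(part)
                continue
            prefix, ranges = match.groups()
            for item in ranges.split(","):
                if "-" in item:
                    lo, hi = item.split("-")
                    width = len(lo)  # preserve zero padding, e.g. 01..05
                    hosts.extend(
                        f"{prefix}{i:0{width}d}" for i in range(int(lo), int(hi) + 1)
                    )
                else:
                    hosts.append(f"{prefix}{item}")
        return hosts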