nemo_curator.core.client

Module Contents

Classes

RayClient: Sets up the Ray cluster and configures metrics integration.
SlurmRayClient: RayClient extended for multi-node SLURM jobs.

Functions

_expand_slurm_nodelist: Expand a SLURM node-list expression into individual hostnames.
_find_ray_binary: Locate the ray CLI in the active Python environment.
_parse_slurm_nodelist: Pure-Python parser for SLURM compact nodelist notation.

API

class nemo_curator.core.client.RayClient(
ray_port: int = DEFAULT_RAY_PORT,
ray_dashboard_port: int = DEFAULT_RAY_DASHBOARD_PORT,
ray_client_server_port: int = DEFAULT_RAY_CLIENT_SERVER_PORT,
ray_temp_dir: str = DEFAULT_RAY_TEMP_DIR,
include_dashboard: bool = True,
ray_metrics_port: int = DEFAULT_RAY_METRICS_PORT,
ray_dashboard_host: str = DEFAULT_RAY_DASHBOARD_HOST,
num_gpus: int | None = None,
num_cpus: int | None = None,
object_store_memory: int | None = None,
enable_object_spilling: bool = False,
ray_stdouterr_capture_file: str | None = None,
metrics_dir: str | None = None
)
Dataclass

This class sets up the Ray cluster and configures metrics integration.

If the specified ports are already in use, it will find the next available port and use that.
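The port-fallback behavior can be sketched with a small probing loop. This is a hypothetical helper for illustration (`find_free_port` is not part of the NeMo Curator API): it tries to bind the preferred port and walks forward until a bind succeeds.

```python
# Hypothetical sketch of "use the next available port" behavior;
# not the actual NeMo Curator implementation.
import socket


def find_free_port(preferred: int, max_tries: int = 100) -> int:
    """Return `preferred` if it is free, otherwise the next free port."""
    for port in range(preferred, preferred + max_tries):
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
            try:
                s.bind(("127.0.0.1", port))
                return port  # bind succeeded, so the port is available
            except OSError:
                continue  # port in use, try the next one
    raise RuntimeError(f"no free port in [{preferred}, {preferred + max_tries})")
```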

Parameters:

ray_port
int, defaults to DEFAULT_RAY_PORT

The port number of the Ray GCS.

ray_dashboard_port
int, defaults to DEFAULT_RAY_DASHBOARD_PORT

The port number of the Ray dashboard.

ray_client_server_port
int, defaults to DEFAULT_RAY_CLIENT_SERVER_PORT

The port number of the Ray client server.

ray_temp_dir
str, defaults to DEFAULT_RAY_TEMP_DIR

The temporary directory to use for Ray.

include_dashboard
bool, defaults to True

Whether to include dashboard integration. If true, adds Ray metrics service discovery.

ray_metrics_port
int, defaults to DEFAULT_RAY_METRICS_PORT

The port number for Ray metrics.

ray_dashboard_host
str, defaults to DEFAULT_RAY_DASHBOARD_HOST

The host of the Ray dashboard.

num_gpus
int | None, defaults to None

The number of GPUs to use.

num_cpus
int | None, defaults to None

The number of CPUs to use.

object_store_memory
int | None, defaults to None

The amount of memory to use for the object store.

enable_object_spilling
bool, defaults to False

Whether to enable object spilling.

ray_stdouterr_capture_file
str | None, defaults to None

The file to capture stdout/stderr to.

metrics_dir
str | None, defaults to None

The directory for Prometheus/Grafana metrics data. If None, uses the per-user default.

enable_object_spilling
bool = False
include_dashboard
bool = True
metrics_dir
str | None = None
num_cpus
int | None = None
num_gpus
int | None = None
object_store_memory
int | None = None
ray_client_server_port
int = DEFAULT_RAY_CLIENT_SERVER_PORT
ray_dashboard_host
str = DEFAULT_RAY_DASHBOARD_HOST
ray_dashboard_port
int = DEFAULT_RAY_DASHBOARD_PORT
ray_metrics_port
int = DEFAULT_RAY_METRICS_PORT
ray_port
int = DEFAULT_RAY_PORT
ray_process
Popen | None = field(init=False, default=None)
ray_stdouterr_capture_file
str | None = None
ray_temp_dir
str = DEFAULT_RAY_TEMP_DIR
nemo_curator.core.client.RayClient.__enter__()
nemo_curator.core.client.RayClient.__exit__(
exc = ()
)
nemo_curator.core.client.RayClient.__post_init__() -> None
nemo_curator.core.client.RayClient.start() -> None

Start the Ray cluster if not already started, optionally capturing stdout/stderr to a file.

nemo_curator.core.client.RayClient.stop() -> None
class nemo_curator.core.client.SlurmRayClient(
ray_port: int = DEFAULT_RAY_PORT,
ray_dashboard_port: int = DEFAULT_RAY_DASHBOARD_PORT,
ray_client_server_port: int = DEFAULT_RAY_CLIENT_SERVER_PORT,
ray_temp_dir: str = DEFAULT_RAY_TEMP_DIR,
include_dashboard: bool = True,
ray_metrics_port: int = DEFAULT_RAY_METRICS_PORT,
ray_dashboard_host: str = '0.0.0.0',
num_gpus: int | None = None,
num_cpus: int | None = None,
object_store_memory: int | None = None,
enable_object_spilling: bool = False,
ray_stdouterr_capture_file: str | None = None,
metrics_dir: str | None = None,
worker_connect_timeout_s: int = 300,
cleanup_on_start: bool = True
)
Dataclass

Bases: RayClient

RayClient extended for multi-node SLURM jobs.

On single-node SLURM jobs (or when not running under SLURM at all), it behaves identically to RayClient.

On multi-node jobs, the script must be launched on every node (e.g. via srun --ntasks-per-node=1). Each process creates a SlurmRayClient, which inspects SLURM_NODEID to determine its role:

  • Head (SLURM_NODEID=0): starts the Ray head, waits for all workers to connect, then returns from start() so the pipeline can run.
  • Workers (SLURM_NODEID>0): start a Ray worker that connects to the head and block until the cluster is torn down. When the head stops Ray (after the pipeline finishes), the worker process exits cleanly with sys.exit(0).

This is analogous to how torchrun works: the same script is launched on every node and each process discovers its role from the environment.
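The role-detection step described above can be sketched in a few lines. The helper name `detect_role` is hypothetical; it only mirrors the documented rule that SLURM_NODEID=0 (or a missing variable) means head, and anything else means worker.

```python
# Hypothetical sketch of SLURM_NODEID-based role detection, mirroring
# the behavior described above (not the actual SlurmRayClient code).
import os


def detect_role() -> str:
    """Return "head" for SLURM_NODEID=0 or non-SLURM runs, else "worker"."""
    node_id = int(os.environ.get("SLURM_NODEID", "0"))
    return "head" if node_id == 0 else "worker"
```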

Example sbatch script::

    #!/bin/bash
    #SBATCH --nodes=4
    #SBATCH --ntasks-per-node=1
    #SBATCH --gpus-per-node=8

    srun --ntasks-per-node=1 \
        --container-image=nvcr.io/nvidia/nemo-curator:26.02 \
        --container-mounts="/lustre:/lustre" \
        bash -c "source .venv/bin/activate && python my_pipeline.py"

For bare-metal (no container) setups, the same pattern works::

    #!/bin/bash
    #SBATCH --nodes=4
    #SBATCH --ntasks-per-node=1
    #SBATCH --gpus-per-node=8

    srun python my_pipeline.py

If RAY_ADDRESS is set before start() is called, SlurmRayClient connects to the existing cluster without starting or stopping anything.

Parameters:

worker_connect_timeout_s
int, defaults to 300

Maximum seconds to wait for all worker nodes to join after the head is up. Raises TimeoutError if exceeded.

cleanup_on_start
bool, defaults to True

If True, run ray stop --force on the local node before starting Ray. Helps clear stale processes from previous runs.

_manages_cluster
bool = field(init=False, default=False, repr=False)
_slurm_nodes
list[str]
cleanup_on_start
bool = True
ray_dashboard_host
str = '0.0.0.0'
worker_connect_timeout_s
int = 300
nemo_curator.core.client.SlurmRayClient.__post_init__() -> None
nemo_curator.core.client.SlurmRayClient._cleanup_local_ray() -> None

Stop any stale Ray processes on the local node.

nemo_curator.core.client.SlurmRayClient._detect_slurm_resources() -> None

Auto-detect per-node CPU/GPU counts from SLURM env vars when not set explicitly.

nemo_curator.core.client.SlurmRayClient._head_port_file(
slurm_job_id: str
) -> str

Return path to the shared port-broadcast file for this job.

Must be on a filesystem visible to ALL nodes (Lustre, not /tmp). Uses env var RAY_PORT_BROADCAST_DIR if set, otherwise falls back to /tmp (works on single-node or when /tmp is shared, e.g. via NFS).
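The path-resolution rule can be sketched as follows. Both the helper name and the filename pattern here are assumptions for illustration; only the env-var lookup and the /tmp fallback are taken from the description above.

```python
# Hypothetical sketch of the port-broadcast file path resolution;
# the filename pattern is an assumption, not the library's actual one.
import os


def head_port_file(slurm_job_id: str) -> str:
    """Resolve the shared port file, preferring RAY_PORT_BROADCAST_DIR."""
    base = os.environ.get("RAY_PORT_BROADCAST_DIR", "/tmp")
    return os.path.join(base, f"ray_head_port_{slurm_job_id}")
```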

nemo_curator.core.client.SlurmRayClient._ray_init_with_timeout(
address: str,
timeout_s: int = 120
) -> None
staticmethod

Call ray.init(address=...) with a SIGALRM-based timeout.

ray.init can hang indefinitely if the GCS is slow or unstable after a multi-job start. We use SIGALRM (Linux/macOS only) to raise a TimeoutError if the call blocks longer than timeout_s seconds.

Falls back to an unguarded ray.init when called from a non-main thread, where SIGALRM is unavailable.
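A SIGALRM-based guard of this kind can be sketched generically. This is not the library's code: `call_with_timeout` is a hypothetical wrapper around an arbitrary callable, showing the alarm-set/alarm-cancel pattern and the non-main-thread fallback described above (SIGALRM exists only on POSIX systems).

```python
# Sketch of a SIGALRM-based timeout guard (Linux/macOS, main thread only);
# a generic illustration, not the actual _ray_init_with_timeout code.
import signal
import threading


def call_with_timeout(fn, timeout_s: int):
    """Run fn() and raise TimeoutError if it blocks longer than timeout_s."""
    if threading.current_thread() is not threading.main_thread():
        return fn()  # SIGALRM is unavailable off the main thread

    def _on_alarm(signum, frame):
        raise TimeoutError(f"call exceeded {timeout_s}s")

    old_handler = signal.signal(signal.SIGALRM, _on_alarm)
    signal.alarm(timeout_s)  # schedule the alarm
    try:
        return fn()
    finally:
        signal.alarm(0)  # cancel any pending alarm
        signal.signal(signal.SIGALRM, old_handler)  # restore prior handler
```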

nemo_curator.core.client.SlurmRayClient._read_head_port(
slurm_job_id: str,
timeout_s: int = 600
) -> int

Wait for the head to write its port file and return the port number.

nemo_curator.core.client.SlurmRayClient._run_as_worker(
head_ip: str
) -> int

Start a Ray worker that connects to head_ip and block until the cluster is torn down.

Returns the exit code of ray start --block so the caller can pass it to sys.exit. Exit code 0 means the cluster was torn down cleanly; non-zero indicates an error.

nemo_curator.core.client.SlurmRayClient._wait_for_workers() -> None

Block until every allocated node is alive in the Ray cluster.

Raises TimeoutError (after tearing everything down) if not all nodes join within worker_connect_timeout_s.
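The polling loop behind this method can be sketched generically. The real method checks the Ray cluster's node list; here that check is replaced by a caller-supplied predicate so the sketch stays self-contained, and `wait_until` is a hypothetical name.

```python
# Generic polling sketch of the wait-for-workers behavior; the real
# method inspects Ray's node list instead of a predicate.
import time


def wait_until(all_nodes_alive, timeout_s: int, poll_s: float = 1.0) -> None:
    """Poll until all_nodes_alive() is true, else raise TimeoutError."""
    deadline = time.monotonic() + timeout_s
    while time.monotonic() < deadline:
        if all_nodes_alive():
            return
        time.sleep(poll_s)  # back off between cluster checks
    raise TimeoutError(f"not all nodes joined within {timeout_s}s")
```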

nemo_curator.core.client.SlurmRayClient._write_head_port(
slurm_job_id: str
) -> None

Write the actual Ray GCS port to a shared file so workers can read it.

Uses an atomic write-then-rename so workers never observe an empty or partially-written file (important on Lustre / NFS where open() truncates before write() completes).
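The write-then-rename pattern itself looks like this. The helper name and file contents are illustrative; the point is that `os.replace` is atomic on POSIX filesystems, so a reader sees either the old file or the complete new one, never a partial write.

```python
# Sketch of the atomic write-then-rename pattern described above;
# illustrative helper, not the library's actual _write_head_port code.
import os


def atomic_write(path: str, text: str) -> None:
    """Write text to path so readers never see a partial file."""
    tmp = path + ".tmp"
    with open(tmp, "w") as f:
        f.write(text)
        f.flush()
        os.fsync(f.fileno())  # ensure bytes reach disk before the rename
    os.replace(tmp, path)  # atomic rename: old or new content, never partial
```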

nemo_curator.core.client.SlurmRayClient.start() -> None

Start the Ray cluster, with role detection on multi-node SLURM jobs.

If RAY_ADDRESS is already set, connects to the existing cluster without starting a new head or launching workers.

On multi-node jobs, worker processes (SLURM_NODEID > 0) block here until the cluster is torn down, then exit with sys.exit(0). Only the head (SLURM_NODEID = 0) returns from this method.

nemo_curator.core.client.SlurmRayClient.stop() -> None

Stop the Ray head. Workers detect the head’s death and exit on their own.

Safe to call multiple times. Does not stop an externally managed cluster (one discovered via RAY_ADDRESS).

nemo_curator.core.client._expand_slurm_nodelist(
nodelist: str
) -> list[str]

Expand a SLURM node-list expression into individual hostnames.

Tries scontrol show hostnames first, then falls back to a pure-Python parser that handles common compact formats like prefix-[01,03-05] and node1,node2.

nemo_curator.core.client._find_ray_binary() -> str

Locate the ray CLI in the active Python environment.

nemo_curator.core.client._parse_slurm_nodelist(
nodelist: str
) -> list[str]

Pure-Python parser for SLURM compact nodelist notation.

Handles formats like:

  • node1,node2,node3
  • prefix-[01,03,05]
  • prefix-[01-05]
  • prefix-[01-03,07,10-12]
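A simplified parser for these formats can be sketched as below. This is a re-implementation for illustration, not the library's `_parse_slurm_nodelist` (for instance, it does not handle nested brackets or multiple bracket groups in one node name).

```python
# Simplified sketch of compact SLURM nodelist parsing for the formats
# listed above; not the library's actual parser.
import re


def parse_nodelist(nodelist: str) -> list[str]:
    """Expand e.g. "prefix-[01-03,07]" into individual hostnames."""
    hosts: list[str] = []
    # Split on commas that are not inside [...] brackets.
    for part in re.split(r",(?![^\[]*\])", nodelist):
        m = re.match(r"^(.*)\[([^\]]+)\]$", part)
        if not m:
            hosts.append(part)  # plain hostname, no bracket expression
            continue
        prefix, spec = m.groups()
        for item in spec.split(","):
            if "-" in item:
                lo, hi = item.split("-")
                width = len(lo)  # preserve zero-padding, e.g. 01..05
                hosts.extend(
                    f"{prefix}{i:0{width}d}" for i in range(int(lo), int(hi) + 1)
                )
            else:
                hosts.append(f"{prefix}{item}")
    return hosts
```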