# nemo_curator.core.client

## Module Contents

### Classes

| Name                                                         | Description                                                                    |
| ------------------------------------------------------------ | ------------------------------------------------------------------------------ |
| [`RayClient`](#nemo_curator-core-client-RayClient)           | Sets up the Ray cluster and configures metrics integration.                    |
| [`SlurmRayClient`](#nemo_curator-core-client-SlurmRayClient) | RayClient extended for multi-node SLURM jobs.                                  |

### Functions

| Name                                                                         | Description                                                    |
| ---------------------------------------------------------------------------- | -------------------------------------------------------------- |
| [`_expand_slurm_nodelist`](#nemo_curator-core-client-_expand_slurm_nodelist) | Expand a SLURM node-list expression into individual hostnames. |
| [`_find_ray_binary`](#nemo_curator-core-client-_find_ray_binary)             | Locate the `ray` CLI in the active Python environment.         |
| [`_parse_slurm_nodelist`](#nemo_curator-core-client-_parse_slurm_nodelist)   | Pure-Python parser for SLURM compact nodelist notation.        |

### API

```python
class nemo_curator.core.client.RayClient(
    ray_port: int = DEFAULT_RAY_PORT,
    ray_dashboard_port: int = DEFAULT_RAY_DASHBOARD_PORT,
    ray_client_server_port: int = DEFAULT_RAY_CLIENT_SERVER_PORT,
    ray_temp_dir: str = DEFAULT_RAY_TEMP_DIR,
    include_dashboard: bool = True,
    ray_metrics_port: int = DEFAULT_RAY_METRICS_PORT,
    ray_dashboard_host: str = DEFAULT_RAY_DASHBOARD_HOST,
    num_gpus: int | None = None,
    num_cpus: int | None = None,
    object_store_memory: int | None = None,
    enable_object_spilling: bool = False,
    ray_stdouterr_capture_file: str | None = None,
    metrics_dir: str | None = None
)
```

Dataclass

Sets up the Ray cluster and configures metrics integration.

If a specified port is already in use, the client finds the next available port and uses that instead.
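
The port fallback described above can be illustrated with a minimal standard-library sketch. The helper names `is_port_free` and `next_available_port` are hypothetical and are not part of `nemo_curator`; the actual search strategy used by `RayClient` may differ.

```python
import socket


def is_port_free(port: int, host: str = "127.0.0.1") -> bool:
    """Return True if a TCP socket can bind to (host, port) right now."""
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        try:
            sock.bind((host, port))
        except OSError:
            return False
        return True


def next_available_port(start_port: int, host: str = "127.0.0.1", max_tries: int = 100) -> int:
    """Scan upward from start_port and return the first port that can be bound."""
    # Hypothetical helper: stay within the valid TCP port range.
    for port in range(start_port, min(start_port + max_tries, 65536)):
        if is_port_free(port, host):
            return port
    raise RuntimeError(f"no free port found starting at {start_port}")
```

Note that a port reported as free can be taken by another process before it is used, so a check like this is best-effort.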

**Parameters:**

* `ray_port`: The port number of the Ray GCS.
* `ray_dashboard_port`: The port number of the Ray dashboard.
* `ray_temp_dir`: The temporary directory to use for Ray.
* `include_dashboard`: Whether to include dashboard integration. If true, adds Ray metrics service discovery.
* `ray_metrics_port`: The port number of the Ray metrics.
* `ray_dashboard_host`: The host of the Ray dashboard.
* `num_gpus`: The number of GPUs to use.
* `num_cpus`: The number of CPUs to use.
* `object_store_memory`: The amount of memory to use for the object store.
* `enable_object_spilling`: Whether to enable object spilling.
* `ray_stdouterr_capture_file`: The file to capture stdout/stderr to.
* `metrics_dir`: The directory for Prometheus/Grafana metrics data. If `None`, uses the per-user default.

```python
nemo_curator.core.client.RayClient.__enter__()
```

```python
nemo_curator.core.client.RayClient.__exit__(
    exc = ()
)
```

```python
nemo_curator.core.client.RayClient.__post_init__() -> None
```

```python
nemo_curator.core.client.RayClient.start() -> None
```

Start the Ray cluster if not already started, optionally capturing stdout/stderr to a file.

```python
nemo_curator.core.client.RayClient.stop() -> None
```

```python
class nemo_curator.core.client.SlurmRayClient(
    ray_port: int = DEFAULT_RAY_PORT,
    ray_dashboard_port: int = DEFAULT_RAY_DASHBOARD_PORT,
    ray_client_server_port: int = DEFAULT_RAY_CLIENT_SERVER_PORT,
    ray_temp_dir: str = DEFAULT_RAY_TEMP_DIR,
    include_dashboard: bool = True,
    ray_metrics_port: int = DEFAULT_RAY_METRICS_PORT,
    ray_dashboard_host: str = '0.0.0.0',
    num_gpus: int | None = None,
    num_cpus: int | None = None,
    object_store_memory: int | None = None,
    enable_object_spilling: bool = False,
    ray_stdouterr_capture_file: str | None = None,
    metrics_dir: str | None = None,
    worker_connect_timeout_s: int = 300,
    cleanup_on_start: bool = True
)
```

Dataclass

**Bases:** [RayClient](#nemo_curator-core-client-RayClient)

RayClient extended for multi-node SLURM jobs.

On single-node SLURM jobs (or when not running under SLURM at all),
behaves identically to [`RayClient`](#nemo_curator-core-client-RayClient).

On multi-node jobs, the script must be launched on **every** node
(e.g. via `srun --ntasks-per-node=1`). Each process calls
`SlurmRayClient`, which inspects `SLURM_NODEID` to determine
its role:

* **Head (`SLURM_NODEID=0`)**: starts the Ray head, waits for all
  workers to connect, then returns from `start()` so the
  pipeline can run.
* **Workers (`SLURM_NODEID>0`)**: start a Ray worker that connects
  to the head and **block until the cluster is torn down**. When
  the head stops Ray (after the pipeline finishes), the worker
  process exits cleanly with `sys.exit(0)`.

This is analogous to how `torchrun` works: the same script is
launched on every node and each process discovers its role from the
environment.

Example `sbatch` script:

    #!/bin/bash
    #SBATCH --nodes=4
    #SBATCH --ntasks-per-node=1
    #SBATCH --gpus-per-node=8

    srun --ntasks-per-node=1 \
        --container-image=nvcr.io/nvidia/nemo-curator:26.02 \
        --container-mounts="/lustre:/lustre" \
        bash -c "source .venv/bin/activate && python my_pipeline.py"

For bare-metal (no container) setups, the same pattern works:

    #!/bin/bash
    #SBATCH --nodes=4
    #SBATCH --ntasks-per-node=1
    #SBATCH --gpus-per-node=8

    srun python my_pipeline.py

If `RAY_ADDRESS` is set before `start()` is called,
`SlurmRayClient` connects to the existing cluster without
starting or stopping anything.

**Parameters:**

* `worker_connect_timeout_s`: Maximum number of seconds to wait for all worker nodes to join after the head is up. Raises `TimeoutError` if exceeded.
* `cleanup_on_start`: If `True`, run `ray stop --force` on the local node before starting Ray. This helps clear stale processes from previous runs.

```python
nemo_curator.core.client.SlurmRayClient.__post_init__() -> None
```

```python
nemo_curator.core.client.SlurmRayClient._cleanup_local_ray() -> None
```

Stop any stale Ray processes on the local node.

```python
nemo_curator.core.client.SlurmRayClient._detect_slurm_resources() -> None
```

Auto-detect per-node CPU/GPU counts from SLURM env vars when not set explicitly.

```python
nemo_curator.core.client.SlurmRayClient._head_port_file(
    slurm_job_id: str
) -> str
```

Return path to the shared port-broadcast file for this job.

On multi-node jobs the file must live on a filesystem visible to all nodes (for example Lustre, not node-local `/tmp`).
Uses the `RAY_PORT_BROADCAST_DIR` environment variable if set; otherwise falls back to
`/tmp`, which only works on single-node jobs or when `/tmp` is shared (e.g. via NFS).

```python
nemo_curator.core.client.SlurmRayClient._ray_init_with_timeout(
    address: str,
    timeout_s: int = 120
) -> None
```

staticmethod

Call `ray.init(address=...)` with a SIGALRM-based timeout.

`ray.init` can hang indefinitely if the GCS is slow or unstable
after a multi-job start.  We use SIGALRM (Linux/macOS only) to raise
a `TimeoutError` if the call blocks longer than *timeout\_s* seconds.

Falls back to an unguarded `ray.init` when called from a non-main
thread, where SIGALRM is unavailable.
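
The SIGALRM pattern can be sketched with a generic wrapper. This is not the library's code: `call_with_timeout` is a hypothetical helper that guards an arbitrary callable, where the real method guards `ray.init(address=...)`.

```python
import signal
import threading
from collections.abc import Callable
from typing import TypeVar

T = TypeVar("T")


def call_with_timeout(func: Callable[[], T], timeout_s: int) -> T:
    """Run func(), raising TimeoutError if it blocks longer than timeout_s seconds."""
    in_main_thread = threading.current_thread() is threading.main_thread()
    # Signal handlers can only be installed from the main thread, and SIGALRM
    # does not exist on every platform, so fall back to an unguarded call.
    if not in_main_thread or not hasattr(signal, "SIGALRM"):
        return func()

    def _on_alarm(signum, frame):
        raise TimeoutError(f"call did not finish within {timeout_s} s")

    previous_handler = signal.signal(signal.SIGALRM, _on_alarm)
    signal.alarm(timeout_s)  # whole seconds only
    try:
        return func()
    finally:
        signal.alarm(0)  # cancel any pending alarm
        signal.signal(signal.SIGALRM, previous_handler)
```

Restoring the previous handler in `finally` keeps the wrapper from interfering with other code that uses SIGALRM.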

```python
nemo_curator.core.client.SlurmRayClient._read_head_port(
    slurm_job_id: str,
    timeout_s: int = 600
) -> int
```

Wait for the head to write its port file and return the port number.
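
A minimal polling loop in the spirit of this method is sketched below. `wait_for_port_file` is a hypothetical helper that takes a path directly; the one-second poll interval is an assumption.

```python
import time


def wait_for_port_file(path: str, timeout_s: float = 600.0, poll_interval_s: float = 1.0) -> int:
    """Poll until the file at path contains a port number, then return it."""
    deadline = time.monotonic() + timeout_s
    while True:
        try:
            with open(path, encoding="utf-8") as handle:
                text = handle.read().strip()
            if text:
                return int(text)
        except FileNotFoundError:
            pass  # the head has not written the file yet
        if time.monotonic() >= deadline:
            raise TimeoutError(f"no port found in {path} after {timeout_s} s")
        time.sleep(poll_interval_s)
```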

```python
nemo_curator.core.client.SlurmRayClient._run_as_worker(
    head_ip: str
) -> int
```

Start a Ray worker that connects to *head\_ip* and block until the cluster is torn down.

Returns the exit code of `ray start --block` so the caller can pass it to `sys.exit`.
Exit code 0 means the cluster was torn down cleanly; non-zero indicates an error.

```python
nemo_curator.core.client.SlurmRayClient._wait_for_workers() -> None
```

Block until every allocated node is alive in the Ray cluster.

Raises `TimeoutError` (after tearing everything down) if not
all nodes join within `worker_connect_timeout_s`.

```python
nemo_curator.core.client.SlurmRayClient._write_head_port(
    slurm_job_id: str
) -> None
```

Write the actual Ray GCS port to a shared file so workers can read it.

Uses an atomic write-then-rename so workers never observe an empty or
partially-written file (important on Lustre / NFS where open() truncates
before write() completes).
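
The write-then-rename technique looks roughly like this with the standard library. `write_port_atomically` is a hypothetical helper and the temporary file naming is an assumption; the key points are that the temporary file sits in the same directory as the target and that `os.replace` swaps it into place in one step.

```python
import os
import tempfile


def write_port_atomically(path: str, port: int) -> None:
    """Write port to path so readers see either no file or the complete content."""
    directory = os.path.dirname(os.path.abspath(path))
    # Same directory as the target, so the rename stays on one filesystem.
    fd, tmp_path = tempfile.mkstemp(dir=directory, prefix=".port-", suffix=".tmp")
    try:
        with os.fdopen(fd, "w", encoding="utf-8") as handle:
            handle.write(str(port))
            handle.flush()
            os.fsync(handle.fileno())  # push the data to disk before renaming
        os.replace(tmp_path, path)
    except BaseException:
        # Do not leave a stray temporary file behind on failure.
        if os.path.exists(tmp_path):
            os.unlink(tmp_path)
        raise
```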

```python
nemo_curator.core.client.SlurmRayClient.start() -> None
```

Start the Ray cluster, with role detection on multi-node SLURM jobs.

If `RAY_ADDRESS` is already set, connects to the existing
cluster without starting a new head or launching workers.

On multi-node jobs, worker processes (`SLURM_NODEID > 0`)
block here until the cluster is torn down, then exit with
`sys.exit(0)`. Only the head (`SLURM_NODEID = 0`) returns
from this method.

```python
nemo_curator.core.client.SlurmRayClient.stop() -> None
```

Stop the Ray head.  Workers detect the head's death and exit on their own.

Safe to call multiple times.  Does not stop an externally
managed cluster (one discovered via `RAY_ADDRESS`).

```python
nemo_curator.core.client._expand_slurm_nodelist(
    nodelist: str
) -> list[str]
```

Expand a SLURM node-list expression into individual hostnames.

Tries `scontrol show hostnames` first, then falls back to a
pure-Python parser that handles common compact formats like
`prefix-[01,03-05]` and `node1,node2`.
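
The try-`scontrol`-then-fall-back flow might look like the sketch below. `expand_nodelist` is a hypothetical name, the fallback parser is passed in as a callable, and the 10-second timeout is an assumption.

```python
import shutil
import subprocess
from collections.abc import Callable


def expand_nodelist(nodelist: str, fallback: Callable[[str], list[str]]) -> list[str]:
    """Expand a nodelist with scontrol when available, else with fallback."""
    if shutil.which("scontrol"):
        try:
            result = subprocess.run(
                ["scontrol", "show", "hostnames", nodelist],
                capture_output=True,
                text=True,
                check=True,
                timeout=10,
            )
            hosts = result.stdout.split()  # one hostname per line
            if hosts:
                return hosts
        except (subprocess.SubprocessError, OSError):
            pass  # fall through to the pure-Python parser
    return fallback(nodelist)
```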

```python
nemo_curator.core.client._find_ray_binary() -> str
```

Locate the `ray` CLI in the active Python environment.

```python
nemo_curator.core.client._parse_slurm_nodelist(
    nodelist: str
) -> list[str]
```

Pure-Python parser for SLURM compact nodelist notation.

Handles formats like:

* `node1,node2,node3`
* `prefix-[01,03,05]`
* `prefix-[01-05]`
* `prefix-[01-03,07,10-12]`
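
A minimal parser covering the formats listed above could be written as follows. This is an illustrative sketch, not the library's implementation: the function names are hypothetical, and nested or multiple bracket groups in a single entry are not handled.

```python
import re


def split_top_level(text: str) -> list[str]:
    """Split on commas that are not inside square brackets."""
    parts, current, depth = [], [], 0
    for char in text:
        if char == "[":
            depth += 1
        elif char == "]":
            depth -= 1
        if char == "," and depth == 0:
            parts.append("".join(current))
            current = []
        else:
            current.append(char)
    parts.append("".join(current))
    return [part.strip() for part in parts if part.strip()]


def parse_slurm_nodelist(nodelist: str) -> list[str]:
    """Expand compact notation such as prefix-[01-03,07] into hostnames."""
    hosts = []
    for part in split_top_level(nodelist):
        match = re.fullmatch(r"(.*?)\[([^\]]+)\](.*)", part)
        if match is None:
            hosts.append(part)  # plain hostname, nothing to expand
            continue
        prefix, body, suffix = match.groups()
        for item in body.split(","):
            if "-" in item:
                low, high = item.split("-", 1)
                width = len(low)  # keep zero padding, e.g. 01 -> 02
                for number in range(int(low), int(high) + 1):
                    hosts.append(f"{prefix}{number:0{width}d}{suffix}")
            else:
                hosts.append(f"{prefix}{item}{suffix}")
    return hosts
```

Taking the padding width from the lower bound of each range keeps names such as `prefix-01` and `prefix-10` consistent.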