# nemo_curator.core.client

## Module Contents

### Classes

| Name                                                         | Description                                                 |
| ------------------------------------------------------------ | ----------------------------------------------------------- |
| [`RayClient`](#nemo_curator-core-client-RayClient)           | Sets up the Ray cluster and configures metrics integration. |
| [`SlurmRayClient`](#nemo_curator-core-client-SlurmRayClient) | RayClient extended for multi-node SLURM jobs.               |

### Functions

| Name                                                                         | Description                                                    |
| ---------------------------------------------------------------------------- | -------------------------------------------------------------- |
| [`_expand_slurm_nodelist`](#nemo_curator-core-client-_expand_slurm_nodelist) | Expand a SLURM node-list expression into individual hostnames. |
| [`_find_ray_binary`](#nemo_curator-core-client-_find_ray_binary)             | Locate the `ray` CLI in the active Python environment.         |
| [`_parse_slurm_nodelist`](#nemo_curator-core-client-_parse_slurm_nodelist)   | Pure-Python parser for SLURM compact nodelist notation.        |

### API

<Anchor id="nemo_curator-core-client-RayClient">
  <CodeBlock showLineNumbers={false} wordWrap={true}>
    ```python
    class nemo_curator.core.client.RayClient(
        ray_port: int = DEFAULT_RAY_PORT,
        ray_dashboard_port: int = DEFAULT_RAY_DASHBOARD_PORT,
        ray_client_server_port: int = DEFAULT_RAY_CLIENT_SERVER_PORT,
        ray_temp_dir: str = DEFAULT_RAY_TEMP_DIR,
        include_dashboard: bool = True,
        ray_metrics_port: int = DEFAULT_RAY_METRICS_PORT,
        ray_dashboard_host: str = DEFAULT_RAY_DASHBOARD_HOST,
        num_gpus: int | None = None,
        num_cpus: int | None = None,
        object_store_memory: int | None = None,
        enable_object_spilling: bool = False,
        ray_stdouterr_capture_file: str | None = None,
        metrics_dir: str | None = None
    )
    ```
  </CodeBlock>
</Anchor>

<Indent>
  <Badge>
    Dataclass
  </Badge>

  Sets up the Ray cluster and configures metrics integration.

  If a specified port is already in use, the client finds the next available port and uses that instead.
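
The port fallback described above can be pictured with a small standalone sketch. It is not the library's code: `find_free_port`, its `host` and `max_tries` parameters, and the linear search order are assumptions made for illustration.

```python
import socket


def find_free_port(start_port: int, host: str = "127.0.0.1", max_tries: int = 100) -> int:
    """Return the first port >= start_port that can be bound on host (hypothetical helper)."""
    last_port = min(start_port + max_tries, 65536)
    for port in range(start_port, last_port):
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
            try:
                sock.bind((host, port))
            except OSError:
                continue  # port is taken or not permitted; try the next one
            return port  # the probe socket is closed when the with-block exits
    raise RuntimeError(f"no free port found in [{start_port}, {last_port})")
```

A probe like this is inherently racy: another process can take the port between the check and the moment Ray binds it, so a caller should still be prepared for a bind failure.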

  **Parameters:**

  <ParamField path="ray_port" type="int" default="DEFAULT_RAY_PORT">
    The port number of the Ray GCS.
  </ParamField>

  <ParamField path="ray_dashboard_port" type="int" default="DEFAULT_RAY_DASHBOARD_PORT">
    The port number of the Ray dashboard.
  </ParamField>

  <ParamField path="ray_client_server_port" type="int" default="DEFAULT_RAY_CLIENT_SERVER_PORT">
    The port number of the Ray client server.
  </ParamField>

  <ParamField path="ray_temp_dir" type="str" default="DEFAULT_RAY_TEMP_DIR">
    The temporary directory to use for Ray.
  </ParamField>

  <ParamField path="include_dashboard" type="bool" default="True">
    Whether to include dashboard integration. If true, adds Ray metrics service discovery.
  </ParamField>

  <ParamField path="ray_metrics_port" type="int" default="DEFAULT_RAY_METRICS_PORT">
    The port number on which Ray exposes metrics.
  </ParamField>

  <ParamField path="ray_dashboard_host" type="str" default="DEFAULT_RAY_DASHBOARD_HOST">
    The host of the Ray dashboard.
  </ParamField>

  <ParamField path="num_gpus" type="int | None" default="None">
    The number of GPUs to use.
  </ParamField>

  <ParamField path="num_cpus" type="int | None" default="None">
    The number of CPUs to use.
  </ParamField>

  <ParamField path="object_store_memory" type="int | None" default="None">
    The amount of memory to use for the object store.
  </ParamField>

  <ParamField path="enable_object_spilling" type="bool" default="False">
    Whether to enable object spilling.
  </ParamField>

  <ParamField path="ray_stdouterr_capture_file" type="str | None" default="None">
    The file to capture stdout/stderr to.
  </ParamField>

  <ParamField path="metrics_dir" type="str | None" default="None">
    The directory for Prometheus/Grafana metrics data. If None, uses the per-user default.
  </ParamField>

  <ParamField path="enable_object_spilling" type="bool = False" />

  <ParamField path="include_dashboard" type="bool = True" />

  <ParamField path="metrics_dir" type="str | None = None" />

  <ParamField path="num_cpus" type="int | None = None" />

  <ParamField path="num_gpus" type="int | None = None" />

  <ParamField path="object_store_memory" type="int | None = None" />

  <ParamField path="ray_client_server_port" type="int = DEFAULT_RAY_CLIENT_SERVER_PORT" />

  <ParamField path="ray_dashboard_host" type="str = DEFAULT_RAY_DASHBOARD_HOST" />

  <ParamField path="ray_dashboard_port" type="int = DEFAULT_RAY_DASHBOARD_PORT" />

  <ParamField path="ray_metrics_port" type="int = DEFAULT_RAY_METRICS_PORT" />

  <ParamField path="ray_port" type="int = DEFAULT_RAY_PORT" />

  <ParamField path="ray_process" type="Popen | None = field(init=False, default=None)" />

  <ParamField path="ray_stdouterr_capture_file" type="str | None = None" />

  <ParamField path="ray_temp_dir" type="str = DEFAULT_RAY_TEMP_DIR" />

  <Anchor id="nemo_curator-core-client-RayClient-__enter__">
    <CodeBlock showLineNumbers={false} wordWrap={true}>
      ```python
      nemo_curator.core.client.RayClient.__enter__()
      ```
    </CodeBlock>
  </Anchor>

  <Indent />

  <Anchor id="nemo_curator-core-client-RayClient-__exit__">
    <CodeBlock showLineNumbers={false} wordWrap={true}>
      ```python
      nemo_curator.core.client.RayClient.__exit__(
          exc = ()
      )
      ```
    </CodeBlock>
  </Anchor>

  <Indent />

  <Anchor id="nemo_curator-core-client-RayClient-__post_init__">
    <CodeBlock showLineNumbers={false} wordWrap={true}>
      ```python
      nemo_curator.core.client.RayClient.__post_init__() -> None
      ```
    </CodeBlock>
  </Anchor>

  <Indent />

  <Anchor id="nemo_curator-core-client-RayClient-start">
    <CodeBlock showLineNumbers={false} wordWrap={true}>
      ```python
      nemo_curator.core.client.RayClient.start() -> None
      ```
    </CodeBlock>
  </Anchor>

  <Indent>
    Start the Ray cluster if not already started, optionally capturing stdout/stderr to a file.
  </Indent>

  <Anchor id="nemo_curator-core-client-RayClient-stop">
    <CodeBlock showLineNumbers={false} wordWrap={true}>
      ```python
      nemo_curator.core.client.RayClient.stop() -> None
      ```
    </CodeBlock>
  </Anchor>

  <Indent />
</Indent>

<Anchor id="nemo_curator-core-client-SlurmRayClient">
  <CodeBlock showLineNumbers={false} wordWrap={true}>
    ```python
    class nemo_curator.core.client.SlurmRayClient(
        ray_port: int = DEFAULT_RAY_PORT,
        ray_dashboard_port: int = DEFAULT_RAY_DASHBOARD_PORT,
        ray_client_server_port: int = DEFAULT_RAY_CLIENT_SERVER_PORT,
        ray_temp_dir: str = DEFAULT_RAY_TEMP_DIR,
        include_dashboard: bool = True,
        ray_metrics_port: int = DEFAULT_RAY_METRICS_PORT,
        ray_dashboard_host: str = '0.0.0.0',
        num_gpus: int | None = None,
        num_cpus: int | None = None,
        object_store_memory: int | None = None,
        enable_object_spilling: bool = False,
        ray_stdouterr_capture_file: str | None = None,
        metrics_dir: str | None = None,
        worker_connect_timeout_s: int = 300,
        cleanup_on_start: bool = True
    )
    ```
  </CodeBlock>
</Anchor>

<Indent>
  <Badge>
    Dataclass
  </Badge>

  **Bases:** [RayClient](#nemo_curator-core-client-RayClient)

  RayClient extended for multi-node SLURM jobs.

  On single-node SLURM jobs (or when not running under SLURM at all),
  behaves identically to [`RayClient`](#nemo_curator-core-client-RayClient).

  On multi-node jobs, the script must be launched on **every** node
  (e.g. via `srun --ntasks-per-node=1`).  Each process calls
  `SlurmRayClient`, which inspects `SLURM_NODEID` to determine
  its role:

  * **Head (SLURM\_NODEID=0)**: starts the Ray head, waits for all
    workers to connect, then returns from [`start`](#nemo_curator-core-client-SlurmRayClient-start) so the
    pipeline can run.
  * **Workers (SLURM\_NODEID>0)**: start a Ray worker that connects
    to the head and **block until the cluster is torn down**.  When
    the head stops Ray (after the pipeline finishes), the worker
    process exits cleanly with `sys.exit(0)`.

  This is analogous to how `torchrun` works: the same script is
  launched on every node and each process discovers its role from the
  environment.
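
As a rough illustration of this role discovery, the sketch below classifies a process from its environment. Only `SLURM_NODEID` is named in this page; using `SLURM_NNODES` to detect a multi-node job, the `detect_role` name, and the returned strings are assumptions for the example.

```python
import os
from collections.abc import Mapping


def detect_role(env: Mapping[str, str] = os.environ) -> str:
    """Classify this process as 'single', 'head', or 'worker' (hypothetical helper)."""
    # Assumption: SLURM_NNODES tells us whether the job spans several nodes.
    num_nodes = int(env.get("SLURM_NNODES", "1"))
    if num_nodes <= 1:
        return "single"  # not under SLURM, or a single-node job
    node_id = int(env.get("SLURM_NODEID", "0"))
    return "head" if node_id == 0 else "worker"
```

Passing the environment as a mapping keeps the function easy to exercise without a real SLURM allocation, for example `detect_role({"SLURM_NNODES": "4", "SLURM_NODEID": "2"})`.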

  Example `sbatch` script:

  ```bash
  #!/bin/bash
  #SBATCH --nodes=4
  #SBATCH --ntasks-per-node=1
  #SBATCH --gpus-per-node=8

  srun --ntasks-per-node=1 \
    --container-image=nvcr.io/nvidia/nemo-curator:26.02 \
    --container-mounts="/lustre:/lustre" \
    bash -c "source .venv/bin/activate && python my_pipeline.py"
  ```

  For bare-metal (no container) setups, the same pattern works:

  ```bash
  #!/bin/bash
  #SBATCH --nodes=4
  #SBATCH --ntasks-per-node=1
  #SBATCH --gpus-per-node=8

  srun python my_pipeline.py
  ```

  If `RAY_ADDRESS` is set before [`start`](#nemo_curator-core-client-SlurmRayClient-start) is called,
  `SlurmRayClient` connects to the existing cluster without
  starting or stopping anything.

  **Parameters:**

  <ParamField path="worker_connect_timeout_s" type="int" default="300">
    Maximum number of seconds to wait for all worker nodes to join after the head is up. Raises `TimeoutError` if exceeded.
  </ParamField>

  <ParamField path="cleanup_on_start" type="bool" default="True">
    If true, run `ray stop --force` on the local node before starting Ray. Helps clear stale processes from previous runs.
  </ParamField>

  <ParamField path="_manages_cluster" type="bool = field(init=False, default=False, repr=False)" />

  <ParamField path="_slurm_nodes" type="list[str]" />

  <ParamField path="cleanup_on_start" type="bool = True" />

  <ParamField path="ray_dashboard_host" type="str = '0.0.0.0'" />

  <ParamField path="worker_connect_timeout_s" type="int = 300" />

  <Anchor id="nemo_curator-core-client-SlurmRayClient-__post_init__">
    <CodeBlock showLineNumbers={false} wordWrap={true}>
      ```python
      nemo_curator.core.client.SlurmRayClient.__post_init__() -> None
      ```
    </CodeBlock>
  </Anchor>

  <Indent />

  <Anchor id="nemo_curator-core-client-SlurmRayClient-_cleanup_local_ray">
    <CodeBlock showLineNumbers={false} wordWrap={true}>
      ```python
      nemo_curator.core.client.SlurmRayClient._cleanup_local_ray() -> None
      ```
    </CodeBlock>
  </Anchor>

  <Indent>
    Stop any stale Ray processes on the local node.
  </Indent>

  <Anchor id="nemo_curator-core-client-SlurmRayClient-_detect_slurm_resources">
    <CodeBlock showLineNumbers={false} wordWrap={true}>
      ```python
      nemo_curator.core.client.SlurmRayClient._detect_slurm_resources() -> None
      ```
    </CodeBlock>
  </Anchor>

  <Indent>
    Auto-detect per-node CPU/GPU counts from SLURM env vars when not set explicitly.
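
This page does not say which SLURM variables are consulted. The sketch below assumes `SLURM_CPUS_ON_NODE` and `SLURM_GPUS_ON_NODE`, which SLURM can export into a job's environment, and shows the "only fill in what was not set explicitly" behavior. The `detect_resources` function is hypothetical.

```python
import os
from collections.abc import Mapping
from typing import Optional


def detect_resources(
    num_cpus: Optional[int],
    num_gpus: Optional[int],
    env: Mapping[str, str] = os.environ,
) -> tuple:
    """Fill in missing CPU/GPU counts from SLURM variables (assumed names)."""

    def _env_int(name: str) -> Optional[int]:
        value = env.get(name, "")
        return int(value) if value.isdigit() else None

    # Explicit values always win; the environment is only a fallback.
    if num_cpus is None:
        num_cpus = _env_int("SLURM_CPUS_ON_NODE")
    if num_gpus is None:
        num_gpus = _env_int("SLURM_GPUS_ON_NODE")
    return num_cpus, num_gpus
```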
  </Indent>

  <Anchor id="nemo_curator-core-client-SlurmRayClient-_head_port_file">
    <CodeBlock showLineNumbers={false} wordWrap={true}>
      ```python
      nemo_curator.core.client.SlurmRayClient._head_port_file(
          slurm_job_id: str
      ) -> str
      ```
    </CodeBlock>
  </Anchor>

  <Indent>
    Return path to the shared port-broadcast file for this job.

    The file must live on a filesystem visible to all nodes (for example Lustre, not a node-local `/tmp`).
    Uses the `RAY_PORT_BROADCAST_DIR` environment variable if set; otherwise falls back to
    `/tmp`, which works only on single-node jobs or when `/tmp` is shared (for example via NFS).
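
A minimal sketch of that lookup follows. The `RAY_PORT_BROADCAST_DIR` variable and the `/tmp` fallback come from the description above; the `ray_head_port_<job id>` file name is invented for the example and is not necessarily what the library uses.

```python
import os
from collections.abc import Mapping


def head_port_file(slurm_job_id: str, env: Mapping[str, str] = os.environ) -> str:
    """Build the path of the per-job port file (file name is hypothetical)."""
    # An unset or empty variable both fall back to /tmp.
    base_dir = env.get("RAY_PORT_BROADCAST_DIR") or "/tmp"
    return os.path.join(base_dir, f"ray_head_port_{slurm_job_id}")
```

On a multi-node job, this would typically mean exporting `RAY_PORT_BROADCAST_DIR` to a directory on the shared filesystem in the `sbatch` script before `srun` is invoked.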
  </Indent>

  <Anchor id="nemo_curator-core-client-SlurmRayClient-_ray_init_with_timeout">
    <CodeBlock showLineNumbers={false} wordWrap={true}>
      ```python
      nemo_curator.core.client.SlurmRayClient._ray_init_with_timeout(
          address: str,
          timeout_s: int = 120
      ) -> None
      ```
    </CodeBlock>
  </Anchor>

  <Indent>
    <Badge>
      staticmethod
    </Badge>

    Call `ray.init(address=...)` with a SIGALRM-based timeout.

    `ray.init` can hang indefinitely if the GCS is slow or unstable
    after a multi-job start.  We use SIGALRM (Linux/macOS only) to raise
    a `TimeoutError` if the call blocks longer than *timeout\_s* seconds.

    Falls back to an unguarded `ray.init` when called from a non-main
    thread, where SIGALRM is unavailable.
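
The general SIGALRM pattern can be sketched independently of Ray. The `call_with_timeout` wrapper below is hypothetical and wraps an arbitrary callable; it uses `signal.setitimer`, which also delivers SIGALRM but accepts fractional seconds.

```python
import signal
import threading


def call_with_timeout(func, timeout_s: float):
    """Run func(), raising TimeoutError if it blocks longer than timeout_s."""
    if threading.current_thread() is not threading.main_thread():
        return func()  # signal handlers can only be installed in the main thread

    def _on_alarm(signum, frame):
        raise TimeoutError(f"call did not finish within {timeout_s} s")

    previous = signal.signal(signal.SIGALRM, _on_alarm)
    signal.setitimer(signal.ITIMER_REAL, timeout_s)
    try:
        return func()
    finally:
        signal.setitimer(signal.ITIMER_REAL, 0)  # cancel any pending alarm
        signal.signal(signal.SIGALRM, previous)  # restore the old handler
```

The exception is raised inside whatever the main thread is executing when the signal arrives, so this only interrupts calls that return control to the Python interpreter; a call stuck in uninterruptible native code would not be affected.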
  </Indent>

  <Anchor id="nemo_curator-core-client-SlurmRayClient-_read_head_port">
    <CodeBlock showLineNumbers={false} wordWrap={true}>
      ```python
      nemo_curator.core.client.SlurmRayClient._read_head_port(
          slurm_job_id: str,
          timeout_s: int = 600
      ) -> int
      ```
    </CodeBlock>
  </Anchor>

  <Indent>
    Wait for the head to write its port file and return the port number.
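
One way to picture this wait is a polling loop with a deadline. The function name, the `poll_s` interval, and the plain-text file format (a single integer) are assumptions for the sketch.

```python
import time


def wait_for_port_file(path: str, timeout_s: float = 600, poll_s: float = 0.5) -> int:
    """Poll until path contains a port number, or raise TimeoutError."""
    deadline = time.monotonic() + timeout_s
    while True:
        try:
            with open(path) as f:
                text = f.read().strip()
            if text:
                return int(text)
        except FileNotFoundError:
            pass  # the head has not published its port yet
        if time.monotonic() >= deadline:
            raise TimeoutError(f"no port published at {path} after {timeout_s} s")
        time.sleep(poll_s)
```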
  </Indent>

  <Anchor id="nemo_curator-core-client-SlurmRayClient-_run_as_worker">
    <CodeBlock showLineNumbers={false} wordWrap={true}>
      ```python
      nemo_curator.core.client.SlurmRayClient._run_as_worker(
          head_ip: str
      ) -> int
      ```
    </CodeBlock>
  </Anchor>

  <Indent>
    Start a Ray worker that connects to *head\_ip* and block until the cluster is torn down.

    Returns the exit code of `ray start --block` so the caller can pass it to `sys.exit`.
    Exit code 0 means the cluster was torn down cleanly; non-zero indicates an error.
  </Indent>

  <Anchor id="nemo_curator-core-client-SlurmRayClient-_wait_for_workers">
    <CodeBlock showLineNumbers={false} wordWrap={true}>
      ```python
      nemo_curator.core.client.SlurmRayClient._wait_for_workers() -> None
      ```
    </CodeBlock>
  </Anchor>

  <Indent>
    Block until every allocated node is alive in the Ray cluster.

    Raises `TimeoutError` (after tearing everything down) if not
    all nodes join within `worker_connect_timeout_s`.
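
The wait-then-tear-down behavior can be sketched with the cluster query injected as a callable. Everything named here (`wait_for_nodes`, `count_alive`, `teardown`, `poll_s`) is hypothetical; in a real cluster the count could plausibly come from the entries of `ray.nodes()` whose `Alive` field is true, but that wiring is an assumption and is not shown.

```python
import time
from typing import Callable, Optional


def wait_for_nodes(
    count_alive: Callable[[], int],
    expected: int,
    timeout_s: float = 300,
    poll_s: float = 1.0,
    teardown: Optional[Callable[[], None]] = None,
) -> None:
    """Block until count_alive() reaches expected, or tear down and raise."""
    deadline = time.monotonic() + timeout_s
    while count_alive() < expected:
        if time.monotonic() >= deadline:
            if teardown is not None:
                teardown()  # stop the cluster before reporting the failure
            raise TimeoutError(f"fewer than {expected} nodes joined within {timeout_s} s")
        time.sleep(poll_s)
```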
  </Indent>

  <Anchor id="nemo_curator-core-client-SlurmRayClient-_write_head_port">
    <CodeBlock showLineNumbers={false} wordWrap={true}>
      ```python
      nemo_curator.core.client.SlurmRayClient._write_head_port(
          slurm_job_id: str
      ) -> None
      ```
    </CodeBlock>
  </Anchor>

  <Indent>
    Write the actual Ray GCS port to a shared file so workers can read it.

    Uses an atomic write-then-rename so workers never observe an empty or
    partially-written file (important on Lustre / NFS where open() truncates
    before write() completes).
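
The write-then-rename idiom looks roughly like the sketch below. The function name and temporary-file naming are illustrative; the key point is that the temporary file is created in the same directory as the target, so `os.replace` is a rename within one filesystem.

```python
import os
import tempfile


def write_port_atomically(path: str, port: int) -> None:
    """Publish port at path so readers see either no file or the full value."""
    directory = os.path.dirname(path) or "."
    fd, tmp_path = tempfile.mkstemp(dir=directory, prefix=".port-", suffix=".tmp")
    try:
        with os.fdopen(fd, "w") as f:
            f.write(str(port))
            f.flush()
            os.fsync(f.fileno())  # push the bytes to storage before renaming
        os.replace(tmp_path, path)  # atomic on POSIX within one filesystem
    except BaseException:
        os.unlink(tmp_path)  # do not leave a stray temporary file behind
        raise
```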
  </Indent>

  <Anchor id="nemo_curator-core-client-SlurmRayClient-start">
    <CodeBlock showLineNumbers={false} wordWrap={true}>
      ```python
      nemo_curator.core.client.SlurmRayClient.start() -> None
      ```
    </CodeBlock>
  </Anchor>

  <Indent>
    Start the Ray cluster, with role detection on multi-node SLURM jobs.

    If `RAY_ADDRESS` is already set, connects to the existing
    cluster without starting a new head or launching workers.

    On multi-node jobs, worker processes (`SLURM_NODEID > 0`)
    block here until the cluster is torn down, then exit with
    `sys.exit(0)`.  Only the head (`SLURM_NODEID = 0`) returns
    from this method.
  </Indent>

  <Anchor id="nemo_curator-core-client-SlurmRayClient-stop">
    <CodeBlock showLineNumbers={false} wordWrap={true}>
      ```python
      nemo_curator.core.client.SlurmRayClient.stop() -> None
      ```
    </CodeBlock>
  </Anchor>

  <Indent>
    Stop the Ray head.  Workers detect the head's death and exit on their own.

    Safe to call multiple times.  Does not stop an externally
    managed cluster (one discovered via `RAY_ADDRESS`).
  </Indent>
</Indent>

<Anchor id="nemo_curator-core-client-_expand_slurm_nodelist">
  <CodeBlock showLineNumbers={false} wordWrap={true}>
    ```python
    nemo_curator.core.client._expand_slurm_nodelist(
        nodelist: str
    ) -> list[str]
    ```
  </CodeBlock>
</Anchor>

<Indent>
  Expand a SLURM node-list expression into individual hostnames.

  Tries `scontrol show hostnames` first, then falls back to a
  pure-Python parser that handles common compact formats like
  `prefix-[01,03-05]` and `node1,node2`.
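
A sketch of the "try `scontrol`, then fall back" strategy follows. The `expand_nodelist` name, the 10-second timeout, and passing the fallback parser as a callable are choices made for the example, not the library's signature.

```python
import subprocess
from typing import Callable


def expand_nodelist(nodelist: str, fallback: Callable[[str], list]) -> list:
    """Expand nodelist via scontrol, using fallback if scontrol is unusable."""
    try:
        result = subprocess.run(
            ["scontrol", "show", "hostnames", nodelist],
            capture_output=True,
            text=True,
            check=True,
            timeout=10,
        )
    except (OSError, subprocess.SubprocessError):
        # scontrol is missing, failed, or timed out.
        return fallback(nodelist)
    hosts = result.stdout.split()  # scontrol prints one hostname per line
    return hosts or fallback(nodelist)
```

For instance, `expand_nodelist("node1,node2", fallback=lambda s: s.split(","))` yields the two hostnames whether or not `scontrol` is installed.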
</Indent>

<Anchor id="nemo_curator-core-client-_find_ray_binary">
  <CodeBlock showLineNumbers={false} wordWrap={true}>
    ```python
    nemo_curator.core.client._find_ray_binary() -> str
    ```
  </CodeBlock>
</Anchor>

<Indent>
  Locate the `ray` CLI in the active Python environment.
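
How the lookup is performed is not documented here. One plausible approach, shown as an assumption, is to check the directory of the running interpreter first (where a virtual environment installs its console scripts) and then fall back to `PATH`. The `find_cli` helper is hypothetical and takes the binary name as a parameter.

```python
import shutil
import sys
from pathlib import Path


def find_cli(name: str) -> str:
    """Return the path of a CLI, preferring the active interpreter's directory."""
    candidate = Path(sys.executable).parent / name
    if candidate.is_file():
        return str(candidate)
    found = shutil.which(name)  # fall back to a PATH lookup
    if found is None:
        raise FileNotFoundError(f"could not locate {name!r} next to {sys.executable} or on PATH")
    return found
```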
</Indent>

<Anchor id="nemo_curator-core-client-_parse_slurm_nodelist">
  <CodeBlock showLineNumbers={false} wordWrap={true}>
    ```python
    nemo_curator.core.client._parse_slurm_nodelist(
        nodelist: str
    ) -> list[str]
    ```
  </CodeBlock>
</Anchor>

<Indent>
  Pure-Python parser for SLURM compact nodelist notation.

  Handles formats like:

  * `node1,node2,node3`
  * `prefix-[01,03,05]`
  * `prefix-[01-05]`
  * `prefix-[01-03,07,10-12]`
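
A compact parser covering the four formats listed above might look like the following. It is a sketch, not the library's implementation: `parse_nodelist` is a hypothetical name, and it supports at most one bracket group per entry.

```python
import re


def parse_nodelist(nodelist: str) -> list:
    """Expand SLURM compact notation such as 'prefix-[01-03,07]' into hostnames."""
    hosts = []
    # Split on commas that are not inside a [...] group.
    for part in re.split(r",(?![^\[]*\])", nodelist.strip()):
        if not part:
            continue
        match = re.fullmatch(r"(.*?)\[([^\]]+)\](.*)", part)
        if match is None:
            hosts.append(part)  # plain hostname, nothing to expand
            continue
        prefix, body, suffix = match.groups()
        for item in body.split(","):
            if "-" in item:
                low, high = item.split("-", 1)
                width = len(low)  # preserve zero padding, e.g. 01 -> 02 -> 03
                for number in range(int(low), int(high) + 1):
                    hosts.append(f"{prefix}{number:0{width}d}{suffix}")
            else:
                hosts.append(f"{prefix}{item}{suffix}")
    return hosts
```

With this sketch, `parse_nodelist("prefix-[01-03,07,10-12]")` expands to `prefix-01`, `prefix-02`, `prefix-03`, `prefix-07`, `prefix-10`, `prefix-11`, and `prefix-12`, in that order.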
</Indent>