
# nemo_curator.backends.utils

## Module Contents

### Classes

| Name                                                                | Description                                                              |
| ------------------------------------------------------------------- | ------------------------------------------------------------------------ |
| [`RayStageSpecKeys`](#nemo_curator-backends-utils-RayStageSpecKeys) | String enum of the flags used as keys inside `ray_stage_spec`.           |

### Functions

| Name                                                                                              | Description                                                                |
| ------------------------------------------------------------------------------------------------- | -------------------------------------------------------------------------- |
| [`_logger_custom_deserializer`](#nemo_curator-backends-utils-_logger_custom_deserializer)         | -                                                                          |
| [`_logger_custom_serializer`](#nemo_curator-backends-utils-_logger_custom_serializer)             | -                                                                          |
| [`_setup_stage_on_node`](#nemo_curator-backends-utils-_setup_stage_on_node)                       | Ray remote function to execute setup\_on\_node for a stage.                |
| [`check_total_gpu_capacity`](#nemo_curator-backends-utils-check_total_gpu_capacity)               | Raise if the cluster doesn't have enough GPUs to satisfy aggregate demand. |
| [`execute_setup_on_node`](#nemo_curator-backends-utils-execute_setup_on_node)                     | Execute `setup_on_node` for every stage on every alive Ray node.           |
| [`get_available_cpu_gpu_resources`](#nemo_curator-backends-utils-get_available_cpu_gpu_resources) | Get available CPU and GPU resources from Ray.                              |
| [`get_worker_metadata_and_node_id`](#nemo_curator-backends-utils-get_worker_metadata_and_node_id) | Get the worker metadata and node id from the runtime context.              |
| [`merge_executor_configs`](#nemo_curator-backends-utils-merge_executor_configs)                   | Recursively merge two executor configs with deep merging of nested dicts.  |
| [`register_loguru_serializer`](#nemo_curator-backends-utils-register_loguru_serializer)           | Register a custom serializer and deserializer for the loguru logger.       |
| [`warn_on_env_var_override`](#nemo_curator-backends-utils-warn_on_env_var_override)               | -                                                                          |

### API

<Anchor id="nemo_curator-backends-utils-RayStageSpecKeys">
  <CodeBlock showLineNumbers={false} wordWrap={true}>
    ```python
    class nemo_curator.backends.utils.RayStageSpecKeys
    ```
  </CodeBlock>
</Anchor>

<Indent>
  **Bases:** `enum.Enum`

  String enum of the flags used as keys inside `ray_stage_spec`.

  <ParamField path="IS_ACTOR_STAGE" type="= 'is_actor_stage'" />

  <ParamField path="IS_FANOUT_STAGE" type="= 'is_fanout_stage'" />

  <ParamField path="IS_LSH_STAGE" type="= 'is_lsh_stage'" />

  <ParamField path="IS_RAFT_ACTOR" type="= 'is_raft_actor'" />

  <ParamField path="IS_SHUFFLE_STAGE" type="= 'is_shuffle_stage'" />

  <ParamField path="MAX_CALLS_PER_WORKER" type="= 'max_calls_per_worker'" />

  <ParamField path="RAY_REMOTE_ARGS" type="= 'ray_remote_args'" />
</Indent>
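As a usage illustration, a `ray_stage_spec` dictionary can be keyed by these enum values. The sketch below mirrors the documented members in a standalone enum; the particular spec values chosen (actor stage, eight calls per worker, one GPU) are hypothetical.

<CodeBlock showLineNumbers={false}>
```python
from enum import Enum


class RayStageSpecKeys(Enum):
    """Standalone mirror of the documented enum members."""

    IS_ACTOR_STAGE = "is_actor_stage"
    IS_FANOUT_STAGE = "is_fanout_stage"
    IS_LSH_STAGE = "is_lsh_stage"
    IS_RAFT_ACTOR = "is_raft_actor"
    IS_SHUFFLE_STAGE = "is_shuffle_stage"
    MAX_CALLS_PER_WORKER = "max_calls_per_worker"
    RAY_REMOTE_ARGS = "ray_remote_args"


# Build a spec dict keyed by the enum's string values (example values only).
ray_stage_spec = {
    RayStageSpecKeys.IS_ACTOR_STAGE.value: True,
    RayStageSpecKeys.MAX_CALLS_PER_WORKER.value: 8,
    RayStageSpecKeys.RAY_REMOTE_ARGS.value: {"num_gpus": 1},
}
```
</CodeBlock>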

<Anchor id="nemo_curator-backends-utils-_logger_custom_deserializer">
  <CodeBlock showLineNumbers={false} wordWrap={true}>
    ```python
    nemo_curator.backends.utils._logger_custom_deserializer(
        _: None
    ) -> loguru.Logger
    ```
  </CodeBlock>
</Anchor>

<Indent />

<Anchor id="nemo_curator-backends-utils-_logger_custom_serializer">
  <CodeBlock showLineNumbers={false} wordWrap={true}>
    ```python
    nemo_curator.backends.utils._logger_custom_serializer(
        _: loguru.Logger
    ) -> None
    ```
  </CodeBlock>
</Anchor>

<Indent />

<Anchor id="nemo_curator-backends-utils-_setup_stage_on_node">
  <CodeBlock links={{"nemo_curator.stages.base.ProcessingStage":"/nemo-curator/nemo_curator/stages/base#nemo_curator-stages-base-ProcessingStage"}} showLineNumbers={false} wordWrap={true}>
    ```python
    nemo_curator.backends.utils._setup_stage_on_node(
        stage: nemo_curator.stages.base.ProcessingStage
    ) -> None
    ```
  </CodeBlock>
</Anchor>

<Indent>
  Ray remote function to execute `setup_on_node` for a stage.

  This runs as a Ray remote task (not an actor). vLLM's auto-detection forces the
  spawn multiprocessing method only inside Ray actors, not in Ray tasks; without an
  override, vLLM defaults to fork in tasks and fails with
  `RuntimeError: Cannot re-initialize CUDA in forked subprocess`.
  We therefore explicitly set the environment variable to spawn.
</Indent>
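The override itself can be sketched as below. `VLLM_WORKER_MULTIPROC_METHOD` is vLLM's documented switch for the worker start method; whether this helper sets exactly this variable is an assumption here.

<CodeBlock showLineNumbers={false}>
```python
import os

# Force "spawn" instead of the "fork" default vLLM falls back to in Ray
# tasks: forking after CUDA has been initialized raises
# "RuntimeError: Cannot re-initialize CUDA in forked subprocess".
# Assumption: the real helper sets this (documented) vLLM variable.
os.environ["VLLM_WORKER_MULTIPROC_METHOD"] = "spawn"
```
</CodeBlock>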

<Anchor id="nemo_curator-backends-utils-check_total_gpu_capacity">
  <CodeBlock showLineNumbers={false} wordWrap={true}>
    ```python
    nemo_curator.backends.utils.check_total_gpu_capacity(
        gpus_needed: int,
        ignore_head_node: bool = False
    ) -> None
    ```
  </CodeBlock>
</Anchor>

<Indent>
  Raise if the cluster doesn't have enough GPUs to satisfy aggregate demand.

  Intended as a coarse pre-check before submitting placement groups: Ray's
  PG scheduler can hang indefinitely on `pg.ready()` when demand exceeds
  capacity, so a fast, explicit error with the actual numbers is friendlier
  than waiting on a timeout.
</Indent>
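The check reduces to a comparison plus an informative error. A minimal sketch, assuming the real helper queries Ray for cluster capacity (here it is passed in as an argument, and the function name is hypothetical):

<CodeBlock showLineNumbers={false}>
```python
def check_total_gpu_capacity_sketch(gpus_needed: int, gpus_available: int) -> None:
    """Coarse pre-check: fail fast instead of letting pg.ready() hang.

    The real helper derives gpus_available from Ray (optionally excluding
    the head node); this sketch takes it as a plain argument.
    """
    if gpus_needed > gpus_available:
        msg = (
            f"Pipeline requests {gpus_needed} GPU(s) but the cluster only "
            f"has {gpus_available}; Ray's placement-group scheduler would "
            "otherwise block indefinitely on pg.ready()."
        )
        raise RuntimeError(msg)
```
</CodeBlock>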

<Anchor id="nemo_curator-backends-utils-execute_setup_on_node">
  <CodeBlock links={{"nemo_curator.stages.base.ProcessingStage":"/nemo-curator/nemo_curator/stages/base#nemo_curator-stages-base-ProcessingStage"}} showLineNumbers={false} wordWrap={true}>
    ```python
    nemo_curator.backends.utils.execute_setup_on_node(
        stages: list[nemo_curator.stages.base.ProcessingStage],
        ignore_head_node: bool = False
    ) -> None
    ```
  </CodeBlock>
</Anchor>

<Indent>
  Execute `setup_on_node` for every stage on every alive Ray node.

  All `(stage, node)` setup tasks are submitted up front and awaited with a single
  `ray.get`, so total wall-clock time is bounded by the slowest stage rather than
  the sum of per-stage times — important when setup is heavy (model downloads, weight
  loads) and stages don't contend for the same resources.
</Indent>
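The scheduling pattern is "submit every (stage, node) task up front, wait once". The sketch below illustrates it with a thread pool rather than Ray; the real helper submits Ray remote tasks pinned to each node and awaits them with a single `ray.get`. All names here are hypothetical.

<CodeBlock showLineNumbers={false}>
```python
from concurrent.futures import ThreadPoolExecutor


def run_setup_everywhere(stages, nodes, setup):
    """Fan out setup(stage, node) for every pair, then gather all results.

    Because every task is submitted before any result is awaited, total
    wall-clock time is bounded by the slowest task, not the sum of tasks.
    """
    with ThreadPoolExecutor() as pool:
        futures = [pool.submit(setup, s, n) for s in stages for n in nodes]
        return [f.result() for f in futures]


# Hypothetical stages and nodes; setup just records which pair ran.
done = run_setup_everywhere(
    stages=["tokenize", "embed"],
    nodes=["node-a", "node-b"],
    setup=lambda stage, node: (stage, node),
)
```
</CodeBlock>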

<Anchor id="nemo_curator-backends-utils-get_available_cpu_gpu_resources">
  <CodeBlock showLineNumbers={false} wordWrap={true}>
    ```python
    nemo_curator.backends.utils.get_available_cpu_gpu_resources(
        init_and_shutdown: bool = False,
        ignore_head_node: bool = False
    ) -> tuple[int, int]
    ```
  </CodeBlock>
</Anchor>

<Indent>
  Get available CPU and GPU resources from Ray.
</Indent>

<Anchor id="nemo_curator-backends-utils-get_worker_metadata_and_node_id">
  <CodeBlock links={{"nemo_curator.backends.base.NodeInfo":"/nemo-curator/nemo_curator/backends/base#nemo_curator-backends-base-NodeInfo","nemo_curator.backends.base.WorkerMetadata":"/nemo-curator/nemo_curator/backends/base#nemo_curator-backends-base-WorkerMetadata"}} showLineNumbers={false} wordWrap={true}>
    ```python
    nemo_curator.backends.utils.get_worker_metadata_and_node_id() -> tuple[nemo_curator.backends.base.NodeInfo, nemo_curator.backends.base.WorkerMetadata]
    ```
  </CodeBlock>
</Anchor>

<Indent>
  Get the worker metadata and node id from the runtime context.
</Indent>

<Anchor id="nemo_curator-backends-utils-merge_executor_configs">
  <CodeBlock showLineNumbers={false} wordWrap={true}>
    ```python
    nemo_curator.backends.utils.merge_executor_configs(
        base_config: dict | None,
        override_config: dict | None
    ) -> dict
    ```
  </CodeBlock>
</Anchor>

<Indent>
  Recursively merge two executor configs with deep merging of nested dicts.

  **Parameters:**

  <ParamField path="base_config" type="dict | None">
    Base configuration dictionary
  </ParamField>

  <ParamField path="override_config" type="dict | None">
    Configuration to merge on top of base\_config
  </ParamField>

  **Returns:** `dict`

  Merged configuration dictionary with all nested dicts recursively merged

  **Examples:**

  <CodeBlock showLineNumbers={false}>
    ```python
    >>> base = {"runtime_env": {"env_vars": {"A": "1", "B": "2"}}}
    >>> override = {"runtime_env": {"env_vars": {"B": "3", "C": "4"}}}
    >>> merge_executor_configs(base, override)
    {"runtime_env": {"env_vars": {"A": "1", "B": "3", "C": "4"}}}
    ```
  </CodeBlock>
</Indent>

<Anchor id="nemo_curator-backends-utils-register_loguru_serializer">
  <CodeBlock showLineNumbers={false} wordWrap={true}>
    ```python
    nemo_curator.backends.utils.register_loguru_serializer() -> None
    ```
  </CodeBlock>
</Anchor>

<Indent>
  Register a custom serializer and deserializer for the loguru logger.
</Indent>

<Anchor id="nemo_curator-backends-utils-warn_on_env_var_override">
  <CodeBlock showLineNumbers={false} wordWrap={true}>
    ```python
    nemo_curator.backends.utils.warn_on_env_var_override(
        existing_config: dict | None,
        merged_config: dict | None
    ) -> None
    ```
  </CodeBlock>
</Anchor>

<Indent />