nemo_curator.backends.experimental.utils


Module Contents

Classes

| Name | Description |
| --- | --- |
| RayStageSpecKeys | String enum of different flags that define keys inside ray_stage_spec. |

Functions

| Name | Description |
| --- | --- |
| _setup_stage_on_node | Ray remote function to execute setup_on_node for a stage. |
| execute_setup_on_node | Execute setup on node for a stage. |
| get_available_cpu_gpu_resources | Get available CPU and GPU resources from Ray. |
| get_head_node_id | Get the head node ID from the Ray cluster, with lazy evaluation and caching. |
| get_worker_metadata_and_node_id | Get the worker metadata and node id from the runtime context. |
| is_head_node | Check if a node is the head node. |

Data

_HEAD_NODE_ID_CACHE

API

class nemo_curator.backends.experimental.utils.RayStageSpecKeys

Bases: enum.Enum

String enum of different flags that define keys inside ray_stage_spec.

IS_ACTOR_STAGE
= 'is_actor_stage'
IS_FANOUT_STAGE
= 'is_fanout_stage'
IS_LSH_STAGE
= 'is_lsh_stage'
IS_RAFT_ACTOR
= 'is_raft_actor'
IS_SHUFFLE_STAGE
= 'is_shuffle_stage'
MAX_CALLS_PER_WORKER
= 'max_calls_per_worker'
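A minimal sketch of how these keys might be used to build a `ray_stage_spec`. The enum is re-declared locally for illustration (mirroring the string values documented above), and the dict shape is an assumption, not taken from the library:

```python
from enum import Enum


class RayStageSpecKeys(str, Enum):
    # Values mirror the documented members above.
    IS_ACTOR_STAGE = "is_actor_stage"
    IS_FANOUT_STAGE = "is_fanout_stage"
    IS_LSH_STAGE = "is_lsh_stage"
    IS_RAFT_ACTOR = "is_raft_actor"
    IS_SHUFFLE_STAGE = "is_shuffle_stage"
    MAX_CALLS_PER_WORKER = "max_calls_per_worker"


# Hypothetical ray_stage_spec keyed by the enum's string values.
ray_stage_spec = {
    RayStageSpecKeys.IS_ACTOR_STAGE.value: True,
    RayStageSpecKeys.MAX_CALLS_PER_WORKER.value: 4,
}
```

Keying the dict by `.value` keeps the spec plain-string-serializable while the enum centralizes the flag names.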
nemo_curator.backends.experimental.utils._setup_stage_on_node(
stage: nemo_curator.stages.base.ProcessingStage,
node_info: nemo_curator.backends.base.NodeInfo,
worker_metadata: nemo_curator.backends.base.WorkerMetadata
) -> None

Ray remote function to execute setup_on_node for a stage.

This runs as a Ray remote task (not an actor). vLLM’s auto-detection only forces the spawn multiprocessing method inside Ray actors, not in Ray tasks. Without this override, vLLM defaults to fork in tasks and hits RuntimeError: Cannot re-initialize CUDA in forked subprocess. We explicitly set the environment variable to spawn to prevent this.
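The override described above amounts to setting an environment variable before vLLM initializes. As an assumption about what that looks like, the sketch below uses `VLLM_WORKER_MULTIPROC_METHOD`, vLLM's knob for choosing its multiprocessing start method; the exact variable this module sets may differ:

```python
import os

# Assumption: vLLM reads VLLM_WORKER_MULTIPROC_METHOD to choose its
# multiprocessing start method. Forcing "spawn" before any CUDA work
# avoids "RuntimeError: Cannot re-initialize CUDA in forked subprocess"
# inside Ray tasks, where vLLM's auto-detection does not force spawn.
os.environ["VLLM_WORKER_MULTIPROC_METHOD"] = "spawn"
```

The variable must be set before the first CUDA initialization in the task; setting it afterwards has no effect on already-forked workers.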

nemo_curator.backends.experimental.utils.execute_setup_on_node(
stages: list[nemo_curator.stages.base.ProcessingStage],
ignore_head_node: bool = False
) -> None

Execute setup on node for a stage.

nemo_curator.backends.experimental.utils.get_available_cpu_gpu_resources(
init_and_shutdown: bool = False,
ignore_head_node: bool = False
) -> tuple[int, int]

Get available CPU and GPU resources from Ray.
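A sketch of what such a helper might compute, operating on a resource mapping shaped like the one `ray.available_resources()` returns (the cluster query is replaced by a plain dict here; the real function's logic may differ):

```python
def count_cpu_gpu(resources: dict[str, float]) -> tuple[int, int]:
    # Ray reports aggregate resources under the "CPU" and "GPU" keys;
    # a missing key means zero of that resource.
    return int(resources.get("CPU", 0)), int(resources.get("GPU", 0))


# Example resource map shaped like ray.available_resources() output.
cpus, gpus = count_cpu_gpu({"CPU": 16.0, "GPU": 2.0, "memory": 1e9})
```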

nemo_curator.backends.experimental.utils.get_head_node_id() -> str | None

Get the head node ID from the Ray cluster, with lazy evaluation and caching.

Returns: str | None

The head node ID if a head node exists, otherwise None.

nemo_curator.backends.experimental.utils.get_worker_metadata_and_node_id() -> tuple[nemo_curator.backends.base.NodeInfo, nemo_curator.backends.base.WorkerMetadata]

Get the worker metadata and node id from the runtime context.

nemo_curator.backends.experimental.utils.is_head_node(
node: dict[str, typing.Any]
) -> bool

Check if a node is the head node.
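One common way to identify the head node in a `ray.nodes()` entry is the special `node:__internal_head__` resource that only the head node advertises. The sketch below assumes that convention; the module's actual check may differ:

```python
from typing import Any

# Resource key Ray attaches only to the head node.
HEAD_NODE_RESOURCE = "node:__internal_head__"


def is_head_node(node: dict[str, Any]) -> bool:
    # ray.nodes() entries carry a "Resources" mapping; only the head
    # node's mapping contains the head-node marker resource.
    return HEAD_NODE_RESOURCE in node.get("Resources", {})
```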

nemo_curator.backends.experimental.utils._HEAD_NODE_ID_CACHE = None