nemo_curator.backends.utils


Module Contents

Classes

Name | Description
--- | ---
RayStageSpecKeys | String enum of different flags that define keys inside ray_stage_spec.

Functions

Name | Description
--- | ---
_logger_custom_deserializer | -
_logger_custom_serializer | -
_setup_stage_on_node | Ray remote function to execute setup_on_node for a stage.
check_total_gpu_capacity | Raise if the cluster doesn’t have enough GPUs to satisfy aggregate demand.
execute_setup_on_node | Execute setup_on_node for every stage on every alive Ray node.
get_available_cpu_gpu_resources | Get available CPU and GPU resources from Ray.
get_worker_metadata_and_node_id | Get the worker metadata and node id from the runtime context.
merge_executor_configs | Recursively merge two executor configs with deep merging of nested dicts.
register_loguru_serializer | Register Ray serialization hooks for loguru.Logger.
warn_on_env_var_override | -

API

class nemo_curator.backends.utils.RayStageSpecKeys

Bases: enum.Enum

String enum of different flags that define keys inside ray_stage_spec.

IS_ACTOR_STAGE = 'is_actor_stage'
IS_FANOUT_STAGE = 'is_fanout_stage'
IS_LSH_STAGE = 'is_lsh_stage'
IS_RAFT_ACTOR = 'is_raft_actor'
IS_SHUFFLE_STAGE = 'is_shuffle_stage'
MAX_CALLS_PER_WORKER = 'max_calls_per_worker'
RAY_REMOTE_ARGS = 'ray_remote_args'

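One way to read ray_stage_spec keys robustly, sketched under assumptions: the class below is a local mirror of RayStageSpecKeys built from the member listing above (so the example runs without NeMo Curator installed), and normalize_spec is a hypothetical helper, not part of this module. It accepts either enum members or their string values as keys and rejects anything unknown.

```python
from enum import Enum
from typing import Any


class RayStageSpecKeys(Enum):
    """Local mirror of the documented members (values copied from the listing)."""

    IS_ACTOR_STAGE = "is_actor_stage"
    IS_FANOUT_STAGE = "is_fanout_stage"
    IS_LSH_STAGE = "is_lsh_stage"
    IS_RAFT_ACTOR = "is_raft_actor"
    IS_SHUFFLE_STAGE = "is_shuffle_stage"
    MAX_CALLS_PER_WORKER = "max_calls_per_worker"
    RAY_REMOTE_ARGS = "ray_remote_args"


def normalize_spec(spec: dict[Any, Any]) -> dict[RayStageSpecKeys, Any]:
    """Hypothetical helper: map string or member keys to members.

    RayStageSpecKeys(key) returns the member unchanged when given a member,
    looks it up by value when given a string, and raises ValueError otherwise.
    """
    return {RayStageSpecKeys(key): value for key, value in spec.items()}


# Usage: a spec mixing a raw string key and an enum-member key.
spec = normalize_spec({"is_actor_stage": True, RayStageSpecKeys.MAX_CALLS_PER_WORKER: 1})
```

As documented, the base class is plain enum.Enum rather than a str mixin, so a member does not compare equal to its string value (RayStageSpecKeys.IS_ACTOR_STAGE != "is_actor_stage"). Going through RayStageSpecKeys(value) or .value avoids that pitfall.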
nemo_curator.backends.utils._logger_custom_deserializer(
_: None
) -> loguru.Logger

nemo_curator.backends.utils._logger_custom_serializer(
_: loguru.Logger
) -> None

nemo_curator.backends.utils._setup_stage_on_node(
stage: nemo_curator.stages.base.ProcessingStage
) -> None

Ray remote function to execute setup_on_node for a stage.

This runs as a Ray remote task (not an actor). vLLM’s auto-detection forces the spawn multiprocessing start method only inside Ray actors, not inside Ray tasks, so without an override vLLM defaults to fork in a task and fails with RuntimeError: Cannot re-initialize CUDA in forked subprocess. To prevent this, the function explicitly sets the VLLM_WORKER_MULTIPROC_METHOD environment variable to spawn.
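A minimal sketch of the override using only the standard library. Assumptions: the variable name VLLM_WORKER_MULTIPROC_METHOD (the one vLLM reads to pick its worker start method), and HypotheticalStage, a stand-in for ProcessingStage whose setup_on_node takes no arguments (the real signature may differ). No Ray task is involved here; the sketch only shows the ordering, with the override set before any setup runs, and resolves the chosen method to a multiprocessing context to show what "spawn" means in stdlib terms.

```python
from __future__ import annotations

import multiprocessing
import os

# Assumed name of the variable vLLM reads to pick its worker start method.
VLLM_METHOD_VAR = "VLLM_WORKER_MULTIPROC_METHOD"


class HypotheticalStage:
    """Stand-in for a ProcessingStage; only setup_on_node matters here."""

    def __init__(self) -> None:
        self.seen_method: str | None = None

    def setup_on_node(self) -> None:
        # A real stage might start vLLM here; this one records what it would see.
        self.seen_method = os.environ.get(VLLM_METHOD_VAR)


def setup_stage_in_task(stage: HypotheticalStage) -> multiprocessing.context.BaseContext:
    # Set the override first, so nothing started during setup falls back to fork.
    os.environ[VLLM_METHOD_VAR] = "spawn"
    stage.setup_on_node()
    # Stdlib view of the same choice: child processes would start via spawn.
    return multiprocessing.get_context(os.environ[VLLM_METHOD_VAR])
```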

nemo_curator.backends.utils.check_total_gpu_capacity(
gpus_needed: int,
ignore_head_node: bool = False
) -> None

Raise if the cluster doesn’t have enough GPUs to satisfy aggregate demand.

Intended as a coarse pre-check before submitting placement groups: Ray’s placement group (PG) scheduler can hang indefinitely on pg.ready() when demand exceeds capacity, so a fast, explicit error with the actual numbers is friendlier than waiting on a timeout.
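A coarse version of this check can be sketched in plain Python. The per-node GPU map (gpus_per_node, keyed by node id) and head_node_id are hypothetical inputs, and check_gpu_capacity is an illustrative name; on a live cluster one might build the map from Ray’s node listing, whereas the real function queries the cluster itself and its error wording may differ.

```python
from __future__ import annotations


def check_gpu_capacity(
    gpus_needed: int,
    gpus_per_node: dict[str, float],
    head_node_id: str | None = None,
    ignore_head_node: bool = False,
) -> None:
    """Raise early if aggregate demand exceeds what the counted nodes offer."""
    # Optionally drop the head node from the pool before summing.
    counted = {
        node_id: gpus
        for node_id, gpus in gpus_per_node.items()
        if not (ignore_head_node and node_id == head_node_id)
    }
    available = sum(counted.values())
    if gpus_needed > available:
        # Report the actual numbers instead of letting a placement group hang.
        raise RuntimeError(
            f"Requested {gpus_needed} GPU(s), but only {available:g} available "
            f"across {len(counted)} node(s)"
        )
```

This only compares totals; it cannot tell whether a bundle that needs several GPUs on one node will fit, which is why it is a coarse pre-check rather than a replacement for the scheduler.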

nemo_curator.backends.utils.execute_setup_on_node(
stages: list[nemo_curator.stages.base.ProcessingStage],
ignore_head_node: bool = False
) -> None

Execute setup_on_node for every stage on every alive Ray node.

All (stage, node) setup tasks are submitted up front and awaited with a single ray.get, so total wall-clock time is bounded by the slowest individual setup task rather than the sum of per-stage times. This matters when setup is heavy (model downloads, weight loads) and stages don’t contend for the same resources.
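Ray is not needed to see why the single wait helps. The sketch below swaps Ray tasks for a thread pool from concurrent.futures (a stand-in, not what the function uses): every (stage, node) pair is submitted first and results are collected in one pass, so elapsed time tracks the slowest task. setup_everywhere and fake_setup are hypothetical names.

```python
from __future__ import annotations

import itertools
import threading
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Callable


def setup_everywhere(
    stages: list[Any],
    nodes: list[Any],
    setup: Callable[[Any, Any], Any],
) -> list[Any]:
    """Run setup(stage, node) for every pair, submitting all tasks before waiting."""
    pairs = list(itertools.product(stages, nodes))
    with ThreadPoolExecutor(max_workers=max(1, len(pairs))) as pool:
        # Fan out: every (stage, node) task is in flight before we block.
        futures = [pool.submit(setup, stage, node) for stage, node in pairs]
        # Single collective wait, analogous to one ray.get over all refs;
        # results come back in submission order.
        return [future.result() for future in futures]


if __name__ == "__main__":
    # The barrier only releases once all four tasks run at the same time,
    # so a stage-by-stage loop would time out here instead.
    barrier = threading.Barrier(4, timeout=5)

    def fake_setup(stage: str, node: str) -> tuple[str, str]:
        barrier.wait()
        return stage, node

    print(setup_everywhere(["tokenizer", "model"], ["node-a", "node-b"], fake_setup))
```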

nemo_curator.backends.utils.get_available_cpu_gpu_resources(
init_and_shutdown: bool = False,
ignore_head_node: bool = False
) -> tuple[int, int]

Get available CPU and GPU resources from Ray.

nemo_curator.backends.utils.get_worker_metadata_and_node_id() -> tuple[nemo_curator.backends.base.NodeInfo, nemo_curator.backends.base.WorkerMetadata]

Get the node info and worker metadata (returned in that order) from the Ray runtime context.

nemo_curator.backends.utils.merge_executor_configs(
base_config: dict | None,
override_config: dict | None
) -> dict

Recursively merge two executor configs with deep merging of nested dicts.

Parameters:

base_config
dict | None

Base configuration dictionary

override_config
dict | None

Configuration to merge on top of base_config

Returns: dict

Merged configuration dictionary with all nested dicts recursively merged

Examples:

>>> base = {"runtime_env": {"env_vars": {"A": "1", "B": "2"}}}
>>> override = {"runtime_env": {"env_vars": {"B": "3", "C": "4"}}}
>>> merge_executor_configs(base, override)
{"runtime_env": {"env_vars": {"A": "1", "B": "3", "C": "4"}}}
nemo_curator.backends.utils.register_loguru_serializer() -> None

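Register _logger_custom_serializer and _logger_custom_deserializer with Ray as the serialization hooks for loguru.Logger, so a logger captured by a task or actor is replaced by the worker’s own logger instead of being pickled.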
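The signatures of _logger_custom_serializer (takes a loguru.Logger, returns None) and _logger_custom_deserializer (takes None, returns a loguru.Logger) suggest a common pattern: send no logger state over the wire and hand back the receiving process’s own logger. Ray exposes ray.util.register_serializer for registering such hooks; the sketch below shows the same idea with the standard library’s copyreg and pickle instead, using a hypothetical HypotheticalLogger class in place of loguru.Logger.

```python
import copyreg
import pickle
import threading


class HypotheticalLogger:
    """Stand-in for loguru.Logger: it holds a lock, so pickle rejects it by default."""

    def __init__(self) -> None:
        self._lock = threading.Lock()


# The logger that already exists in this process; deserialization resolves to it.
LOCAL_LOGGER = HypotheticalLogger()


def _restore_local_logger() -> HypotheticalLogger:
    # Deserializer side: nothing was shipped, so return this process's logger.
    return LOCAL_LOGGER


def _reduce_logger(_: HypotheticalLogger) -> tuple:
    # Serializer side: ship no state, only the instruction "call _restore_local_logger()".
    return (_restore_local_logger, ())


# Register the reducer in the global dispatch table that pickle consults.
copyreg.pickle(HypotheticalLogger, _reduce_logger)

# Round trip: any logger instance comes back as the process-local one.
restored = pickle.loads(pickle.dumps(HypotheticalLogger()))
```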

nemo_curator.backends.utils.warn_on_env_var_override(
existing_config: dict | None,
merged_config: dict | None
) -> None