nemo_curator.utils.ray_utils


Cluster-wide Ray helpers shared across backends and inference-server code.

Module Contents

Functions

| Name | Description |
|---|---|
| get_head_node_id | Return the cluster head node ID, lazily computed and cached. |
| is_head_node | Check if a Ray node dict represents the cluster head. |
| run_on_each_node | Submit remote_fn(*args) once per alive Ray node and return results in submission order. |
| submit_on_each_node | Submit remote_fn(*args) once per alive Ray node and return the ObjectRefs. |

Data

_HEAD_NODE_ID_CACHE

API

nemo_curator.utils.ray_utils.get_head_node_id() -> str | None

Return the cluster head node ID, lazily computed and cached.

Returns None if no head node is present in the cluster.
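The lazy caching described above can be sketched as follows. This is an illustration, not the library's code. It assumes the head node is identified by Ray's internal "node:__internal_head__" resource, and it takes a hypothetical list_nodes callable (on a live cluster you would pass ray.nodes) so that it runs without Ray.

```python
from __future__ import annotations

from typing import Any, Callable

# Hypothetical stand-in for the module-level _HEAD_NODE_ID_CACHE.
_HEAD_NODE_ID_CACHE: str | None = None

# Assumption: Ray marks the head node with this custom resource.
HEAD_RESOURCE = "node:__internal_head__"


def get_head_node_id_sketch(
    list_nodes: Callable[[], list[dict[str, Any]]],
) -> str | None:
    """Return the head node ID, scanning the node list only until it is found."""
    global _HEAD_NODE_ID_CACHE
    if _HEAD_NODE_ID_CACHE is None:
        for node in list_nodes():
            if HEAD_RESOURCE in node.get("Resources", {}):
                _HEAD_NODE_ID_CACHE = node["NodeID"]
                break
    return _HEAD_NODE_ID_CACHE
```

This sketch does not cache a None result, so a cluster without a head node is rescanned on every call; this page does not say whether the real function behaves the same way.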

nemo_curator.utils.ray_utils.is_head_node(
node: dict[str, typing.Any]
) -> bool

Check if a Ray node dict represents the cluster head.
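One plausible way to implement this check is shown below. It assumes node dicts shaped like the entries returned by ray.nodes() and that the head node carries the "node:__internal_head__" resource; neither detail is confirmed by this page, and is_head_node_sketch is a hypothetical name.

```python
from typing import Any

# Assumption: Ray exposes this custom resource only on the head node.
HEAD_RESOURCE = "node:__internal_head__"


def is_head_node_sketch(node: dict[str, Any]) -> bool:
    """Return True if the node dict advertises the head-node resource."""
    return HEAD_RESOURCE in node.get("Resources", {})
```

On a running cluster, [n["NodeID"] for n in ray.nodes() if is_head_node_sketch(n)] would then list the head node's ID.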

nemo_curator.utils.ray_utils.run_on_each_node(
remote_fn: ray.remote_function.RemoteFunction,
args = (),
ignore_head_node: bool = False,
num_cpus: float = 0,
num_gpus: float = 0
) -> list[typing.Any]

Submit remote_fn(*args) once per alive Ray node and return results in submission order.

Convenience wrapper that submits via submit_on_each_node and awaits all of the returned refs with a single ray.get. Each call blocks until its own fan-out finishes, so calling it several times in a row runs those fan-outs one after another. When parallelism across fan-outs matters, call submit_on_each_node for each of them and pass the combined ref list to one ray.get.
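The batching advice can be illustrated with a small helper. run_many_fanouts is a hypothetical name, and the submit and get callables are injected so the logic can be exercised without a cluster; with Ray you would pass submit_on_each_node and ray.get. The key point is that every fan-out is submitted before anything is awaited.

```python
from __future__ import annotations

from typing import Any, Callable, Sequence


def run_many_fanouts(
    submit: Callable[[Any, tuple], list[Any]],
    get: Callable[[list[Any]], list[Any]],
    jobs: Sequence[tuple[Any, tuple]],
) -> list[list[Any]]:
    """Submit every fan-out first, then await all refs with a single get call."""
    # Phase 1: submit everything; nothing blocks here.
    ref_groups = [submit(fn, args) for fn, args in jobs]
    flat_refs = [ref for group in ref_groups for ref in group]

    # Phase 2: one blocking await over the combined ref list.
    flat_results = get(flat_refs)

    # Split the flat results back into one list per fan-out, preserving order.
    grouped, start = [], 0
    for group in ref_groups:
        grouped.append(flat_results[start : start + len(group)])
        start += len(group)
    return grouped
```

On a cluster this might be called as run_many_fanouts(submit_on_each_node, ray.get, [(remote_f, ()), (remote_g, (path,))]), where remote_f, remote_g and path are placeholders. Because ray.get on a list returns results in the same order as the refs, each inner list follows the order in which submit_on_each_node targeted the nodes.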

nemo_curator.utils.ray_utils.submit_on_each_node(
remote_fn: ray.remote_function.RemoteFunction,
args = (),
ignore_head_node: bool = False,
num_cpus: float = 0,
num_gpus: float = 0
) -> list[typing.Any]

Submit remote_fn(*args) once per alive Ray node and return the ObjectRefs.

Each invocation is pinned to its node via NodeAffinitySchedulingStrategy(soft=False), so the function runs on (and only on) the targeted node. Dead nodes are skipped, and the head node is also skipped when ignore_head_node is True. The caller is responsible for awaiting the returned refs (typically with ray.get). Prefer this over run_on_each_node when batching several fan-outs into a single await, which lets them run in parallel.
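The behaviour described above can be sketched with Ray's public API. This is an illustrative reimplementation, not NeMo Curator's source: submit_on_each_node_sketch and select_target_nodes are hypothetical names, and detecting the head node through the "node:__internal_head__" resource is an assumption. Ray is imported inside the submitting function so that the node-selection logic can be tested without a cluster.

```python
from __future__ import annotations

from typing import Any

# Assumption: Ray marks the head node with this custom resource.
HEAD_RESOURCE = "node:__internal_head__"


def select_target_nodes(
    nodes: list[dict[str, Any]], ignore_head_node: bool = False
) -> list[str]:
    """Return the IDs of alive nodes, optionally excluding the head node."""
    return [
        node["NodeID"]
        for node in nodes
        if node.get("Alive", False)
        and not (ignore_head_node and HEAD_RESOURCE in node.get("Resources", {}))
    ]


def submit_on_each_node_sketch(
    remote_fn: Any,
    args: tuple = (),
    ignore_head_node: bool = False,
    num_cpus: float = 0,
    num_gpus: float = 0,
) -> list[Any]:
    """Submit remote_fn(*args) once per selected node; return ObjectRefs unawaited."""
    import ray  # imported lazily so select_target_nodes runs without Ray
    from ray.util.scheduling_strategies import NodeAffinitySchedulingStrategy

    refs = []
    for node_id in select_target_nodes(ray.nodes(), ignore_head_node):
        # soft=False: run only on this node, never fall back to another one.
        strategy = NodeAffinitySchedulingStrategy(node_id=node_id, soft=False)
        refs.append(
            remote_fn.options(
                scheduling_strategy=strategy, num_cpus=num_cpus, num_gpus=num_gpus
            ).remote(*args)
        )
    return refs
```

A caller would then await the refs itself, for example ray.get(submit_on_each_node_sketch(report_hostname)), where report_hostname is a placeholder @ray.remote function. With soft=False, Ray fails the task rather than placing it elsewhere if the target node becomes unavailable.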

nemo_curator.utils.ray_utils._HEAD_NODE_ID_CACHE: str | None = None