nemo_curator.core.serve.placement


Ray-placement-group construction and bundle operations.

Covers two concerns:

  1. Planning: turning a TP size + cluster topology into a ReplicaBundleSpec (single-node STRICT_PACK or multi-node STRICT_SPREAD with an equal per-node split).
  2. Construction and bundle-scoped operations: build_pg / build_replica_pg create detached, named PGs and wait until ready; get_bundle_node_ip / get_free_port_in_bundle discover where a bundle actually landed; remove_named_pgs_with_prefix reaps orphans left by a prior driver session.

Subprocess lifecycle (actors, graceful stop, CUDA/env propagation) lives in subprocess_mgr. Backend-specific PGs (e.g. the Dynamo etcd+NATS+frontend bundle) live in the backend’s own subpackage.

Module Contents

Classes

  - ReplicaBundleSpec: Bundle shape + strategy for a single model replica.

Functions

  - _get_gpu_topology: Return per-node GPU topology: [{"node_id", "num_gpus", "is_head"}, ...].
  - _remote_get_free_port
  - _remote_get_node_ip
  - _run_in_bundle: Schedule remote_fn into pg's bundle bundle_index and return the result.
  - build_pg: Create a detached, named PG and wait until ready; clean up on failure.
  - build_replica_pg: Create a detached, named PG for one replica and wait until ready.
  - get_bundle_node_ip: Return the routable IP of the node hosting pg's bundle bundle_index.
  - get_free_port_in_bundle: Find a free port on the node hosting pg's bundle bundle_index.
  - plan_replica_bundle_shape: Pick the bundle shape for one replica given current cluster topology.
  - remove_named_pgs_with_prefix: Remove all placement groups in the current namespace whose name starts with prefix.

API

class nemo_curator.core.serve.placement.ReplicaBundleSpec(
bundles: list[dict[str, float]],
strategy: typing.Literal['STRICT_PACK', 'STRICT_SPREAD'],
nnodes: int,
per_node_gpus: int,
bundle_label_selector: list[dict[str, str]] | None = None
)
Dataclass

Bundle shape + strategy for a single model replica.

bundle_label_selector: list[dict[str, str]] | None = None
bundles: list[dict[str, float]]
is_multi_node: bool
nnodes: int
per_node_gpus: int
strategy: Literal['STRICT_PACK', 'STRICT_SPREAD']
total_gpus: int
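To make the field relationships concrete, here is a minimal sketch using a hypothetical stand-in dataclass (SpecSketch is illustrative only, not the real ReplicaBundleSpec; the assumption is that is_multi_node and total_gpus are derived from nnodes and per_node_gpus as shown):

```python
from dataclasses import dataclass
from typing import Literal

# Hypothetical stand-in to illustrate how the fields relate; the real
# ReplicaBundleSpec lives in nemo_curator.core.serve.placement.
@dataclass
class SpecSketch:
    bundles: list[dict[str, float]]
    strategy: Literal["STRICT_PACK", "STRICT_SPREAD"]
    nnodes: int
    per_node_gpus: int

    @property
    def is_multi_node(self) -> bool:
        # More than one node means the replica spans the cluster.
        return self.nnodes > 1

    @property
    def total_gpus(self) -> int:
        # Equal per-node split, so the total is a simple product.
        return self.nnodes * self.per_node_gpus

# TP=8 split across two 4-GPU nodes: two equal bundles, STRICT_SPREAD.
spec = SpecSketch(
    bundles=[{"GPU": 4.0}, {"GPU": 4.0}],
    strategy="STRICT_SPREAD",
    nnodes=2,
    per_node_gpus=4,
)
print(spec.is_multi_node, spec.total_gpus)  # True 8
```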
nemo_curator.core.serve.placement._get_gpu_topology(
head_node_id: str | None = None,
nodes: list[dict[str, typing.Any]] | None = None
) -> list[dict[str, typing.Any]]

Return per-node GPU topology: [{"node_id", "num_gpus", "is_head"}, ...].

Uses total node resources, not current availability — topology shape is a static property of the cluster. Ray’s PG scheduler handles dynamic capacity.

Parameters:

head_node_id
str | None. Defaults to None.

Ray node ID to tag as head in output (for CURATOR_IGNORE_RAY_HEAD_NODE filtering). Defaults to the node bearing the node:__internal_head__ resource marker; falls back to the driver’s own node id if no marker is found (matches the behaviour used by backends/utils.py).

nodes
list[dict[str, Any]] | None. Defaults to None.

Pre-fetched ray.nodes() to avoid a redundant call.
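The reduction from raw node records to the documented shape can be sketched as a pure function over a pre-fetched nodes list. This is an illustrative version, assuming each ray.nodes() entry carries "NodeID", "Alive", and total "Resources", with the head node marked by the node:__internal_head__ resource:

```python
from typing import Any

def topology_sketch(nodes: list[dict[str, Any]]) -> list[dict[str, Any]]:
    """Illustrative reduction of ray.nodes()-style records to
    [{"node_id", "num_gpus", "is_head"}, ...]; not the real helper."""
    out = []
    for node in nodes:
        if not node.get("Alive"):
            continue  # dead nodes contribute nothing to topology
        resources = node.get("Resources", {})
        out.append({
            "node_id": node["NodeID"],
            # Total resources, not current availability: topology shape
            # is a static property of the cluster.
            "num_gpus": int(resources.get("GPU", 0)),
            "is_head": "node:__internal_head__" in resources,
        })
    return out

fake_nodes = [
    {"NodeID": "head", "Alive": True,
     "Resources": {"CPU": 8.0, "node:__internal_head__": 1.0}},
    {"NodeID": "w1", "Alive": True, "Resources": {"GPU": 4.0}},
    {"NodeID": "w2", "Alive": False, "Resources": {"GPU": 4.0}},
]
print(topology_sketch(fake_nodes))
```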

nemo_curator.core.serve.placement._remote_get_free_port(
start: int,
get_next: bool
) -> int
nemo_curator.core.serve.placement._remote_get_node_ip() -> str
nemo_curator.core.serve.placement._run_in_bundle(
pg: ray.util.placement_group.PlacementGroup,
bundle_index: int,
remote_fn: typing.Any,
args: typing.Any = ()
) -> typing.Any

Schedule remote_fn into pg’s bundle bundle_index and return the result.

nemo_curator.core.serve.placement.build_pg(
bundles: list[dict[str, float]],
strategy: str,
name: str,
bundle_label_selector: list[dict[str, str]] | None,
ready_timeout_s: float
) -> ray.util.placement_group.PlacementGroup

Create a detached, named PG and wait until ready; clean up on failure.

nemo_curator.core.serve.placement.build_replica_pg(
spec: nemo_curator.core.serve.placement.ReplicaBundleSpec,
name: str,
ready_timeout_s: float = PLACEMENT_GROUP_READY_TIMEO...
) -> ray.util.placement_group.PlacementGroup

Create a detached, named PG for one replica and wait until ready.

PG is created with lifetime="detached" so it survives driver disconnects between server.start(), pipeline.run(), and server.stop(). The caller-supplied name is used for orphan cleanup via remove_named_pgs_with_prefix.

nemo_curator.core.serve.placement.get_bundle_node_ip(
pg: ray.util.placement_group.PlacementGroup,
bundle_index: int
) -> str

Return the routable IP of the node hosting pg’s bundle bundle_index.

Used to resolve the master-addr for multi-node TP after pg.ready(): the rank-0 actor will schedule into this same bundle, so its peers can connect to this IP.

nemo_curator.core.serve.placement.get_free_port_in_bundle(
pg: ray.util.placement_group.PlacementGroup,
bundle_index: int,
start_port: int,
get_next_free_port: bool = True
) -> int

Find a free port on the node hosting pg’s bundle bundle_index.

The remote task is scheduled into the target bundle via PlacementGroupSchedulingStrategy, so port availability is checked on the same node where the consuming actor will bind.
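The remote-side probe itself is ordinary socket work. A minimal sketch of what such a task might do on the bundle's node (find_free_port is a hypothetical name; the assumption is a bind-and-retry loop mirroring the start / get_next_free_port parameters):

```python
import socket

def find_free_port(start: int, get_next: bool = True) -> int:
    """Illustrative free-port probe: try to bind each candidate port
    and return the first that succeeds. Hypothetical sketch, not the
    real remote task."""
    port = start
    while True:
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
            s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            try:
                s.bind(("", port))
                return port  # bind succeeded, so the port is free
            except OSError:
                if not get_next:
                    raise  # caller wanted exactly `start` or nothing
                port += 1

print(find_free_port(20000))
```

Because the real task runs inside the target bundle, the bind check happens on the same node (and in the same network namespace) where the consuming actor will later bind.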

nemo_curator.core.serve.placement.plan_replica_bundle_shape(
tp_size: int,
head_node_id: str | None = None,
_topology: list[dict[str, typing.Any]] | None = None,
_nodes: list[dict[str, typing.Any]] | None = None
) -> nemo_curator.core.serve.placement.ReplicaBundleSpec

Pick the bundle shape for one replica given current cluster topology.

Single-node: if any node has >= tp_size GPUs, return one bundle of size tp_size with STRICT_PACK.

Multi-node: find the smallest nnodes such that tp_size % nnodes == 0 and at least nnodes nodes have >= tp_size / nnodes GPUs each. Return nnodes equal bundles with STRICT_SPREAD. vLLM requires an equal per-node split (e.g. a 1+3 split for TP=4 fails with a CUDA device ordinal error), so asymmetric splits are never considered.

When CURATOR_IGNORE_RAY_HEAD_NODE is set, the head node is filtered out of topology and every bundle gets [{"ray.io/node-type": "worker"}] as a label selector.
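The search described above can be sketched in a few lines. This is an illustrative version operating on a plain per-node GPU-count list (plan_shape_sketch and its tuple return are assumptions for the example, not the real function, which returns a ReplicaBundleSpec):

```python
def plan_shape_sketch(tp_size: int, gpus_per_node: list[int]) -> tuple[int, int, str]:
    """Illustrative planner returning (nnodes, per_node_gpus, strategy)."""
    # Single-node: any node that fits the whole replica wins.
    if any(g >= tp_size for g in gpus_per_node):
        return 1, tp_size, "STRICT_PACK"
    # Multi-node: smallest nnodes with an equal split that enough nodes can host.
    for nnodes in range(2, tp_size + 1):
        if tp_size % nnodes != 0:
            continue  # asymmetric splits are never considered
        per_node = tp_size // nnodes
        if sum(1 for g in gpus_per_node if g >= per_node) >= nnodes:
            return nnodes, per_node, "STRICT_SPREAD"
    raise ValueError(f"cluster cannot host a TP={tp_size} replica")

print(plan_shape_sketch(8, [4, 4, 4]))  # (2, 4, 'STRICT_SPREAD')
print(plan_shape_sketch(4, [8, 4]))     # (1, 4, 'STRICT_PACK')
```

Preferring the smallest nnodes keeps each replica as packed as possible, which minimizes cross-node traffic for tensor parallelism.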

nemo_curator.core.serve.placement.remove_named_pgs_with_prefix(
prefix: str
) -> int

Remove all placement groups in the current namespace whose name starts with prefix.

Requires a live Ray connection on the current driver. Intended for orphan cleanup after a driver restart: since PGs are namespace-scoped and named, a reconnecting driver (with matching namespace=) can find and reap leftover state from a prior session. Removing a PG forcibly kills all actors scheduled into it, releasing the reserved resources.

Returns the number of PGs removed.
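The selection step of such a cleanup pass reduces to a prefix filter over named PGs. An illustrative sketch, assuming a ray.util.placement_group_table()-style mapping of PG id to metadata with a "name" field (select_pgs_to_reap is a hypothetical helper):

```python
from typing import Any

def select_pgs_to_reap(pg_table: dict[str, dict[str, Any]], prefix: str) -> list[str]:
    """Illustrative filter: pick the PG names a cleanup pass would remove.
    The real function also calls remove_placement_group on each match."""
    return [
        info["name"]
        for info in pg_table.values()
        if info.get("name", "").startswith(prefix)
    ]

table = {
    "pg-1": {"name": "curator-replica-0"},
    "pg-2": {"name": "curator-replica-1"},
    "pg-3": {"name": "other-job"},
}
print(select_pgs_to_reap(table, "curator-replica-"))  # ['curator-replica-0', 'curator-replica-1']
```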