
# nemo_curator.core.serve.placement

Ray-placement-group construction and bundle operations.

Covers two concerns:

1. **Planning** -- turning a tensor-parallel (TP) size plus the current
   cluster topology into a `ReplicaBundleSpec` (single-node `STRICT_PACK`
   or multi-node `STRICT_SPREAD` with an equal per-node split).
2. **Construction + bundle-scoped operations** -- `build_pg` /
   `build_replica_pg` create detached, named PGs and wait until ready;
   `get_bundle_node_ip` / `get_free_port_in_bundle` discover
   where a bundle actually landed; `remove_named_pgs_with_prefix`
   reaps orphans left by a prior driver session.

Subprocess lifecycle (actors, graceful stop, CUDA/env propagation)
lives in `subprocess_mgr`. Backend-specific PGs (e.g. the Dynamo
etcd+NATS+frontend bundle) live in the backend's own subpackage.
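
Taken together, a replica's placement lifecycle runs plan, build, discover. The
sketch below strings the documented functions together; the PG name prefix and
`start_port` value are illustrative, not fixed by this module:

```python
from nemo_curator.core.serve.placement import (
    build_replica_pg,
    get_bundle_node_ip,
    get_free_port_in_bundle,
    plan_replica_bundle_shape,
    remove_named_pgs_with_prefix,
)

# Reap PGs orphaned by a prior driver session (prefix is illustrative).
remove_named_pgs_with_prefix("curator-replica-")

# 1. Plan: turn a tensor-parallel size into a bundle shape for this cluster.
spec = plan_replica_bundle_shape(tp_size=8)

# 2. Construct: create a detached, named PG and block until it is placed.
pg = build_replica_pg(spec, name="curator-replica-0")

# 3. Discover: find where bundle 0 landed so TP peers can reach rank 0.
master_addr = get_bundle_node_ip(pg, bundle_index=0)
master_port = get_free_port_in_bundle(pg, bundle_index=0, start_port=29500)
```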

## Module Contents

### Classes

| Name                                                                        | Description                                         |
| --------------------------------------------------------------------------- | --------------------------------------------------- |
| [`ReplicaBundleSpec`](#nemo_curator-core-serve-placement-ReplicaBundleSpec) | Bundle shape + strategy for a single model replica. |

### Functions

| Name                                                                                              | Description                                                                           |
| ------------------------------------------------------------------------------------------------- | ------------------------------------------------------------------------------------- |
| [`_get_gpu_topology`](#nemo_curator-core-serve-placement-_get_gpu_topology)                       | Return per-node GPU topology: `[{"node_id", "num_gpus", "is_head"}, ...]`.             |
| [`_remote_get_free_port`](#nemo_curator-core-serve-placement-_remote_get_free_port)               | -                                                                                     |
| [`_remote_get_node_ip`](#nemo_curator-core-serve-placement-_remote_get_node_ip)                   | -                                                                                     |
| [`_run_in_bundle`](#nemo_curator-core-serve-placement-_run_in_bundle)                             | Schedule *remote\_fn* into `pg`'s bundle *bundle\_index* and return the result.       |
| [`build_pg`](#nemo_curator-core-serve-placement-build_pg)                                         | Create a detached, named PG and wait until ready; clean up on failure.                |
| [`build_replica_pg`](#nemo_curator-core-serve-placement-build_replica_pg)                         | Create a detached, named PG for one replica and wait until ready.                     |
| [`get_bundle_node_ip`](#nemo_curator-core-serve-placement-get_bundle_node_ip)                     | Return the routable IP of the node hosting `pg`'s bundle *bundle\_index*.             |
| [`get_free_port_in_bundle`](#nemo_curator-core-serve-placement-get_free_port_in_bundle)           | Find a free port on the node hosting `pg`'s bundle *bundle\_index*.                   |
| [`plan_replica_bundle_shape`](#nemo_curator-core-serve-placement-plan_replica_bundle_shape)       | Pick the bundle shape for one replica given current cluster topology.                 |
| [`remove_named_pgs_with_prefix`](#nemo_curator-core-serve-placement-remove_named_pgs_with_prefix) | Remove all placement groups in the current namespace whose name starts with *prefix*. |

### API

<Anchor id="nemo_curator-core-serve-placement-ReplicaBundleSpec">
  <CodeBlock showLineNumbers={false} wordWrap={true}>
    ```python
    class nemo_curator.core.serve.placement.ReplicaBundleSpec(
        bundles: list[dict[str, float]],
        strategy: typing.Literal['STRICT_PACK', 'STRICT_SPREAD'],
        nnodes: int,
        per_node_gpus: int,
        bundle_label_selector: list[dict[str, str]] | None = None
    )
    ```
  </CodeBlock>
</Anchor>

<Indent>
  <Badge>
    Dataclass
  </Badge>

  Bundle shape + strategy for a single model replica.

  <ParamField path="bundle_label_selector" type="list[dict[str, str]] | None = None" />

  <ParamField path="bundles" type="list[dict[str, float]]" />

  <ParamField path="is_multi_node" type="bool" />

  <ParamField path="nnodes" type="int" />

  <ParamField path="per_node_gpus" type="int" />

  <ParamField path="strategy" type="Literal['STRICT_PACK', 'STRICT_SPREAD']" />

  <ParamField path="total_gpus" type="int" />
</Indent>
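
As an illustration, a TP=8 replica split across two 4-GPU nodes would be
described by a spec like the following (values hypothetical; `is_multi_node`
and `total_gpus` are derived from the fields passed in):

```python
from nemo_curator.core.serve.placement import ReplicaBundleSpec

spec = ReplicaBundleSpec(
    bundles=[{"GPU": 4.0}, {"GPU": 4.0}],  # one bundle per node
    strategy="STRICT_SPREAD",              # each bundle on a different node
    nnodes=2,
    per_node_gpus=4,
)
assert spec.is_multi_node        # nnodes > 1
assert spec.total_gpus == 8      # nnodes * per_node_gpus
```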

<Anchor id="nemo_curator-core-serve-placement-_get_gpu_topology">
  <CodeBlock showLineNumbers={false} wordWrap={true}>
    ```python
    nemo_curator.core.serve.placement._get_gpu_topology(
        head_node_id: str | None = None,
        nodes: list[dict[str, typing.Any]] | None = None
    ) -> list[dict[str, typing.Any]]
    ```
  </CodeBlock>
</Anchor>

<Indent>
  Return per-node GPU topology: `[{"node_id", "num_gpus", "is_head"}, ...]`.

  Uses total node resources, not current availability -- topology shape is a
  static property of the cluster. Ray's PG scheduler handles dynamic capacity.

  **Parameters:**

  <ParamField path="head_node_id" type="str | None" default="None">
    Ray node ID to tag as head in output (for
    `CURATOR_IGNORE_RAY_HEAD_NODE` filtering). Defaults to the
    node bearing the `node:__internal_head__` resource marker;
    falls back to the driver's own node id if no marker is found
    (matches the behaviour used by `backends/utils.py`).
  </ParamField>

  <ParamField path="nodes" type="list[dict[str, Any]] | None" default="None">
    Pre-fetched `ray.nodes()` to avoid a redundant call.
  </ParamField>
</Indent>
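
For example, a two-node cluster (an 8-GPU head plus an 8-GPU worker) would
yield a list shaped like this (node IDs abbreviated and hypothetical):

```python
[
    {"node_id": "3f6a...", "num_gpus": 8, "is_head": True},
    {"node_id": "9c1b...", "num_gpus": 8, "is_head": False},
]
```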

<Anchor id="nemo_curator-core-serve-placement-_remote_get_free_port">
  <CodeBlock showLineNumbers={false} wordWrap={true}>
    ```python
    nemo_curator.core.serve.placement._remote_get_free_port(
        start: int,
        get_next: bool
    ) -> int
    ```
  </CodeBlock>
</Anchor>

<Indent />

<Anchor id="nemo_curator-core-serve-placement-_remote_get_node_ip">
  <CodeBlock showLineNumbers={false} wordWrap={true}>
    ```python
    nemo_curator.core.serve.placement._remote_get_node_ip() -> str
    ```
  </CodeBlock>
</Anchor>

<Indent />

<Anchor id="nemo_curator-core-serve-placement-_run_in_bundle">
  <CodeBlock showLineNumbers={false} wordWrap={true}>
    ```python
    nemo_curator.core.serve.placement._run_in_bundle(
        pg: ray.util.placement_group.PlacementGroup,
        bundle_index: int,
        remote_fn: typing.Any,
        args: typing.Any = ()
    ) -> typing.Any
    ```
  </CodeBlock>
</Anchor>

<Indent>
  Schedule *remote\_fn* into `pg`'s bundle *bundle\_index* and return the result.
</Indent>
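
This contract maps onto Ray's `PlacementGroupSchedulingStrategy` in a few
lines; the sketch below is a plausible reconstruction, not the actual source:

```python
import ray
from ray.util.placement_group import PlacementGroup
from ray.util.scheduling_strategies import PlacementGroupSchedulingStrategy


def _run_in_bundle_sketch(pg: PlacementGroup, bundle_index: int, remote_fn, args=()):
    # Pin the task to one specific bundle of the placement group.
    strategy = PlacementGroupSchedulingStrategy(
        placement_group=pg,
        placement_group_bundle_index=bundle_index,
    )
    return ray.get(remote_fn.options(scheduling_strategy=strategy).remote(*args))
```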

<Anchor id="nemo_curator-core-serve-placement-build_pg">
  <CodeBlock showLineNumbers={false} wordWrap={true}>
    ```python
    nemo_curator.core.serve.placement.build_pg(
        bundles: list[dict[str, float]],
        strategy: str,
        name: str,
        bundle_label_selector: list[dict[str, str]] | None,
        ready_timeout_s: float
    ) -> ray.util.placement_group.PlacementGroup
    ```
  </CodeBlock>
</Anchor>

<Indent>
  Create a detached, named PG and wait until ready; clean up on failure.
</Indent>
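
The described behavior corresponds closely to Ray's placement-group API. A
minimal sketch, assuming the `bundle_label_selector` plumbing is omitted
(support for it varies by Ray version):

```python
import ray
from ray.util.placement_group import placement_group, remove_placement_group


def build_pg_sketch(bundles, strategy, name, ready_timeout_s):
    # lifetime="detached" keeps the PG alive across driver disconnects.
    pg = placement_group(bundles, strategy=strategy, name=name, lifetime="detached")
    try:
        # pg.ready() resolves once every bundle has been placed.
        ray.get(pg.ready(), timeout=ready_timeout_s)
    except Exception:
        # Don't leak a half-placed detached PG on timeout or failure.
        remove_placement_group(pg)
        raise
    return pg
```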

<Anchor id="nemo_curator-core-serve-placement-build_replica_pg">
  <CodeBlock links={{"nemo_curator.core.serve.placement.ReplicaBundleSpec":"#nemo_curator-core-serve-placement-ReplicaBundleSpec"}} showLineNumbers={false} wordWrap={true}>
    ```python
    nemo_curator.core.serve.placement.build_replica_pg(
        spec: nemo_curator.core.serve.placement.ReplicaBundleSpec,
        name: str,
        ready_timeout_s: float = PLACEMENT_GROUP_READY_TIMEO...
    ) -> ray.util.placement_group.PlacementGroup
    ```
  </CodeBlock>
</Anchor>

<Indent>
  Create a detached, named PG for one replica and wait until ready.

  PG is created with `lifetime="detached"` so it survives driver
  disconnects between `server.start()`, `pipeline.run()`, and
  `server.stop()`. The caller-supplied `name` is used for orphan
  cleanup via `remove_named_pgs_with_prefix`.
</Indent>

<Anchor id="nemo_curator-core-serve-placement-get_bundle_node_ip">
  <CodeBlock showLineNumbers={false} wordWrap={true}>
    ```python
    nemo_curator.core.serve.placement.get_bundle_node_ip(
        pg: ray.util.placement_group.PlacementGroup,
        bundle_index: int
    ) -> str
    ```
  </CodeBlock>
</Anchor>

<Indent>
  Return the routable IP of the node hosting `pg`'s bundle *bundle\_index*.

  Used to resolve the master-addr for multi-node TP after `pg.ready()`:
  the rank-0 actor will schedule into this same bundle, so its peers can
  connect to this IP.
</Indent>
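
A plausible reconstruction, pairing `_remote_get_node_ip` with the
`_run_in_bundle` helper sketched earlier:

```python
import ray


@ray.remote(num_cpus=0)
def _remote_get_node_ip_sketch() -> str:
    # Executes inside the target bundle, so this is the hosting node's IP.
    return ray.util.get_node_ip_address()


def get_bundle_node_ip_sketch(pg, bundle_index: int) -> str:
    return _run_in_bundle_sketch(pg, bundle_index, _remote_get_node_ip_sketch)
```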

<Anchor id="nemo_curator-core-serve-placement-get_free_port_in_bundle">
  <CodeBlock showLineNumbers={false} wordWrap={true}>
    ```python
    nemo_curator.core.serve.placement.get_free_port_in_bundle(
        pg: ray.util.placement_group.PlacementGroup,
        bundle_index: int,
        start_port: int,
        get_next_free_port: bool = True
    ) -> int
    ```
  </CodeBlock>
</Anchor>

<Indent>
  Find a free port on the node hosting `pg`'s bundle *bundle\_index*.

  The remote task is scheduled into the target bundle via
  `PlacementGroupSchedulingStrategy`, so port availability is checked on
  the same node where the consuming actor will bind.
</Indent>
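
The signature of `_remote_get_free_port` suggests a probe along these lines;
the exact semantics of `get_next` are a guess here, not taken from the source:

```python
import socket


def _probe_free_port(start: int, get_next: bool) -> int:
    # Walk upward from `start` until a port binds successfully.
    port = start
    while True:
        try:
            with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
                s.bind(("", port))
            return port
        except OSError:
            if not get_next:
                raise  # caller asked for exactly `start`
            port += 1
```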

<Anchor id="nemo_curator-core-serve-placement-plan_replica_bundle_shape">
  <CodeBlock links={{"nemo_curator.core.serve.placement.ReplicaBundleSpec":"#nemo_curator-core-serve-placement-ReplicaBundleSpec"}} showLineNumbers={false} wordWrap={true}>
    ```python
    nemo_curator.core.serve.placement.plan_replica_bundle_shape(
        tp_size: int,
        head_node_id: str | None = None,
        _topology: list[dict[str, typing.Any]] | None = None,
        _nodes: list[dict[str, typing.Any]] | None = None
    ) -> nemo_curator.core.serve.placement.ReplicaBundleSpec
    ```
  </CodeBlock>
</Anchor>

<Indent>
  Pick the bundle shape for one replica given current cluster topology.

  Single-node: if any node has `>= tp_size` GPUs, return one bundle of
  size `tp_size` with `STRICT_PACK`.

  Multi-node: find the smallest `nnodes` such that
  `tp_size % nnodes == 0` and at least `nnodes` nodes have
  `>= tp_size / nnodes` GPUs each. Return `nnodes` equal bundles with
  `STRICT_SPREAD`. vLLM requires an even per-node split (1+3 for TP=4
  fails with a CUDA device ordinal error), so asymmetric splits are never
  considered.

  When `CURATOR_IGNORE_RAY_HEAD_NODE` is set, the head node is filtered
  out of the topology and every bundle gets
  `[{"ray.io/node-type": "worker"}]` as a label selector.
</Indent>
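
The selection rule is compact enough to sketch directly. This mirrors the
documented behavior but is not the actual source; head-node filtering and
label selectors are omitted:

```python
from nemo_curator.core.serve.placement import ReplicaBundleSpec


def plan_shape_sketch(tp_size: int, topology: list[dict]) -> ReplicaBundleSpec:
    gpu_counts = [node["num_gpus"] for node in topology]

    # Single-node: any node that can host the whole replica wins.
    if any(count >= tp_size for count in gpu_counts):
        return ReplicaBundleSpec(
            bundles=[{"GPU": float(tp_size)}],
            strategy="STRICT_PACK",
            nnodes=1,
            per_node_gpus=tp_size,
        )

    # Multi-node: smallest equal split that enough nodes can satisfy.
    for nnodes in range(2, tp_size + 1):
        if tp_size % nnodes != 0:
            continue  # vLLM needs an equal per-node split
        per_node = tp_size // nnodes
        if sum(1 for count in gpu_counts if count >= per_node) >= nnodes:
            return ReplicaBundleSpec(
                bundles=[{"GPU": float(per_node)}] * nnodes,
                strategy="STRICT_SPREAD",
                nnodes=nnodes,
                per_node_gpus=per_node,
            )

    raise ValueError(f"no node split can satisfy tp_size={tp_size}")
```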

<Anchor id="nemo_curator-core-serve-placement-remove_named_pgs_with_prefix">
  <CodeBlock showLineNumbers={false} wordWrap={true}>
    ```python
    nemo_curator.core.serve.placement.remove_named_pgs_with_prefix(
        prefix: str
    ) -> int
    ```
  </CodeBlock>
</Anchor>

<Indent>
  Remove all placement groups in the current namespace whose name starts with *prefix*.

  Requires a live Ray connection on the current driver. Intended for orphan
  cleanup after a driver restart: since PGs are namespace-scoped and named,
  a reconnecting driver (with matching `namespace=`) can find and reap
  leftover state from a prior session. Removing a PG forcibly kills all
  actors scheduled into it, releasing the reserved resources.

  Returns the number of PGs removed.
</Indent>
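
Ray's introspection API is sufficient to implement this. A minimal sketch
using `ray.util.placement_group_table()` (error handling for PGs belonging to
other namespaces is omitted):

```python
import ray
from ray.util.placement_group import remove_placement_group


def remove_named_pgs_with_prefix_sketch(prefix: str) -> int:
    removed = 0
    for pg_info in ray.util.placement_group_table().values():
        name = pg_info.get("name") or ""
        if name.startswith(prefix) and pg_info.get("state") != "REMOVED":
            # Forcibly kills actors scheduled into the PG, freeing resources.
            remove_placement_group(ray.util.get_placement_group(name))
            removed += 1
    return removed
```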