nemo_curator.core.serve.dynamo.backend

NVIDIA Dynamo inference backend.

Aggregated mode: one detached Ray placement group (PG) per replica holds that replica's tensor-parallel (TP) bundles. Disaggregated mode: one detached PG per prefill or decode worker, each with a single bundle. In both modes, a separate STRICT_PACK PG co-locates etcd, NATS, and the Dynamo frontend.
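The layout described above can be sketched as plain data, without calling Ray. This is a minimal illustration, not the backend's code: `ModelPlan`, its field names, the `"agg"` mode label, the PG names, and the resource amounts in each bundle are all assumptions made for the example. Only the one-PG-per-replica, one-PG-per-worker, and STRICT_PACK infra rules come from the description.

```python
from dataclasses import dataclass


@dataclass
class ModelPlan:
    """Hypothetical stand-in for a model config; every field name is an assumption."""
    name: str
    mode: str = "agg"          # "agg" or "disagg" ("agg" label is assumed)
    tensor_parallel: int = 1   # GPUs in one TP group
    replicas: int = 1          # aggregated replicas
    prefill_workers: int = 0   # disaggregated prefill workers
    decode_workers: int = 0    # disaggregated decode workers


def plan_placement_groups(models: list) -> list:
    """Describe each placement group as a dict instead of creating it in Ray."""
    # Infra PG: STRICT_PACK keeps etcd, NATS, and the frontend on one node.
    plans = [{
        "name": "infra",
        "strategy": "STRICT_PACK",
        "bundles": [{"CPU": 1} for _ in range(3)],  # assumed: one bundle per service
    }]
    for m in models:
        if m.mode == "disagg":
            for role, count in (("prefill", m.prefill_workers),
                                ("decode", m.decode_workers)):
                for i in range(count):
                    # One PG per worker; a single bundle holds the whole TP group.
                    plans.append({
                        "name": f"{m.name}-{role}-{i}",
                        "bundles": [{"GPU": m.tensor_parallel}],
                    })
        else:
            for r in range(m.replicas):
                # One PG per replica; assumed one single-GPU bundle per TP rank.
                plans.append({
                    "name": f"{m.name}-replica-{r}",
                    "bundles": [{"GPU": 1} for _ in range(m.tensor_parallel)],
                })
    return plans
```

A single-bundle PG per disaggregated worker is consistent with the constraint that a role's TP group must fit on one node, since Ray never splits one bundle across nodes.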

Module Contents

Classes

Name | Description
DynamoBackend | Dynamo backend for InferenceServer (aggregated serving on Ray PGs).

API

class nemo_curator.core.serve.dynamo.backend.DynamoBackend(
server: nemo_curator.core.serve.server.InferenceServer
)

Bases: InferenceBackend

Dynamo backend for InferenceServer — aggregated serving on Ray PGs.

  • start() enters the nemo_curator_dynamo namespace, sweeps any actors and PGs left over from a prior driver session, then deploys infra → workers → frontend and blocks on a /v1/models health check.
  • stop() re-enters the same namespace in a fresh Ray session. Because ActorHandle objects do not survive a ray.shutdown() boundary, the stored handles are refreshed by name before the parallel SIGTERM → SIGKILL teardown runs. The replica and infra PGs are then removed.
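The SIGTERM → SIGKILL escalation mentioned for stop() can be illustrated with local processes. This is a sketch under stated assumptions: it uses `subprocess.Popen` objects rather than Ray actors, and the names `graceful_stop`, `stop_all`, and the `grace_s` parameter are hypothetical. The backend itself stops actors through its own helpers and signals whole process groups.

```python
import subprocess
import sys
from concurrent.futures import ThreadPoolExecutor


def graceful_stop(proc: subprocess.Popen, grace_s: float = 5.0) -> str:
    """Send SIGTERM, wait up to grace_s seconds, then fall back to SIGKILL."""
    if proc.poll() is not None:
        return "already-exited"
    proc.terminate()  # SIGTERM on POSIX
    try:
        proc.wait(timeout=grace_s)
        return "terminated"
    except subprocess.TimeoutExpired:
        proc.kill()   # SIGKILL on POSIX
        proc.wait()   # reap the child so it does not linger as a zombie
        return "killed"


def stop_all(procs: list, grace_s: float = 5.0) -> list:
    """Stop every process in parallel so one slow exit does not delay the rest."""
    if not procs:
        return []
    with ThreadPoolExecutor(max_workers=len(procs)) as pool:
        return list(pool.map(lambda p: graceful_stop(p, grace_s), procs))
```

Running the stops in parallel bounds total teardown time by roughly one grace period rather than one grace period per process.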
_actor_name_prefix: str = ''
_backend_cfg: DynamoServerConfig = server.backend
_etcd_actor: ManagedSubprocess | None = None
_frontend_actor: ManagedSubprocess | None = None
_infra_ip: str | None = None
_infra_pg: PlacementGroup | None = None
_models: list[DynamoVLLMModelConfig] = cast('list[DynamoVLLMModelConfig]', server.models)
_nats_actor: ManagedSubprocess | None = None
_pg_name_prefix: str = ''
_replica_pgs: list[PlacementGroup] = []
_runtime_dir: str | None = None
_worker_actors: list[ManagedSubprocess] = []
nemo_curator.core.serve.dynamo.backend.DynamoBackend._all_actor_procs() -> list[nemo_curator.core.serve.subprocess_mgr.ManagedSubprocess]
nemo_curator.core.serve.dynamo.backend.DynamoBackend._check_subprocess_health(
monitored: list[nemo_curator.core.serve.subprocess_mgr.ManagedSubprocess]
) -> None

Detect subprocess exits via ray.wait() on the cached run refs.

nemo_curator.core.serve.dynamo.backend.DynamoBackend._deploy_and_healthcheck(
server: nemo_curator.core.serve.server.InferenceServer,
backend_cfg: nemo_curator.core.serve.dynamo.config.DynamoServerConfig
) -> None

Validate, create PGs, launch infra/workers/frontend, health-check.

nemo_curator.core.serve.dynamo.backend.DynamoBackend._launch_frontend(
port: int,
base_env: dict[str, str],
backend_cfg: nemo_curator.core.serve.dynamo.config.DynamoServerConfig,
effective_router_mode: str | None = None,
effective_router_kv_events: bool | None = None,
runtime_env: dict[str, typing.Any] | None = None
) -> nemo_curator.core.serve.subprocess_mgr.ManagedSubprocess

Launch the Dynamo frontend bound to the infra node.

Emits --router-mode and --[no-]router-kv-events from the resolved values; anything else in router_kwargs (temperature, ttl_secs …) is forwarded verbatim via snake-to-kebab CLI flag translation.

effective_router_mode / effective_router_kv_events let _deploy_and_healthcheck pass in auto-resolved values (e.g. "kv" + True when any model is disagg). When either is None the corresponding typed router field is used verbatim.
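The flag construction described above might look roughly like the following. This is an illustrative sketch only: `build_router_flags` is a hypothetical name, it takes already-resolved values, and rendering each extra value with `str()` is an assumption about how the real method formats arguments.

```python
from typing import Any, Optional


def build_router_flags(
    router_mode: Optional[str],
    router_kv_events: bool,
    router_kwargs: dict,
) -> list:
    """Translate resolved router settings into CLI arguments."""
    flags = []
    if router_mode is not None:
        # An unset mode emits no flag, leaving the frontend on its own default.
        flags += ["--router-mode", router_mode]
    # Boolean pair: --router-kv-events or --no-router-kv-events.
    flags.append("--router-kv-events" if router_kv_events else "--no-router-kv-events")
    for key, value in router_kwargs.items():
        # snake_case key becomes a kebab-case flag, e.g. ttl_secs -> --ttl-secs.
        flags += ["--" + key.replace("_", "-"), str(value)]
    return flags
```

For example, `build_router_flags("kv", True, {"ttl_secs": 120})` yields `["--router-mode", "kv", "--router-kv-events", "--ttl-secs", "120"]`.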

nemo_curator.core.serve.dynamo.backend.DynamoBackend._raise_subprocess_error(
label: str,
log_tail: str,
reason: str
) -> None
staticmethod
nemo_curator.core.serve.dynamo.backend.DynamoBackend._resolve_effective_router(
models: list[nemo_curator.core.serve.dynamo.config.DynamoVLLMModelConfig],
router: nemo_curator.core.serve.dynamo.config.DynamoRouterConfig
) -> tuple[str | None, bool]
staticmethod

Resolve (router_mode, router_kv_events) for the frontend.

  • mode: honor router.mode if set; otherwise auto-pick "kv" when any model uses mode="disagg", else leave unset so the Dynamo frontend falls back to its own round_robin default.
  • kv_events: when we auto-pick mode="kv" we also auto-enable kv_events so the router consumes what prefill workers publish unconditionally in disagg. If the user set router.mode explicitly (to any value) we honor their router.kv_events as-is.
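The two rules above reduce to a short decision function. The sketch below uses hypothetical `Model` and `Router` dataclasses in place of DynamoVLLMModelConfig and DynamoRouterConfig. The `"agg"` default, the `kv_events` default of False, and the handling of the case where nothing is auto-picked (the user's `kv_events` is passed through) are assumptions not stated in the description.

```python
from dataclasses import dataclass
from typing import Optional, Tuple


@dataclass
class Model:
    """Hypothetical stand-in for DynamoVLLMModelConfig."""
    name: str
    mode: str = "agg"  # assumed label for non-disaggregated serving


@dataclass
class Router:
    """Hypothetical stand-in for DynamoRouterConfig."""
    mode: Optional[str] = None
    kv_events: bool = False  # assumed default


def resolve_effective_router(models: list, router: Router) -> Tuple[Optional[str], bool]:
    """Return (router_mode, router_kv_events) for the frontend."""
    if router.mode is not None:
        # Explicit mode: honor both user settings as-is.
        return router.mode, router.kv_events
    if any(m.mode == "disagg" for m in models):
        # Auto-pick "kv" and enable kv_events together.
        return "kv", True
    # Nothing to auto-pick: leave mode unset (assumed: keep the user's kv_events).
    return None, router.kv_events
```

Under these assumptions, a disaggregated model with an unset router mode resolves to `("kv", True)`, while an explicit `Router(mode="round_robin")` is returned unchanged even when a model is disaggregated.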
nemo_curator.core.serve.dynamo.backend.DynamoBackend._start_etcd(
port: int
) -> nemo_curator.core.serve.subprocess_mgr.ManagedSubprocess
nemo_curator.core.serve.dynamo.backend.DynamoBackend._start_infra_service(
label: str,
bundle_index: int,
port: int,
command: list[str],
subprocess_env: dict[str, str] | None = None
) -> nemo_curator.core.serve.subprocess_mgr.ManagedSubprocess
nemo_curator.core.serve.dynamo.backend.DynamoBackend._start_nats(
port: int
) -> nemo_curator.core.serve.subprocess_mgr.ManagedSubprocess
nemo_curator.core.serve.dynamo.backend.DynamoBackend._sweep_orphan_actors() -> None

Reap any detached actors left behind by a prior driver session.

remove_named_pgs_with_prefix force-kills actors scheduled into the reaped PGs, which would orphan each actor's subprocess tree. Sweeping named actors first lets graceful_stop_actors signal each process group (killpg) and shut it down cleanly.

nemo_curator.core.serve.dynamo.backend.DynamoBackend._teardown_actors_and_pgs() -> None

Parallel-stop every actor, then release the placement groups.

ActorHandle objects stored on self during start() belong to that session’s Ray job and are invalid here, because stop() opened its own session with ray.init(). The handles are therefore refreshed by detached-actor name before any .remote() call is issued.

nemo_curator.core.serve.dynamo.backend.DynamoBackend._validate_gpu_requirements(
models: list[nemo_curator.core.serve.dynamo.config.DynamoVLLMModelConfig],
topology: list[dict[str, typing.Any]] | None = None
) -> None
staticmethod

Coarse fail-fast on cluster-wide GPU over-commit and disagg TP fit.

Ray’s per-PG STRICT_PACK / STRICT_SPREAD is the authoritative admission gate; this produces a better error than the admission timeout. For disagg models we also reject configurations where a single role’s TP group would not fit on one node — disagg does not support multi-node TP.
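The two checks described above can be sketched in a few lines. Everything concrete here is an assumption made for illustration: the `ModelReq` dataclass and its fields, the reduction of `topology` to a list of per-node GPU counts, and the error messages. The real method takes a list of dicts whose keys are not documented here.

```python
from dataclasses import dataclass


@dataclass
class ModelReq:
    """Hypothetical GPU demand for one model; field names are assumptions."""
    name: str
    mode: str = "agg"
    tensor_parallel: int = 1
    workers: int = 1  # replicas (aggregated) or prefill + decode workers (disagg)


def validate_gpu_requirements(models: list, node_gpus: list) -> None:
    """Raise ValueError on cluster-wide over-commit or a disagg TP group too big for any node."""
    total = sum(node_gpus)
    needed = sum(m.tensor_parallel * m.workers for m in models)
    if needed > total:
        raise ValueError(f"models request {needed} GPUs but the cluster has {total}")
    largest_node = max(node_gpus, default=0)
    for m in models:
        # Disaggregated roles cannot span nodes, so the TP group must fit on one.
        if m.mode == "disagg" and m.tensor_parallel > largest_node:
            raise ValueError(
                f"{m.name}: TP={m.tensor_parallel} exceeds the largest node "
                f"({largest_node} GPUs)"
            )
```

This check is deliberately coarse: it can pass configurations that Ray later rejects at placement time, but it never waits for an admission timeout to report an obvious shortfall.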

nemo_curator.core.serve.dynamo.backend.DynamoBackend._validate_unique_model_names(
models: list[nemo_curator.core.serve.dynamo.config.DynamoVLLMModelConfig]
) -> None
staticmethod

Reject duplicate model names and component-slug collisions.

Dynamo registers each worker under a dyn://namespace.component.endpoint URI; duplicate model names (or names that sanitize to the same slug) would silently overwrite each other inside etcd.
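A slug-collision check of this kind can be sketched as follows. The `slugify` rule (lowercase, runs of non-alphanumerics collapsed to a hyphen) is a hypothetical sanitizer chosen for the example. The backend's actual sanitization rule is not documented here and may differ.

```python
import re
from collections import defaultdict


def slugify(name: str) -> str:
    """Hypothetical sanitizer: lowercase, collapse non-alphanumerics to '-'."""
    return re.sub(r"[^a-z0-9]+", "-", name.lower()).strip("-")


def validate_unique_model_names(names: list) -> None:
    """Raise ValueError when two names are equal or sanitize to the same slug."""
    by_slug = defaultdict(list)
    for name in names:
        by_slug[slugify(name)].append(name)
    # Exact duplicates share a slug too, so one pass catches both cases.
    collisions = {slug: group for slug, group in by_slug.items() if len(group) > 1}
    if collisions:
        raise ValueError(f"model names collide after sanitization: {collisions}")
```

With this sanitizer, `"org/model_a"` and `"org/model-a"` both map to `org-model-a` and are rejected together.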

nemo_curator.core.serve.dynamo.backend.DynamoBackend._wait_for_models(
server: nemo_curator.core.serve.server.InferenceServer,
expected_models: set[str]
) -> None

Poll /v1/models until all expected_models appear.
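A polling loop of this shape can be sketched with the standard library. The function names, the timeout and interval defaults, and the decision to treat connection errors as "not ready yet" are assumptions. The response shape (`{"data": [{"id": ...}]}`) follows the OpenAI-compatible model list format.

```python
import json
import time
import urllib.request
from typing import Callable, Set


def fetch_model_ids(base_url: str, timeout_s: float = 5.0) -> Set[str]:
    """Return the model ids currently listed at <base_url>/v1/models."""
    with urllib.request.urlopen(f"{base_url}/v1/models", timeout=timeout_s) as resp:
        payload = json.load(resp)
    return {entry["id"] for entry in payload.get("data", [])}


def wait_for_models(
    fetch: Callable[[], Set[str]],
    expected: Set[str],
    timeout_s: float = 300.0,
    interval_s: float = 2.0,
) -> None:
    """Poll fetch() until every expected model appears, or raise TimeoutError."""
    deadline = time.monotonic() + timeout_s
    missing = set(expected)
    while True:
        try:
            missing = set(expected) - fetch()
        except OSError:
            pass  # frontend not accepting connections yet; keep polling
        if not missing:
            return
        if time.monotonic() >= deadline:
            raise TimeoutError(f"models not ready: {sorted(missing)}")
        time.sleep(interval_s)
```

A caller would pass something like `lambda: fetch_model_ids("http://localhost:8000")`, where the host and port are placeholders. Injecting the fetcher keeps the loop testable without a live frontend.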

nemo_curator.core.serve.dynamo.backend.DynamoBackend._write_manifest(
data: dict[str, typing.Any],
ready: bool
) -> None
nemo_curator.core.serve.dynamo.backend.DynamoBackend.start() -> None
nemo_curator.core.serve.dynamo.backend.DynamoBackend.stop() -> None