nemo_curator.core.serve.subprocess_mgr


Ray-actor + subprocess lifecycle management.

ManagedSubprocess is the primary handle: it carries the detached Ray actor, the long-running run() ObjectRef, and the optional log path, and exposes the lifecycle operations (spawn, stop, stop_many, is_alive, pid, read_log_tail, wait) so callers never have to hand-write ray.get(actor.<something>.remote()) plumbing.

Every subprocess is launched with start_new_session=True, so it becomes its own session and process-group leader with pgid == pid. For the Dynamo -> vLLM path we care about, descendants are created via Python multiprocessing without calling setsid(), so they stay in that process group.

Teardown is driver-initiated (see graceful_stop_actors): SIGTERM on the subprocess group -> bounded wait -> escalated SIGKILL on the same group -> ray.kill for the actor itself. ray.kill on its own SIGKILLs the actor process but leaves the launcher subprocess (which the actor started with Popen) reparented to init and still running, so the killpg-first ordering is required. The actor's atexit hook cannot cover this case, because it does not run when the actor is killed by ray.kill.
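Both properties can be checked with the standard library. The following is a minimal POSIX-only sketch that uses sh with a background sleep as a stand-in for the launcher and one of its workers (an assumption for illustration; the real launcher is the Dynamo Python process). It shows that the worker shares the launcher's process group, that killing only the parent PID leaves the worker running (one level down, the same effect ray.kill has on the actor's launcher), and that a killpg on the group still reaches it.

```python
import os
import signal
import subprocess


def is_running(pid: int) -> bool:
    """Signal 0 probes whether a PID exists without delivering anything."""
    try:
        os.kill(pid, 0)
    except ProcessLookupError:
        return False
    return True


# Stand-in launcher: sh starts a background worker, prints its PID, then waits.
# A non-interactive sh has job control off, so the worker joins sh's group.
launcher = subprocess.Popen(
    ["sh", "-c", "sleep 30 & echo $!; wait"],
    stdout=subprocess.PIPE,
    text=True,
    start_new_session=True,  # child calls setsid(): new session and group leader
)
worker_pid = int(launcher.stdout.readline())
launcher.stdout.close()

pgid = os.getpgid(launcher.pid)
same_group = pgid == launcher.pid == os.getpgid(worker_pid)

# Kill only the launcher PID: the worker is reparented and keeps running.
os.kill(launcher.pid, signal.SIGKILL)
launcher.wait()
worker_survived = is_running(worker_pid)

# killpg still reaches the worker, because the group outlives its leader.
os.killpg(pgid, signal.SIGKILL)
print(same_group, worker_survived)
```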

Backends that use the with ray.init() attach/detach pattern around start/stop must refresh stored ActorHandle references between the two sessions; see reacquire_detached_actor_handles below. The companion orphan sweeper sweep_orphan_actors_by_prefix mirrors remove_named_pgs_with_prefix in placement.

Module Contents

Classes

Name | Description
ManagedSubprocess | Track a detached Ray actor and the subprocess it owns.
SubprocessError | Raised when a managed subprocess fails during inference server lifecycle.

Functions

Name | Description
_actor_stop_drained | Return True iff the actor-side stop() completed successfully.
_check_binary | Raise if name is not found on $PATH.
_define_subprocess_actor | Return a Ray remote actor class named actor_type.
_detached_actor_name | Assemble the Ray detached-actor name for a given prefix and label.
_dispatch_actor_stop | Best-effort dispatch of actor.stop.remote() for one actor.
_reap_process_group | SIGTERM the whole process group, wait briefly, then SIGKILL.
_stop_subprocess | Reap the subprocess’s process group and return its exit code.
_wait_for_port | Block until a TCP connection to host:port succeeds.
graceful_stop_actors | Stop many detached actors in parallel.
reacquire_detached_actor_handles | Refresh each proc.actor in place by re-looking up its detached actor.
sweep_orphan_actors_by_prefix | Graceful-stop any ALIVE detached actor whose name starts with prefix.

API

class nemo_curator.core.serve.subprocess_mgr.ManagedSubprocess(
label: str,
actor: ray.actor.ActorHandle,
run_ref: ray.ObjectRef | None = None
)
Dataclass

Track a detached Ray actor and the subprocess it owns.

The instance methods hide the ray.get(self.actor.X.remote()) boilerplate callers would otherwise repeat for every lifecycle operation. Teardown is parallelised via stop_many so a whole replica (or the infra trio) can be reaped with one round-trip.

actor: ActorHandle
label: str
run_ref: ObjectRef | None = None
nemo_curator.core.serve.subprocess_mgr.ManagedSubprocess.is_alive() -> bool
nemo_curator.core.serve.subprocess_mgr.ManagedSubprocess.pid() -> int
nemo_curator.core.serve.subprocess_mgr.ManagedSubprocess.read_log_tail(
num_bytes: int = 8192
) -> str
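read_log_tail is listed without a description. A plausible reading is "return the last num_bytes of the subprocess log as text"; the sketch below assumes that, and also assumes (hypothetically) that an empty string is returned when there is no log file, for example because runtime_dir was None. read_file_tail is an illustrative name, not part of the module.

```python
from __future__ import annotations

import os


def read_file_tail(path: str | None, num_bytes: int = 8192) -> str:
    """Return at most the last num_bytes of a log file, decoded as UTF-8."""
    if path is None or not os.path.exists(path):
        return ""  # assumption: no log file means an empty tail
    with open(path, "rb") as f:
        f.seek(0, os.SEEK_END)
        size = f.tell()
        f.seek(max(0, size - num_bytes))
        data = f.read()
    # The cut can split a multi-byte character; replace it rather than raise.
    return data.decode("utf-8", errors="replace")
```

Reading from an offset near the end keeps the cost bounded by num_bytes regardless of how large the log has grown.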
nemo_curator.core.serve.subprocess_mgr.ManagedSubprocess.spawn(
label: str,
pg: ray.util.placement_group.PlacementGroup,
bundle_index: int,
num_gpus: int,
command: list[str] | None = None,
python_args: list[str] | None = None,
runtime_dir: str | None = None,
actor_name_prefix: str = '',
subprocess_env: dict[str, str] | None = None,
runtime_env: dict[str, typing.Any] | None = None
) -> nemo_curator.core.serve.subprocess_mgr.ManagedSubprocess
classmethod

Create a detached Ray actor bound to pg’s bundle bundle_index and launch its subprocess.

Pass command for binary subprocesses (etcd, nats) or python_args for Python module invocations (["-m", "dynamo.vllm", ...]). When python_args is used, the actor prepends its own sys.executable — which inside a runtime_env points to the isolated virtualenv’s Python, not the driver’s. This ensures subprocesses load packages from the runtime_env.

The actor class is created per-call with __name__ set to label, so the Ray dashboard shows descriptive names.

The subprocess inherits the actor’s os.environ (raylet env + runtime_env contributions). subprocess_env adds targeted overrides on top (e.g. CUDA_VISIBLE_DEVICES).
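How command, python_args, and subprocess_env combine can be sketched as follows. build_launch_spec is a hypothetical helper, and requiring exactly one of command or python_args is an assumption that goes one step beyond "mutually exclusive". Inside a real actor, sys.executable would be the runtime_env interpreter rather than the driver's.

```python
from __future__ import annotations

import os
import sys


def build_launch_spec(
    command: list[str] | None = None,
    python_args: list[str] | None = None,
    subprocess_env: dict[str, str] | None = None,
) -> tuple[list[str], dict[str, str]]:
    """Hypothetical helper: resolve argv and env as spawn() describes them."""
    if (command is None) == (python_args is None):
        raise ValueError("pass exactly one of command or python_args")
    if python_args is not None:
        # In the actor, sys.executable is the runtime_env's interpreter.
        argv = [sys.executable, *python_args]
    else:
        argv = list(command)
    # Inherit the current environment, then apply the targeted overrides on top.
    env = {**os.environ, **(subprocess_env or {})}
    return argv, env


argv, env = build_launch_spec(
    python_args=["-m", "dynamo.vllm"],
    subprocess_env={"CUDA_VISIBLE_DEVICES": "0"},
)
```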

Parameters:

label
str

Human-readable label (used for actor naming, class naming, logs).

pg
PlacementGroup

The placement group that owns the bundle.

bundle_index
int

Which bundle in pg to pin this actor to.

num_gpus
int

GPUs to reserve for the actor (must match the bundle’s GPU count).

command
list[str] | None, defaults to None

Full subprocess command for binary processes (mutually exclusive with python_args).

python_args
list[str] | None, defaults to None

Arguments for a Python subprocess; actor prepends sys.executable.

runtime_dir
str | None, defaults to None

Directory for log files. None discards logs.

actor_name_prefix
str, defaults to ''

Prefix for the detached actor name (used for orphan cleanup / dashboard grouping).

subprocess_env
dict[str, str] | None, defaults to None

Extra env vars for the subprocess (applied as overrides).

runtime_env
dict[str, Any] | None, defaults to None

Ray runtime environment for the actor. Merged with NOSET_CUDA_RUNTIME_ENV so the NOSET flag is always set.

nemo_curator.core.serve.subprocess_mgr.ManagedSubprocess.stop(
timeout_s: float | None = None
) -> None

Best-effort graceful stop: reap the subprocess group, then kill the actor.

nemo_curator.core.serve.subprocess_mgr.ManagedSubprocess.stop_many
classmethod

Stop many managed subprocesses in parallel.

Kicks off every actor.stop.remote() first so subprocess teardown happens concurrently, then waits on all of them with a shared deadline before tearing down the actors.

nemo_curator.core.serve.subprocess_mgr.ManagedSubprocess.wait(
timeout: float | None = None
) -> int

Block until the subprocess exits. Returns its exit code.

class nemo_curator.core.serve.subprocess_mgr.SubprocessError(
message: str,
debug_context: dict | None = None
)

Bases: RuntimeError

Raised when a managed subprocess fails during inference server lifecycle.

Carries an optional debug_context dict so callers can surface subprocess logs, PIDs, or any other diagnostic state alongside the user-facing message.

debug_context = debug_context or {}
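A minimal sketch of an exception with this shape, plus how a caller might raise and inspect it. The debug_context keys used here are illustrative only; the module does not prescribe any.

```python
from __future__ import annotations


class SubprocessError(RuntimeError):
    """Sketch: a RuntimeError that also carries diagnostic state."""

    def __init__(self, message: str, debug_context: dict | None = None) -> None:
        super().__init__(message)
        # Normalise None to {} so callers can always iterate or .get() it.
        self.debug_context = debug_context or {}


# Illustrative use: attach whatever helps diagnose the failure.
try:
    raise SubprocessError("worker exited during startup", {"pid": 4242, "exit_code": 1})
except SubprocessError as err:
    caught = err
```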
nemo_curator.core.serve.subprocess_mgr._actor_stop_drained(
ref: ray.ObjectRef | None
) -> bool

Return True iff the actor-side stop() completed successfully.

nemo_curator.core.serve.subprocess_mgr._check_binary(
name: str
) -> None

Raise if name is not found on $PATH.
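A short sketch with shutil.which. The exception type (FileNotFoundError) is an assumption, since the docstring only says the function raises.

```python
import shutil


def check_binary(name: str) -> None:
    """Sketch: fail fast when an executable is missing from $PATH."""
    if shutil.which(name) is None:
        raise FileNotFoundError(f"{name!r} not found on $PATH")
```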

nemo_curator.core.serve.subprocess_mgr._define_subprocess_actor(
actor_type: str = 'SubprocessActor'
) -> type

Return a Ray remote actor class named actor_type.

Each call produces a class whose __name__ and __qualname__ are set to actor_type so the Ray dashboard shows descriptive labels (e.g. Dynamo_ETCD, Dynamo_DP0_Qwen3-0.6B).
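The renaming can be sketched without Ray. define_named_class is a hypothetical name, and the sketch deliberately omits the ray.remote wrapping the real function applies, so it only shows that each call yields a distinct class carrying the requested __name__ and __qualname__.

```python
def define_named_class(actor_type: str = "SubprocessActor") -> type:
    """Sketch: return a fresh class that reports itself as actor_type."""

    class _SubprocessActor:
        def describe(self) -> str:
            return type(self).__name__

    # Set both: some tools print __name__, others __qualname__.
    _SubprocessActor.__name__ = actor_type
    _SubprocessActor.__qualname__ = actor_type
    return _SubprocessActor


etcd_cls = define_named_class("Dynamo_ETCD")
worker_cls = define_named_class("Dynamo_DP0_Qwen3-0.6B")
```

Defining the class inside the function body is what makes every call produce a new class object, so renaming one never affects another.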

Lifecycle:

  1. initialize Popens the subprocess with start_new_session=True so the whole subtree shares one process group (pgid == popen.pid).
  2. run blocks until the subprocess exits; the returned ObjectRef resolves on exit so ray.wait can be used for liveness.
  3. stop / force_sigkill_subprocess reap that process group.

Teardown is driver-initiated via graceful_stop_actors so the actor can killpg its subprocess group before ray.kill hard-kills the actor process itself — otherwise the Popen-launched child reparents to init and keeps running. A Python atexit hook is registered as a backup for clean actor exits (e.g. a normal raylet shutdown); it does not fire on ray.kill / PG removal / SIGKILL.

nemo_curator.core.serve.subprocess_mgr._detached_actor_name(
prefix: str,
label: str
) -> str

Assemble the Ray detached-actor name for a given prefix and label.

Both ManagedSubprocess.spawn (when naming a new actor) and reacquire_detached_actor_handles (when re-looking it up in a later session) must agree on this formula, so it lives in one place.
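The naming rule, as stated for reacquire_detached_actor_handles, fits in one expression. A sketch (detached_actor_name and the "curator" prefix used in the checks are illustrative):

```python
def detached_actor_name(prefix: str, label: str) -> str:
    """Sketch: '<prefix>_<label>', or just label when prefix is empty."""
    return f"{prefix}_{label}" if prefix else label
```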

nemo_curator.core.serve.subprocess_mgr._dispatch_actor_stop(
label: str,
actor: ray.actor.ActorHandle
) -> ray.ObjectRef | None

Best-effort dispatch of actor.stop.remote() for one actor.

nemo_curator.core.serve.subprocess_mgr._reap_process_group(
pgid: int,
sigterm_wait: float = SIGTERM_WAIT_S
) -> None

SIGTERM the whole process group, wait briefly, then SIGKILL.

Because every managed subprocess is launched with start_new_session=True, pgid == pid for the launcher, and every descendant that does not explicitly call setsid() stays in that group. The group persists while any member is alive, so this still works even if the launcher itself has already exited.

nemo_curator.core.serve.subprocess_mgr._stop_subprocess(
proc: subprocess.Popen,
sigterm_wait: float = SIGTERM_WAIT_S
) -> int | None

Reap the subprocess’s process group and return its exit code.

Assumes proc was launched with start_new_session=True, so proc.pid is the process-group id to target.
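A minimal sketch of the reap order, assuming a value for SIGTERM_WAIT_S (the real constant is not given here). It sends SIGTERM to the group, waits on the launcher, then sends SIGKILL to the group regardless, which also catches stragglers when the launcher exited cleanly but left children behind. Returning None when the final wait times out is an assumption about what the int | None return type means.

```python
from __future__ import annotations

import os
import signal
import subprocess

SIGTERM_WAIT_S = 5.0  # assumed value for illustration


def _signal_group(pgid: int, sig: signal.Signals) -> None:
    try:
        os.killpg(pgid, sig)
    except ProcessLookupError:
        pass  # the group has no members left


def stop_subprocess(
    proc: subprocess.Popen, sigterm_wait: float = SIGTERM_WAIT_S
) -> int | None:
    """Sketch: SIGTERM the group, wait, SIGKILL the group, return the exit code."""
    pgid = proc.pid  # valid only because proc used start_new_session=True
    _signal_group(pgid, signal.SIGTERM)
    try:
        proc.wait(timeout=sigterm_wait)
    except subprocess.TimeoutExpired:
        pass  # launcher ignored SIGTERM or is still shutting down
    # Escalate unconditionally: this also catches children left behind by a
    # launcher that did exit on SIGTERM.
    _signal_group(pgid, signal.SIGKILL)
    try:
        return proc.wait(timeout=sigterm_wait)
    except subprocess.TimeoutExpired:
        return None  # assumption: None means "could not confirm exit"
```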

nemo_curator.core.serve.subprocess_mgr._wait_for_port(
host: str,
port: int,
timeout_s: float = 30,
label: str = ''
) -> None

Block until a TCP connection to host:port succeeds.
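A polling sketch with the standard library socket module. The per-attempt cap, the 0.1 s retry interval, and raising TimeoutError on expiry are assumptions, not documented behaviour.

```python
import socket
import time


def wait_for_port(host: str, port: int, timeout_s: float = 30, label: str = "") -> None:
    """Sketch: poll until a TCP connect to host:port succeeds, else raise TimeoutError."""
    deadline = time.monotonic() + timeout_s
    while True:
        # Bound each attempt so a hanging connect cannot run far past the deadline.
        attempt_timeout = max(0.1, min(1.0, deadline - time.monotonic()))
        try:
            with socket.create_connection((host, port), timeout=attempt_timeout):
                return
        except OSError:
            if time.monotonic() >= deadline:
                who = f"{label}: " if label else ""
                raise TimeoutError(
                    f"{who}{host}:{port} not reachable within {timeout_s}s"
                ) from None
            time.sleep(0.1)
```

A successful TCP handshake only shows that something is listening; it says nothing about whether the service behind the port is ready to answer requests.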

nemo_curator.core.serve.subprocess_mgr.graceful_stop_actors(
labeled_actors: list[tuple[str, ray.actor.ActorHandle]],
timeout_s: float | None = None
) -> None

Stop many detached actors in parallel.

Prefer ManagedSubprocess.stop / ManagedSubprocess.stop_many. This primitive is for the raw-actor case (e.g. the reconnecting driver looking up orphaned actors by name).

Escalation order per actor:

  1. actor.stop.remote() — actor-driven SIGTERM on the subprocess group with a bounded wait, then SIGKILL if needed.
  2. If stop did not drain in time, actor.force_sigkill_subprocess.remote() — host-side SIGKILL on the subprocess group. This runs on the actor’s node (because actor methods always do), so it works multi-node without any driver-side connectivity assumption.
  3. ray.kill(actor) — tear down the actor itself.

Step 2 matters because ray.kill SIGKILLs the actor process; its Popen child (the launcher) reparents to init and keeps running, and that launcher’s own children (e.g. vLLM EngineCore) stay alive too. Calling killpg first ensures the whole subprocess tree is gone before the actor itself is torn down. Each dispatch is guarded individually so a single invalid handle (e.g. a stale handle from an earlier ray.init() session) does not abort the batch.
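The escalation can be modelled without Ray to show the batching: every stop is dispatched before anything is awaited, a single deadline covers the whole batch, and each call is individually guarded. In this hypothetical sketch, concurrent.futures.Future stands in for an ObjectRef and the kill callable stands in for ray.kill; a Ray version would presumably wait with ray.wait(refs, num_returns=len(refs), timeout=timeout_s) instead of concurrent.futures.wait.

```python
from __future__ import annotations

import concurrent.futures as cf
from typing import Callable, Protocol


class ActorLike(Protocol):
    """Hypothetical stand-in for the two remote methods the escalation uses."""

    def stop(self) -> cf.Future: ...

    def force_sigkill_subprocess(self) -> None: ...


def graceful_stop_all(
    labeled_actors: list[tuple[str, ActorLike]],
    kill: Callable[[ActorLike], None],
    timeout_s: float = 10.0,
) -> list[str]:
    """Sketch of the three-step escalation; returns labels that needed step 2."""
    # Step 1: dispatch every stop() before waiting on any of them. Each call is
    # guarded so one stale handle cannot abort the batch.
    refs: list[cf.Future | None] = []
    for _, actor in labeled_actors:
        try:
            refs.append(actor.stop())
        except Exception:
            refs.append(None)
    # A single shared deadline for the whole batch, not timeout_s per actor.
    cf.wait([r for r in refs if r is not None], timeout=timeout_s)

    escalated: list[str] = []
    for (label, actor), ref in zip(labeled_actors, refs):
        drained = ref is not None and ref.done() and ref.exception() is None
        if not drained:
            # Step 2: SIGKILL the subprocess group from the actor's own host.
            try:
                actor.force_sigkill_subprocess()
            except Exception:
                pass
            escalated.append(label)
        # Step 3: tear down the actor itself, only after its subprocess group.
        try:
            kill(actor)
        except Exception:
            pass
    return escalated
```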

nemo_curator.core.serve.subprocess_mgr.reacquire_detached_actor_handles(
procs: list[nemo_curator.core.serve.subprocess_mgr.ManagedSubprocess],
actor_name_prefix: str,
namespace: str
) -> list[nemo_curator.core.serve.subprocess_mgr.ManagedSubprocess]

Refresh each proc.actor in place by re-looking up its detached actor.

ActorHandle objects are pinned to the Ray job that created them, so any handle stored across a with ray.init() boundary raises ActorHandleNotFoundError on .remote(). Backends that use the start/stop attach-detach pattern must re-acquire their handles inside the stop-side with ray.init() before calling any actor method.

Actor names follow the ManagedSubprocess.spawn convention: f"{actor_name_prefix}_{label}", or just label when the prefix is empty.

Returns a filtered list containing only procs whose actor was successfully re-acquired; entries whose actor is already gone are dropped so callers can safely pass the result to ManagedSubprocess.stop_many.
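A sketch of the refresh-and-filter loop. Proc is a hypothetical stand-in for ManagedSubprocess, and get_actor is injected so the example runs without Ray; in a live session it would presumably be ray.get_actor, which raises ValueError when no actor with that name exists in the namespace.

```python
from __future__ import annotations

from dataclasses import dataclass
from typing import Any, Callable


@dataclass
class Proc:
    """Hypothetical stand-in for ManagedSubprocess: just a label and a handle."""

    label: str
    actor: Any


def reacquire_handles(
    procs: list[Proc],
    actor_name_prefix: str,
    namespace: str,
    get_actor: Callable[..., Any],
) -> list[Proc]:
    """Sketch: refresh proc.actor in place and drop procs whose actor is gone."""
    alive: list[Proc] = []
    for proc in procs:
        name = f"{actor_name_prefix}_{proc.label}" if actor_name_prefix else proc.label
        try:
            proc.actor = get_actor(name, namespace=namespace)
        except ValueError:
            continue  # no live actor under that name: leave it out of the result
        alive.append(proc)
    return alive
```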

nemo_curator.core.serve.subprocess_mgr.sweep_orphan_actors_by_prefix(
prefix: str,
namespace: str
) -> int

Graceful-stop any ALIVE detached actor whose name starts with prefix.

Intended for orphan cleanup after a driver restart: detached actors outlive the driver that created them, so a reconnecting driver (in the same namespace) can list them by name, re-acquire handles, and reap them and their subprocess trees via graceful_stop_actors. Returns the number of orphans reaped. Mirrors remove_named_pgs_with_prefix in placement.
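The selection step can be sketched over plain records. The record shape (dicts with name, state, and namespace keys) is hypothetical; with Ray the listing would presumably come from the state API (ray.util.state.list_actors), and the selected names would then be re-acquired and handed to graceful_stop_actors.

```python
from __future__ import annotations

from typing import Any, Iterable


def select_orphans(
    actor_records: Iterable[dict[str, Any]], prefix: str, namespace: str
) -> list[str]:
    """Sketch: names of ALIVE, named actors in namespace that start with prefix."""
    names: list[str] = []
    for rec in actor_records:
        name = rec.get("name") or ""
        if (
            name  # unnamed actors cannot be re-acquired by name
            and name.startswith(prefix)
            and rec.get("state") == "ALIVE"
            and rec.get("namespace") == namespace
        ):
            names.append(name)
    return names
```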