nemo_curator.core.serve.subprocess_mgr
Ray-actor + subprocess lifecycle management.
ManagedSubprocess is the primary handle: it carries the detached
Ray actor, the long-running run() ObjectRef, and the optional log
path, and exposes the lifecycle operations (spawn, stop,
stop_many, is_alive, pid, read_log_tail, wait) so
callers never have to hand-write ray.get(actor.<something>.remote())
plumbing.
Every subprocess is launched with start_new_session=True, so it
becomes its own session and process-group leader with pgid == pid.
For the Dynamo -> vLLM path we care about, descendants are created via
Python multiprocessing without calling setsid(), so they stay in
that process group.
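The pgid == pid property can be checked with the standard library alone. The following is a minimal sketch, assuming a POSIX host; spawn_in_own_group is a hypothetical helper name and is not part of this module.

```python
import os
import subprocess
import sys


def spawn_in_own_group(argv):
    """Start argv as the leader of a new session and process group (POSIX only)."""
    # start_new_session=True makes the child call setsid() before exec.
    return subprocess.Popen(argv, start_new_session=True)


proc = spawn_in_own_group([sys.executable, "-c", "import time; time.sleep(30)"])
try:
    child_pgid = os.getpgid(proc.pid)  # process-group id of the child
    child_sid = os.getsid(proc.pid)    # session id of the child
    driver_pgid = os.getpgid(0)        # process-group id of this process
finally:
    # Clean up the demo child regardless of what happened above.
    proc.kill()
    proc.wait()
```

Because the child leads its own group, a signal sent to that group reaches the child and its descendants without touching the process that launched it.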
Teardown is driver-initiated (see graceful_stop_actors): SIGTERM on
the subprocess group -> bounded wait -> escalated SIGKILL on the same
group -> ray.kill for the actor itself. ray.kill on its own
SIGKILLs the actor process but leaves the launcher subprocess (which
was Popen’d by the actor) reparented to init and alive, so the
killpg-first ordering is required; it is not about the atexit hook.
Backends that use the with ray.init() attach/detach pattern around
start/stop must refresh stored ActorHandle references between the
two sessions; see reacquire_detached_actor_handles below. The
companion orphan sweeper, sweep_orphan_actors_by_prefix, mirrors
remove_named_pgs_with_prefix in placement.
Module Contents
Classes
Functions
API
Track a detached Ray actor and the subprocess it owns.
The instance methods hide the ray.get(self.actor.X.remote())
boilerplate callers would otherwise repeat for every lifecycle
operation. Teardown is parallelised via stop_many so a whole
replica (or the infra trio) can be reaped with one round-trip.
Create a detached Ray actor pinned to bundle bundle_index of pg and launch its subprocess.
Pass command for binary subprocesses (etcd, nats) or python_args
for Python module invocations (["-m", "dynamo.vllm", ...]).
When python_args is used, the actor prepends its own sys.executable
— which inside a runtime_env points to the isolated virtualenv’s
Python, not the driver’s. This ensures subprocesses load packages
from the runtime_env.
The actor class is created per-call with __name__ set to label,
so the Ray dashboard shows descriptive names.
The subprocess inherits the actor’s os.environ (raylet env +
runtime_env contributions). subprocess_env adds targeted
overrides on top (e.g. CUDA_VISIBLE_DEVICES).
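The command and environment rules described above can be sketched as two small helpers. This is illustrative only: build_command and build_env are hypothetical names, and rejecting the "neither argument given" case is an assumption (the source only states that the two arguments are mutually exclusive).

```python
import os
import sys
from typing import Dict, List, Mapping, Optional, Sequence


def build_command(
    command: Optional[Sequence[str]] = None,
    python_args: Optional[Sequence[str]] = None,
) -> List[str]:
    """Return the argv to launch: a binary command, or this interpreter plus python_args."""
    # Assumption: exactly one of the two arguments must be provided.
    if (command is None) == (python_args is None):
        raise ValueError("pass exactly one of command or python_args")
    if python_args is not None:
        # Inside an actor, sys.executable is the actor's own interpreter.
        return [sys.executable, *python_args]
    return list(command)


def build_env(subprocess_env: Optional[Mapping[str, str]] = None) -> Dict[str, str]:
    """Copy the current environment and apply targeted overrides on top."""
    env = dict(os.environ)
    env.update(subprocess_env or {})
    return env
```

For example, build_command(python_args=["-m", "dynamo.vllm"]) yields an argv that starts with the calling interpreter, and build_env({"CUDA_VISIBLE_DEVICES": "0"}) keeps every inherited variable while replacing only that one.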
Parameters:
Human-readable label (used for actor naming, class naming, logs).
The placement group that owns the bundle.
Which bundle in pg to pin this actor to.
GPUs to reserve for the actor (must match the bundle’s GPU count).
Full subprocess command for binary processes (mutually exclusive with python_args).
Arguments for a Python subprocess; actor prepends
sys.executable.
Directory for log files. None discards logs.
Prefix for the detached actor name (used for orphan cleanup / dashboard grouping).
Extra env vars for the subprocess (applied as overrides).
Ray runtime environment for the actor. Merged with
NOSET_CUDA_RUNTIME_ENV so the NOSET flag is always set.
Best-effort graceful stop: reap the subprocess group, then kill the actor.
Stop many managed subprocesses in parallel.
Kicks off every actor.stop.remote() first so subprocess
teardown happens concurrently, then waits on all of them with a
shared deadline before tearing down the actors.
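The "dispatch everything first, then wait once" pattern can be illustrated without Ray. The sketch below uses concurrent.futures as a stand-in for actor.stop.remote() and a Ray wait call; stop_all is a hypothetical name, and the real method operates on Ray ObjectRefs rather than thread futures.

```python
import time
from concurrent.futures import ThreadPoolExecutor, wait


def stop_all(stop_fns, timeout_s):
    """Start every stop function at once, then wait for all with one shared deadline.

    Returns (number finished in time, number still pending).
    """
    pool = ThreadPoolExecutor(max_workers=max(1, len(stop_fns)))
    # Phase 1: kick off every stop call before waiting on any of them.
    futures = [pool.submit(fn) for fn in stop_fns]
    # Phase 2: one timeout covers the whole batch, not each call separately.
    done, not_done = wait(futures, timeout=timeout_s)
    pool.shutdown(wait=False)
    return len(done), len(not_done)


# Three stops of 0.3 s each run concurrently instead of back to back.
started = time.monotonic()
finished, pending = stop_all([lambda: time.sleep(0.3)] * 3, timeout_s=5.0)
elapsed = time.monotonic() - started
```

With a shared deadline the worst-case teardown time is bounded by timeout_s regardless of how many subprocesses are being stopped.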
Block until the subprocess exits. Returns its exit code.
Bases: RuntimeError
Raised when a managed subprocess fails during inference server lifecycle.
Carries an optional debug_context dict so callers can surface
subprocess logs, PIDs, or any other diagnostic state alongside the
user-facing message.
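An exception that carries a debug_context dict can be sketched as follows. The class name ManagedSubprocessError is hypothetical (the real class name is not shown in this section), and the constructor signature is an assumption.

```python
class ManagedSubprocessError(RuntimeError):
    """Hypothetical stand-in: a RuntimeError with optional diagnostic state attached."""

    def __init__(self, message, debug_context=None):
        super().__init__(message)
        # Copy so later mutation by the caller does not change the stored context.
        self.debug_context = dict(debug_context or {})


try:
    raise ManagedSubprocessError(
        "etcd exited during startup",
        debug_context={"pid": 1234, "log_tail": "bind: address already in use"},
    )
except ManagedSubprocessError as exc:
    caught_message = str(exc)
    caught_context = exc.debug_context
```

Keeping the diagnostics out of the message string lets callers show a short user-facing error while still logging the full context.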
Return True iff the actor-side stop() completed successfully.
Raise if name is not found on $PATH.
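A $PATH check of this kind is usually a thin wrapper over shutil.which. In the sketch below, require_binary is a hypothetical name and FileNotFoundError is an assumed exception type; the source does not say which exception the real helper raises.

```python
import shutil
import sys


def require_binary(name):
    """Return the resolved path of name, or raise if it is not an executable on PATH."""
    path = shutil.which(name)
    if path is None:
        # Assumption: the exception type here is illustrative.
        raise FileNotFoundError(f"{name!r} not found on PATH")
    return path


# The running interpreter is always resolvable, which makes it a safe demo input.
python_path = require_binary(sys.executable)
```

Failing early with a clear message is friendlier than letting a later Popen call fail inside a remote actor.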
Return a Ray remote actor class named actor_type.
Each call produces a class whose __name__ and __qualname__ are
set to actor_type so the Ray dashboard shows descriptive labels
(e.g. Dynamo_ETCD, Dynamo_DP0_Qwen3-0.6B).
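Creating a fresh class per call and renaming it is plain Python. The sketch below shows only the naming step; make_named_class is a hypothetical name, the class body is a placeholder, and the real factory presumably wraps the result as a Ray remote class, which is omitted here.

```python
def make_named_class(actor_type):
    """Return a new class object whose __name__ and __qualname__ equal actor_type."""

    class _SubprocessActor:
        def describe(self):
            # Reports whatever name the class was given at creation time.
            return type(self).__name__

    # Any string is accepted here, including names with dots or hyphens.
    _SubprocessActor.__name__ = actor_type
    _SubprocessActor.__qualname__ = actor_type
    return _SubprocessActor


etcd_cls = make_named_class("Dynamo_ETCD")
worker_cls = make_named_class("Dynamo_DP0_Qwen3-0.6B")
```

Each call returns a distinct class object, so two labels never share a name in tooling that groups by class name.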
Lifecycle:
- initialize Popens the subprocess with start_new_session=True so the whole subtree shares one process group (pgid == popen.pid).
- run blocks until the subprocess exits; the returned ObjectRef resolves on exit so ray.wait can be used for liveness.
- stop / force_sigkill_subprocess reap that process group.
Teardown is driver-initiated via graceful_stop_actors so the
actor can killpg its subprocess group before ray.kill hard-kills
the actor process itself — otherwise the Popen-launched child
reparents to init and keeps running. A Python atexit hook is
registered as a backup for clean actor exits (e.g. a normal raylet
shutdown); it does not fire on ray.kill / PG removal / SIGKILL.
Assemble the Ray detached-actor name for a given prefix and label.
Both ManagedSubprocess.spawn (when naming a new actor) and
reacquire_detached_actor_handles (when re-looking it up in a later
session) must agree on this formula, so it lives in one place.
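The naming rule is small enough to state as code. This sketch follows the convention documented for reacquire_detached_actor_handles (prefix and label joined by an underscore, or the bare label when the prefix is empty); build_actor_name is a hypothetical function name.

```python
def build_actor_name(actor_name_prefix, label):
    """Join prefix and label with an underscore; fall back to the label alone."""
    if actor_name_prefix:
        return f"{actor_name_prefix}_{label}"
    return label
```

Keeping the formula in one function means the spawn side and the lookup side cannot drift apart.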
Best-effort dispatch of actor.stop.remote() for one actor.
SIGTERM the whole process group, wait briefly, then SIGKILL.
Because every managed subprocess is launched with
start_new_session=True, pgid == pid for the launcher and every
descendant that did not explicitly call setsid() remains in that
group. The group persists while any member is alive, so this still
works even if the launcher itself already exited.
Reap the subprocess’s process group and return its exit code.
Assumes proc was launched with start_new_session=True, so
proc.pid is the process-group id to target.
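The SIGTERM, bounded wait, SIGKILL sequence can be sketched with the standard library. This is a simplified illustration for POSIX hosts, not the module's implementation: reap_process_group is a hypothetical name, and it only waits on the launcher process rather than polling every group member.

```python
import os
import signal
import subprocess
import sys


def reap_process_group(proc, grace_s=5.0):
    """SIGTERM proc's group, wait up to grace_s, SIGKILL the group, return the exit code."""
    pgid = proc.pid  # valid only because proc was started with start_new_session=True
    try:
        os.killpg(pgid, signal.SIGTERM)
    except ProcessLookupError:
        pass  # the group is already gone
    try:
        proc.wait(timeout=grace_s)
    except subprocess.TimeoutExpired:
        pass  # launcher ignored SIGTERM; escalate below
    try:
        # Sent even if the launcher exited, to catch descendants still in the group.
        os.killpg(pgid, signal.SIGKILL)
    except ProcessLookupError:
        pass  # nothing left to kill
    return proc.wait()


child = subprocess.Popen(
    [sys.executable, "-c", "import time; time.sleep(30)"],
    start_new_session=True,
)
exit_code = reap_process_group(child, grace_s=5.0)
```

On POSIX, Popen reports death by signal as a negative exit code, so callers can tell a graceful SIGTERM exit from an escalated SIGKILL.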
Block until a TCP connection to host:port succeeds.
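A readiness probe of this shape is typically a connect-and-retry loop. In the sketch below, wait_for_port is a hypothetical name, and the timeout_s and interval_s parameters and the TimeoutError are assumptions; the source only says the call blocks until a connection succeeds.

```python
import socket
import time


def wait_for_port(host, port, timeout_s=30.0, interval_s=0.1):
    """Retry a TCP connect to host:port until it succeeds or timeout_s elapses."""
    deadline = time.monotonic() + timeout_s
    while True:
        remaining = deadline - time.monotonic()
        try:
            # Cap each attempt so one slow connect cannot overshoot the deadline by much.
            attempt_timeout = max(0.05, min(1.0, remaining))
            with socket.create_connection((host, port), timeout=attempt_timeout):
                return
        except OSError as exc:
            if time.monotonic() + interval_s >= deadline:
                raise TimeoutError(f"{host}:{port} not reachable within {timeout_s}s") from exc
            time.sleep(interval_s)
```

A successful connect only shows that something is listening; it does not prove the service behind the port is ready to serve requests.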
Stop many detached actors in parallel.
Prefer ManagedSubprocess.stop / ManagedSubprocess.stop_many.
This primitive is for the raw-actor case (e.g. the reconnecting
driver looking up orphaned actors by name).
Escalation order per actor:
1. actor.stop.remote(): actor-driven SIGTERM on the subprocess group with a bounded wait, then SIGKILL if needed.
2. If stop did not drain in time, actor.force_sigkill_subprocess.remote(): host-side SIGKILL on the subprocess group. This runs on the actor’s node (because actor methods always do), so it works multi-node without any driver-side connectivity assumption.
3. ray.kill(actor): tear down the actor itself.
Step 2 matters because ray.kill SIGKILLs the actor process; its
Popen child (the launcher) reparents to init and keeps running, and
that launcher’s own children (e.g. vLLM EngineCore) stay alive too.
Sending killpg first ensures the whole subprocess tree is gone
before the actor itself is torn down. Each dispatch is guarded
individually so a single invalid handle (e.g. a stale
cross-ray.init()-session handle) does not abort the batch.
Refresh each proc.actor in place by re-looking up its detached actor.
ActorHandle objects are pinned to the Ray job that created them, so
any handle stored across a with ray.init() boundary raises
ActorHandleNotFoundError on .remote(). Backends that use the
start/stop attach-detach pattern must re-acquire their handles inside
the stop-side with ray.init() before calling any actor method.
Actor names follow the ManagedSubprocess.spawn convention:
f"{actor_name_prefix}_{label}", or just label when the prefix
is empty.
Returns a filtered list containing only procs whose actor was
successfully re-acquired; entries whose actor is already gone are
dropped so callers can safely pass the result to
ManagedSubprocess.stop_many.
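The refresh-and-filter behaviour can be sketched with the lookup injected as a callable, so the example runs without a Ray cluster. ProcRecord, reacquire, and the label attribute are hypothetical stand-ins; with Ray, lookup would be ray.get_actor, which raises ValueError when no actor with the given name exists.

```python
from dataclasses import dataclass
from typing import Any, Callable, List


@dataclass
class ProcRecord:
    """Hypothetical stand-in for ManagedSubprocess: a label plus a stored handle."""
    label: str
    actor: Any = None


def reacquire(procs: List[ProcRecord], actor_name_prefix: str,
              lookup: Callable[[str], Any]) -> List[ProcRecord]:
    """Refresh proc.actor in place by name; drop entries whose actor no longer exists."""
    kept = []
    for proc in procs:
        # Same naming formula as the spawn side.
        name = f"{actor_name_prefix}_{proc.label}" if actor_name_prefix else proc.label
        try:
            proc.actor = lookup(name)
        except ValueError:
            continue  # actor already gone; leave it out of the result
        kept.append(proc)
    return kept


# Fake registry standing in for the cluster's named actors.
registry = {"demo_etcd": "fresh-etcd-handle"}


def fake_lookup(name):
    if name not in registry:
        raise ValueError(f"no actor named {name}")
    return registry[name]


procs = [ProcRecord("etcd", actor="stale"), ProcRecord("nats", actor="stale")]
alive = reacquire(procs, "demo", fake_lookup)
```

Dropping missing entries rather than raising lets a stop path proceed with whatever is still running.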
Graceful-stop any ALIVE detached actor whose name starts with prefix.
Intended for orphan cleanup after a driver restart: detached actors
outlive the driver that created them, so a reconnecting driver (in the
same namespace) can list them by name, re-acquire handles, and reap
them + their subprocess trees via graceful_stop_actors. Returns the
number of orphans reaped. Mirrors remove_named_pgs_with_prefix in
placement.