nemo_curator.core.serve


Module Contents

Classes

| Name | Description |
| --- | --- |
| `InferenceModelConfig` | Configuration for a single model to be served via Ray Serve. |
| `InferenceServer` | Serve one or more models via Ray Serve with an OpenAI-compatible endpoint. |

Functions

| Name | Description |
| --- | --- |
| `is_ray_serve_active` | Check whether any InferenceServer is currently running in this process. |

Data

_active_servers

API

class nemo_curator.core.serve.InferenceModelConfig(
model_identifier: str,
model_name: str | None = None,
deployment_config: dict[str, typing.Any] = dict(),
engine_kwargs: dict[str, typing.Any] = dict(),
runtime_env: dict[str, typing.Any] = dict()
)
Dataclass

Configuration for a single model to be served via Ray Serve.

Parameters:

model_identifier
str

HuggingFace model ID or local path (maps to model_source in LLMConfig).

model_name
str | None. Defaults to None.

API-facing model name clients use in requests. Defaults to model_identifier.

deployment_config
dict[str, Any]. Defaults to dict().

Ray Serve deployment configuration (autoscaling, replicas, etc.). Passed directly to LLMConfig.deployment_config.

engine_kwargs
dict[str, Any]. Defaults to dict().

vLLM engine keyword arguments (tensor_parallel_size, etc.). Passed directly to LLMConfig.engine_kwargs.

runtime_env
dict[str, Any]. Defaults to dict().

Ray runtime environment configuration (pip packages, env_vars, working_dir, etc.). Merged with quiet logging overrides when verbose=False on the InferenceServer.

deployment_config
dict[str, Any] = field(default_factory=dict)
engine_kwargs
dict[str, Any] = field(default_factory=dict)
model_identifier
str
model_name
str | None = None
runtime_env
dict[str, Any] = field(default_factory=dict)
nemo_curator.core.serve.InferenceModelConfig._merge_runtime_envs(
base: dict[str, typing.Any],
override: dict[str, typing.Any] | None
) -> dict[str, typing.Any]
staticmethod

Merge two runtime_env dicts, with special handling for env_vars.

Top-level keys from override win, except env_vars which is merged key-by-key (override env vars take precedence over base).

nemo_curator.core.serve.InferenceModelConfig.to_llm_config(
quiet_runtime_env: dict[str, typing.Any] | None = None
) -> ray.serve.llm.LLMConfig

Convert to a Ray Serve LLMConfig.

Parameters:

quiet_runtime_env
dict[str, Any] | None. Defaults to None.

Optional runtime environment with quiet/logging overrides. Merged on top of self.runtime_env so that quiet env vars take precedence while preserving user-provided keys (e.g. pip, working_dir).

class nemo_curator.core.serve.InferenceServer(
models: list[nemo_curator.core.serve.InferenceModelConfig],
name: str = 'default',
port: int = DEFAULT_SERVE_PORT,
health_check_timeout_s: int = DEFAULT_SERVE_HEALTH_TIMEOUT_S,
verbose: bool = False
)
Dataclass

Serve one or more models via Ray Serve with an OpenAI-compatible endpoint.

Requires a running Ray cluster (e.g. via RayClient or RAY_ADDRESS env var).

Example:

```python
from nemo_curator.core.serve import InferenceModelConfig, InferenceServer

config = InferenceModelConfig(
    model_identifier="google/gemma-3-27b-it",
    engine_kwargs={"tensor_parallel_size": 4},
    deployment_config={
        "autoscaling_config": {
            "min_replicas": 1,
            "max_replicas": 1,
        },
    },
)

with InferenceServer(models=[config]) as server:
    print(server.endpoint)  # http://localhost:8000/v1
```

Use with NeMo Curator's OpenAIClient or AsyncOpenAIClient.

Parameters:

models
list[InferenceModelConfig]

List of InferenceModelConfig instances to deploy.

name
str. Defaults to 'default'.

Ray Serve application name (default "default").

port
int. Defaults to DEFAULT_SERVE_PORT.

HTTP port for the OpenAI-compatible endpoint.

health_check_timeout_s
int. Defaults to DEFAULT_SERVE_HEALTH_TIMEOUT_S.

Seconds to wait for models to become healthy.

verbose
bool. Defaults to False.

If True, keep Ray Serve and vLLM logging at default levels. If False (default), suppress per-request logs from both vLLM (VLLM_LOGGING_LEVEL=WARNING) and Ray Serve access logs (RAY_SERVE_LOG_TO_STDERR=0). Serve logs still go to files under the Ray session log directory.

_started
bool = field(init=False, default=False, repr=False)
endpoint
str

OpenAI-compatible base URL for the served models.

When multiple models are deployed, clients select a model by passing model="<model_name>" in the request body (standard OpenAI API convention). The /v1/models endpoint lists all available models.
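For illustration, a request against a multi-model deployment follows the standard OpenAI request shape. The endpoint URL and model name below are assumptions taken from the example above; in practice use `server.endpoint` and your configured `model_name`:

```python
import json
import urllib.request

# Hypothetical values: substitute server.endpoint and your model_name.
endpoint = "http://localhost:8000/v1"

body = {
    "model": "google/gemma-3-27b-it",  # selects one of the deployed models
    "messages": [{"role": "user", "content": "Hello"}],
}

request = urllib.request.Request(
    f"{endpoint}/chat/completions",
    data=json.dumps(body).encode(),
    headers={"Content-Type": "application/json"},
)
# urllib.request.urlopen(request) would return the completion once the server is up.
```

Any OpenAI-compatible client works the same way: the `model` field in the request body does the routing, not the URL.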

health_check_timeout_s
int = DEFAULT_SERVE_HEALTH_TIMEOUT_S
models
list[InferenceModelConfig]
name
str = 'default'
port
int = DEFAULT_SERVE_PORT
verbose
bool = False
nemo_curator.core.serve.InferenceServer.__enter__()
nemo_curator.core.serve.InferenceServer.__exit__(
exc = ()
)
nemo_curator.core.serve.InferenceServer.__post_init__() -> None
nemo_curator.core.serve.InferenceServer._cleanup_failed_deploy() -> None

Best-effort cleanup after a failed deploy (e.g. health check timeout).

Shuts down Ray Serve so that GPU memory and other resources held by partially-deployed replicas are released.

nemo_curator.core.serve.InferenceServer._deploy() -> None

Deploy models onto the connected Ray cluster (internal).

Must be called while a Ray connection is active.

nemo_curator.core.serve.InferenceServer._quiet_runtime_env() -> dict[str, typing.Any]
staticmethod

Return a runtime_env dict that suppresses per-request logs.

Works around two upstream bugs in Ray Serve (as of Ray 2.44+):

  1. vLLM request logs (Added request chatcmpl-...): _start_async_llm_engine creates AsyncLLM() without passing log_requests, so it defaults to True. Workaround: VLLM_LOGGING_LEVEL=WARNING. TODO: Once we upgrade past Ray 2.54 (see ray-project/ray#60824), pass "enable_log_requests": False in engine_kwargs instead and remove the VLLM_LOGGING_LEVEL env var workaround.

  2. Ray Serve access logs (POST /v1/... 200 Xms): configure_component_logger() only adds the access-log filter to the file handler, not the stderr stream handler, so LoggingConfig(enable_access_log=False) has no effect on console output. Workaround: RAY_SERVE_LOG_TO_STDERR=0 (logs still go to files under the Ray session log directory). TODO: Ray might fix this in the future.
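Based on the two workarounds above, the returned dict presumably looks like the following sketch (the env var names and values are taken from this docstring; any additional internal keys are not shown):

```python
def quiet_runtime_env() -> dict[str, dict[str, str]]:
    """Sketch of the documented quiet overrides, not the library's code."""
    return {
        "env_vars": {
            "VLLM_LOGGING_LEVEL": "WARNING",  # hides "Added request chatcmpl-..." lines
            "RAY_SERVE_LOG_TO_STDERR": "0",   # hides "POST /v1/... 200 Xms" access logs
        }
    }
```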

nemo_curator.core.serve.InferenceServer._reset_serve_client_cache() -> None
staticmethod

Reset Ray Serve’s cached controller client.

Ray Serve caches the controller actor handle in a module-level _global_client. This handle becomes stale when the driver disconnects and reconnects (e.g. via with ray.init()). The built-in staleness check only catches RayActorError, not the “different cluster” exception that occurs across driver sessions.

Resetting forces the next Serve API call to look up the controller by its well-known actor name, producing a fresh handle.

TODO: Remove this method once https://github.com/ray-project/ray/issues/61608 is fixed.

nemo_curator.core.serve.InferenceServer._wait_for_healthy() -> None

Poll the /v1/models endpoint until all models are ready.

Uses wall-clock time to enforce the timeout accurately, regardless of how long individual HTTP requests take.
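The wall-clock polling pattern described here can be sketched as follows. The helper name and signature are illustrative, not the library's internals:

```python
import time


def wait_until(check, timeout_s: float, interval_s: float = 0.1) -> bool:
    """Poll check() until it returns True or timeout_s of wall-clock time
    elapses, regardless of how long each individual check takes."""
    deadline = time.monotonic() + timeout_s
    while time.monotonic() < deadline:
        if check():
            return True
        time.sleep(interval_s)
    return False


# e.g. wait_until(lambda: all_models_listed(endpoint), timeout_s=300)
```

Using `time.monotonic()` against a precomputed deadline means a slow HTTP request cannot extend the overall timeout, which is the behavior the docstring describes.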

nemo_curator.core.serve.InferenceServer.start() -> None

Deploy all models and wait for them to become healthy.

The driver connects to the Ray cluster only for the duration of deployment. Once models are healthy the driver disconnects, so that the next ray.init() (e.g. from a pipeline executor) becomes the first driver-level init and its runtime_env takes effect on workers. Serve actors are detached and survive the disconnect.

Raises:

  • RuntimeError: If another InferenceServer is already active in this process. Only one InferenceServer can run at a time because Ray Serve uses a single HTTP proxy per cluster, and all models are deployed as a single application sharing the same /v1 routes. You can deploy multiple models in one InferenceServer (via the models list) — clients select a model by passing model="<model_name>" in the API request body. Stop the existing server before starting a new one.
nemo_curator.core.serve.InferenceServer.stop() -> None

Shut down Ray Serve (all applications, controller, and HTTP proxy).

Reconnects to the Ray cluster to tear down Serve actors and release GPU memory, then disconnects. If the cluster is already gone (e.g. RayClient was stopped first), the shutdown is skipped silently.

nemo_curator.core.serve.is_ray_serve_active() -> bool

Check whether any InferenceServer is currently running in this process.

nemo_curator.core.serve._active_servers: set[str] = set()