nv_ingest_api.internal.primitives.nim package#

Subpackages#

Submodules#

nv_ingest_api.internal.primitives.nim.default_values module#

nv_ingest_api.internal.primitives.nim.nim_client module#

class nv_ingest_api.internal.primitives.nim.nim_client.InferenceRequest(data, future, model_name, dims, kwargs)#

Bases: tuple

data#

Alias for field number 0

dims#

Alias for field number 3

future#

Alias for field number 1

kwargs#

Alias for field number 4

model_name#

Alias for field number 2

class nv_ingest_api.internal.primitives.nim.nim_client.NimClient(
model_interface,
protocol: str,
endpoints: Tuple[str, str],
auth_token: str | None = None,
timeout: float = 120.0,
max_retries: int = 10,
max_429_retries: int = 5,
enable_dynamic_batching: bool = False,
dynamic_batch_timeout: float = 0.1,
dynamic_batch_memory_budget_mb: float | None = None,
)[source]#

Bases: object

A client for interfacing with a model inference server using gRPC or HTTP protocols.

close()[source]#

Stops the dynamic batching worker and closes client connections.

infer(data: dict, model_name: str, **kwargs) → Any[source]#

Perform inference using the specified model and input data.

Parameters:
  • data (dict) – The input data for inference.

  • model_name (str) – The model name.

  • kwargs (dict) – Additional parameters for inference.

Returns:

The processed inference results, coalesced in the same order as the input images.

Return type:

Any
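
A minimal usage sketch follows. The ModelInterface subclass (MyModelInterface), the endpoint addresses, the model name, and the {"images": ...} payload are illustrative placeholders only; the keys that infer expects are defined by the concrete ModelInterface, and the endpoints tuple is assumed to be ordered as (gRPC endpoint, HTTP endpoint).

```python
import numpy as np

from nv_ingest_api.internal.primitives.nim.nim_client import NimClient
from my_project.interfaces import MyModelInterface  # hypothetical ModelInterface subclass

# Endpoint ordering assumed to be (gRPC, HTTP); adjust for your deployment.
client = NimClient(
    model_interface=MyModelInterface(),
    protocol="grpc",
    endpoints=("localhost:8001", "http://localhost:8000/v1/infer"),
    auth_token=None,   # supply a token for authenticated, hosted endpoints
    timeout=120.0,
)

# The expected keys of `data` come from the ModelInterface implementation;
# {"images": [...]} is only a placeholder.
image = np.zeros((1024, 1024, 3), dtype=np.uint8)
results = client.infer(data={"images": [image]}, model_name="my_model")

client.close()
```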

start()[source]#

Starts the dynamic batching worker thread if enabled.

submit(
data: Any,
model_name: str,
dims: Tuple[int, int],
**kwargs,
) → Future[source]#

Submits a single inference request to the dynamic batcher.

This method is non-blocking and returns a Future object that will eventually contain the inference result.

Parameters:

data (Any) – The single data item for inference (e.g., one image, one text prompt).

Returns:

A future that will be fulfilled with the inference result.

Return type:

concurrent.futures.Future
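
A hedged sketch of the dynamic batching path, continuing the construction example above but assuming the client was created with enable_dynamic_batching=True. The meaning of dims is assumed here to describe the item (e.g., its height and width); its exact interpretation is up to the ModelInterface that coalesces the batch.

```python
import numpy as np

# Assumes `client` was constructed with enable_dynamic_batching=True.
client.start()  # launch the dynamic batching worker thread

image = np.zeros((768, 1024, 3), dtype=np.uint8)

# dims is assumed to carry the item's dimensions (e.g., height and width);
# the ModelInterface coalescing the batch defines its exact semantics.
future = client.submit(data=image, model_name="my_model", dims=(768, 1024))

result = future.result(timeout=120)  # block until the batched inference completes
client.close()
```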

try_set_max_batch_size(model_name, model_version: str = '')[source]#

Attempt to set the max batch size for the model if it is not already set, ensuring thread safety.

class nv_ingest_api.internal.primitives.nim.nim_client.NimClientManager[source]#

Bases: object

A thread-safe, singleton manager for creating and sharing NimClient instances.

This manager ensures that only one NimClient is created per unique configuration.

get_client(
model_interface,
**kwargs,
) → NimClient[source]#

Gets or creates a NimClient for the given configuration.

shutdown()[source]#

Gracefully closes all managed NimClient instances. This is called automatically at application exit via an atexit handler.

nv_ingest_api.internal.primitives.nim.nim_client.get_nim_client_manager(
*args,
**kwargs,
) → NimClientManager[source]#

Returns the singleton instance of the NimClientManager.
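
A sketch of the manager-based path, which shares one client per unique configuration. It assumes that the keyword arguments to get_client mirror the NimClient constructor; the interface class and endpoints are placeholders.

```python
from nv_ingest_api.internal.primitives.nim.nim_client import get_nim_client_manager
from my_project.interfaces import MyModelInterface  # hypothetical ModelInterface subclass

manager = get_nim_client_manager()

# Assumption: get_client forwards keyword arguments to the NimClient constructor.
# Repeated calls with the same configuration return the same shared instance.
client = manager.get_client(
    MyModelInterface(),
    protocol="http",
    endpoints=("localhost:8001", "http://localhost:8000/v1/infer"),
)

# Use client.infer(...) as usual; manager.shutdown() runs automatically at exit.
```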

nv_ingest_api.internal.primitives.nim.nim_client.reload_models(
client: InferenceServerClient,
exclude: list[str] = [],
client_timeout: int = 120,
) → bool[source]#

Reloads all models in the Triton server except for the models in the exclude list.

Parameters:
  • client (grpcclient.InferenceServerClient) – The gRPC client connected to the Triton server.

  • exclude (list[str], optional) – A list of model names to exclude from reloading.

  • client_timeout (int, optional) – Timeout for client operations in seconds (default: 120).

Returns:

True if all models were successfully reloaded, False otherwise.

Return type:

bool
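
A short sketch of reloading models on a Triton server; the server URL and the excluded model name are placeholders.

```python
import tritonclient.grpc as grpcclient

from nv_ingest_api.internal.primitives.nim.nim_client import reload_models

# Triton's gRPC endpoint; adjust the URL for your deployment.
triton = grpcclient.InferenceServerClient(url="localhost:8001")

# "my_model" is a placeholder for a model that should keep its current version.
ok = reload_models(triton, exclude=["my_model"], client_timeout=120)
if not ok:
    raise RuntimeError("One or more models failed to reload")
```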

nv_ingest_api.internal.primitives.nim.nim_model_interface module#

class nv_ingest_api.internal.primitives.nim.nim_model_interface.ModelInterface[source]#

Bases: object

Base class for defining a model interface that supports preparing input data, formatting it for inference, parsing output, and processing inference results.

coalesce_requests_to_batch(
requests,
protocol: str,
**kwargs,
) → Tuple[Any, Dict[str, Any]][source]#

Takes a list of InferenceRequest objects and combines them into a single formatted batch ready for inference.

THIS METHOD IS REQUIRED FOR DYNAMIC BATCHING SUPPORT.

Parameters:
  • requests (List[InferenceRequest]) – A list of InferenceRequest namedtuples collected for the batch. Each tuple contains the data, dimensions, and other context for a single item.

  • protocol (str) – The inference protocol, either “grpc” or “http”.

  • **kwargs (Any) – Additional keyword arguments passed from the original request.

Returns:

A tuple containing the single formatted batch and its scratch-pad data.

Return type:

Tuple[Any, Dict[str, Any]]

does_item_fit_in_batch(
current_batch,
next_request,
memory_budget_bytes: int,
) → bool[source]#

Checks if adding another request to the current batch would exceed the memory budget.

This is a model-specific calculation. The default implementation always returns True, effectively ignoring the memory budget. Interfaces for models that require memory management (like padded image models) must override this.

Returns:

True if the item fits within the budget, False otherwise.

Return type:

bool

format_input(
data: dict,
protocol: str,
max_batch_size: int,
)[source]#

Format the input data for the specified protocol.

Parameters:
  • data (dict) – The input data to format.

  • protocol (str) – The protocol to format the data for (“grpc” or “http”).

  • max_batch_size (int) – The maximum number of items per formatted batch.

name() → str[source]#

Get the name of the model interface.

Returns:

The name of the model interface.

Return type:

str

parse_output(
response,
protocol: str,
data: dict | None = None,
**kwargs,
)[source]#

Parse the output data from the model’s inference response.

Parameters:
  • response (Any) – The response from the model inference.

  • protocol (str) – The protocol used (“grpc” or “http”).

  • data (dict, optional) – Additional input data passed to the function.

prepare_data_for_inference(data: dict)[source]#

Prepare input data for inference by processing or transforming it as required.

Parameters:

data (dict) – The input data to prepare.

process_inference_results(
output_array,
protocol: str,
**kwargs,
)[source]#

Process the inference results from the model.

Parameters:
  • output_array (Any) – The raw output from the model.

  • protocol (str) – The protocol used (“grpc” or “http”).

  • kwargs (dict) – Additional parameters for processing.

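To illustrate how these methods fit together, the skeleton below sketches a hypothetical ModelInterface subclass. The class name, method bodies, and return values are placeholders only; in particular, the exact return contract that NimClient expects from format_input is not reproduced here.

```python
from typing import Any, Dict, Tuple

from nv_ingest_api.internal.primitives.nim.nim_model_interface import ModelInterface


class EchoModelInterface(ModelInterface):
    """Hypothetical pass-through interface used only to show the override points."""

    def name(self) -> str:
        return "echo"

    def prepare_data_for_inference(self, data: dict) -> dict:
        # Real interfaces would validate or transform the input here (e.g., decode images).
        return data

    def format_input(self, data: dict, protocol: str, max_batch_size: int):
        # Real interfaces build gRPC tensors or an HTTP JSON payload and split the
        # input into chunks of at most max_batch_size items; shown schematically.
        return data

    def parse_output(self, response, protocol: str, data: dict | None = None, **kwargs):
        return response

    def process_inference_results(self, output_array, protocol: str, **kwargs):
        return output_array

    # Needed only when NimClient is created with enable_dynamic_batching=True.
    def coalesce_requests_to_batch(self, requests, protocol: str, **kwargs) -> Tuple[Any, Dict[str, Any]]:
        batch = [r.data for r in requests]          # InferenceRequest.data
        scratch = {"dims": [r.dims for r in requests]}  # InferenceRequest.dims
        return batch, scratch

    def does_item_fit_in_batch(self, current_batch, next_request, memory_budget_bytes: int) -> bool:
        # The default ignores the memory budget; override for padded/image models.
        return True
```
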
Module contents#

class nv_ingest_api.internal.primitives.nim.ModelInterface[source]#

Bases: object

Base class for defining a model interface that supports preparing input data, formatting it for inference, parsing output, and processing inference results.

coalesce_requests_to_batch(
requests,
protocol: str,
**kwargs,
) → Tuple[Any, Dict[str, Any]][source]#

Takes a list of InferenceRequest objects and combines them into a single formatted batch ready for inference.

THIS METHOD IS REQUIRED FOR DYNAMIC BATCHING SUPPORT.

Parameters:
  • requests (List[InferenceRequest]) – A list of InferenceRequest namedtuples collected for the batch. Each tuple contains the data, dimensions, and other context for a single item.

  • protocol (str) – The inference protocol, either “grpc” or “http”.

  • **kwargs (Any) – Additional keyword arguments passed from the original request.

Returns:

A tuple containing the single formatted batch and its scratch-pad data.

Return type:

Tuple[Any, Dict[str, Any]]

does_item_fit_in_batch(
current_batch,
next_request,
memory_budget_bytes: int,
) → bool[source]#

Checks if adding another request to the current batch would exceed the memory budget.

This is a model-specific calculation. The default implementation always returns True, effectively ignoring the memory budget. Interfaces for models that require memory management (like padded image models) must override this.

Returns:

True if the item fits within the budget, False otherwise.

Return type:

bool

format_input(
data: dict,
protocol: str,
max_batch_size: int,
)[source]#

Format the input data for the specified protocol.

Parameters:
  • data (dict) – The input data to format.

  • protocol (str) – The protocol to format the data for (“grpc” or “http”).

  • max_batch_size (int) – The maximum number of items per formatted batch.

name() → str[source]#

Get the name of the model interface.

Returns:

The name of the model interface.

Return type:

str

parse_output(
response,
protocol: str,
data: dict | None = None,
**kwargs,
)[source]#

Parse the output data from the model’s inference response.

Parameters:
  • response (Any) – The response from the model inference.

  • protocol (str) – The protocol used (“grpc” or “http”).

  • data (dict, optional) – Additional input data passed to the function.

prepare_data_for_inference(data: dict)[source]#

Prepare input data for inference by processing or transforming it as required.

Parameters:

data (dict) – The input data to prepare.

process_inference_results(
output_array,
protocol: str,
**kwargs,
)[source]#

Process the inference results from the model.

Parameters:
  • output_array (Any) – The raw output from the model.

  • protocol (str) – The protocol used (“grpc” or “http”).

  • kwargs (dict) – Additional parameters for processing.

class nv_ingest_api.internal.primitives.nim.NimClient(
model_interface,
protocol: str,
endpoints: Tuple[str, str],
auth_token: str | None = None,
timeout: float = 120.0,
max_retries: int = 10,
max_429_retries: int = 5,
enable_dynamic_batching: bool = False,
dynamic_batch_timeout: float = 0.1,
dynamic_batch_memory_budget_mb: float | None = None,
)[source]#

Bases: object

A client for interfacing with a model inference server using gRPC or HTTP protocols.

close()[source]#

Stops the dynamic batching worker and closes client connections.

infer(data: dict, model_name: str, **kwargs) → Any[source]#

Perform inference using the specified model and input data.

Parameters:
  • data (dict) – The input data for inference.

  • model_name (str) – The model name.

  • kwargs (dict) – Additional parameters for inference.

Returns:

The processed inference results, coalesced in the same order as the input images.

Return type:

Any

start()[source]#

Starts the dynamic batching worker thread if enabled.

submit(
data: Any,
model_name: str,
dims: Tuple[int, int],
**kwargs,
) → Future[source]#

Submits a single inference request to the dynamic batcher.

This method is non-blocking and returns a Future object that will eventually contain the inference result.

Parameters:

data (Any) – The single data item for inference (e.g., one image, one text prompt).

Returns:

A future that will be fulfilled with the inference result.

Return type:

concurrent.futures.Future

try_set_max_batch_size(model_name, model_version: str = '')[source]#

Attempt to set the max batch size for the model if it is not already set, ensuring thread safety.

nv_ingest_api.internal.primitives.nim.get_nim_client_manager(
*args,
**kwargs,
) → NimClientManager[source]#

Returns the singleton instance of the NimClientManager.