Use the API (gRPC) for NVIDIA NeMo Retriever Reranking NIM
Use the examples in this documentation to help you get started using the API for NVIDIA NeMo Retriever Reranking NIM.
For the full API reference, refer to API Reference (gRPC).
gRPC Support
The NeMo Retriever Reranking NIM supports the Open Inference Protocol (KServe V2). You can make gRPC inference requests by using the KServe V2 protocol buffers with a gRPC client.
Launch the NeMo Retriever Reranking NIM
Launch NeMo Retriever Reranking NIM by following the Get Started guide.
In the docker run command that launches the NIM,
include the additional argument -p 8001:8001, which publishes the port used for gRPC requests, as shown in the following example.
# Start the NIM.
# The additional argument is `-p 8001:8001`; it publishes port 8001, which the
# gRPC client example connects to. Do not put a comment after a trailing
# backslash: it breaks the line continuation and splits the command in two.
docker run -it --rm --name=$CONTAINER_NAME \
  --runtime=nvidia \
  --gpus all \
  --shm-size=16GB \
  -e NGC_API_KEY \
  -v "$LOCAL_NIM_CACHE/cache:/opt/cache" \
  -v "$LOCAL_NIM_CACHE/weights:/model" \
  -u $(id -u) \
  -p 8000:8000 \
  -p 8001:8001 \
  $IMG_NAME
Make Inference Calls
After you launch the NeMo Retriever Reranking NIM, you can make inference calls by using the following code.
First, install Python dependencies.
# Install the gRPC runtime, the protobuf code generator, and NumPy.
python3 -m pip install grpcio grpcio-tools numpy
# Download the Open Inference Protocol gRPC definition into the current directory.
curl -LO https://raw.githubusercontent.com/kserve/open-inference-protocol/main/specification/protocol/open_inference_grpc.proto
# Generate the Python modules that the client example imports
# (open_inference_grpc_pb2 and open_inference_grpc_pb2_grpc).
python3 -m grpc_tools.protoc -I. --python_out=. --grpc_python_out=. open_inference_grpc.proto
Next, make inference calls. The following example requires Python 3.11 or later because it uses enum.StrEnum.
from dataclasses import dataclass
from enum import StrEnum, auto
import grpc
import numpy as np
from numpy.typing import NDArray
import open_inference_grpc_pb2 as oip
import open_inference_grpc_pb2_grpc as oip_grpc
class InputName(StrEnum):
QUERY = auto()
PASSAGE = auto()
class TruncateValue(StrEnum):
END = auto()
NONE = auto()
class OutputName(StrEnum):
INDEX = auto()
LOGIT = auto()
TOKEN_COUNT = auto()
@dataclass
class RankingResponse:
    """Decoded output tensors of a single reranking request.

    Each field holds the response tensor of the same name as a NumPy array.
    """

    index: NDArray[np.int32]  # decoded "index" output tensor
    logit: NDArray[np.float32]  # decoded "logit" output tensor
    token_count: NDArray[np.int32]  # decoded "token_count" output tensor
def _string_param(value: StrEnum | str) -> oip.InferParameter:
    """Wrap a string in an ``InferParameter``.

    Args:
        value: A plain string, or a ``StrEnum`` member whose ``.value`` is used.

    Returns:
        An ``oip.InferParameter`` with ``string_param`` set to the text.
    """
    if isinstance(value, StrEnum):
        text = value.value
    else:
        text = value
    return oip.InferParameter(string_param=text)
def _bytes_tensor(name: str, values: list[str]) -> oip.ModelInferRequest.InferInputTensor:
    """Build a ``BYTES`` input tensor from a list of strings.

    Args:
        name: Name given to the tensor.
        values: Strings to send; each one is UTF-8 encoded.

    Returns:
        An input tensor of shape ``[len(values), 1]`` whose contents are the
        encoded strings.
    """
    payload = [text.encode("utf-8") for text in values]
    row_count = len(payload)
    contents = oip.InferTensorContents(bytes_contents=payload)
    return oip.ModelInferRequest.InferInputTensor(
        name=name,
        datatype="BYTES",
        shape=[row_count, 1],
        contents=contents,
    )
def _output_by_name(
    response: oip.ModelInferResponse, name: OutputName
) -> tuple[int, oip.ModelInferResponse.InferOutputTensor]:
    """Find the output tensor called ``name`` in ``response``.

    Args:
        response: Inference response whose ``outputs`` are searched in order.
        name: Output tensor name to look for.

    Returns:
        A ``(position, tensor)`` pair, where ``position`` is the index of the
        first matching tensor within ``response.outputs``.

    Raises:
        ValueError: If no output tensor has that name.
    """
    found = next(
        (
            (position, tensor)
            for position, tensor in enumerate(response.outputs)
            if tensor.name == name.value
        ),
        None,
    )
    if found is None:
        raise ValueError(f"'{name.value}' not found in the response")
    return found
def _int32_output(response: oip.ModelInferResponse, name: OutputName) -> NDArray[np.int32]:
    """Decode the output tensor ``name`` as an ``int32`` array.

    If ``response.raw_output_contents`` has an entry at the tensor's position,
    that raw buffer is decoded; otherwise the tensor's ``int_contents`` are
    used. Either way the result is reshaped to the tensor's reported shape.

    Raises:
        ValueError: If the response has no output tensor called ``name``.
    """
    position, tensor = _output_by_name(response, name)
    raw_buffers = response.raw_output_contents
    if position >= len(raw_buffers):
        # No raw buffer for this output: fall back to the typed contents.
        flat = np.array(tensor.contents.int_contents, dtype=np.int32)
    else:
        flat = np.frombuffer(raw_buffers[position], dtype=np.int32)
    return flat.reshape(tensor.shape)
def _fp32_output(response: oip.ModelInferResponse, name: OutputName) -> NDArray[np.float32]:
    """Decode the output tensor ``name`` as a ``float32`` array.

    If ``response.raw_output_contents`` has an entry at the tensor's position,
    that raw buffer is decoded; otherwise the tensor's ``fp32_contents`` are
    used. Either way the result is reshaped to the tensor's reported shape.

    Raises:
        ValueError: If the response has no output tensor called ``name``.
    """
    position, tensor = _output_by_name(response, name)
    raw_buffers = response.raw_output_contents
    if position >= len(raw_buffers):
        # No raw buffer for this output: fall back to the typed contents.
        flat = np.array(tensor.contents.fp32_contents, dtype=np.float32)
    else:
        flat = np.frombuffer(raw_buffers[position], dtype=np.float32)
    return flat.reshape(tensor.shape)
class RankingClient:
    """Minimal asynchronous gRPC client for the reranking model.

    The client only stores the endpoint; a new insecure channel is opened for
    every ``compute`` call and closed when the call finishes.
    """

    def __init__(self, endpoint: str = "localhost:8001") -> None:
        """Remember the ``host:port`` of the gRPC endpoint to connect to."""
        self._endpoint = endpoint

    async def compute(
        self,
        model_name: str,
        query: str,
        passages: list[str],
        truncate: TruncateValue | str = TruncateValue.NONE,
    ) -> RankingResponse:
        """Score ``passages`` against ``query`` with one ``ModelInfer`` request.

        Args:
            model_name: Name of the model to run.
            query: Query text; it is repeated once per passage so that both
                input tensors have the same number of rows.
            passages: Passage texts to rank.
            truncate: Value of the ``truncate`` request parameter, given as a
                ``TruncateValue`` member or the equivalent plain string.

        Returns:
            The decoded ``index``, ``logit`` and ``token_count`` output tensors.

        Raises:
            ValueError: If the response lacks one of the expected outputs.
                Errors raised by the RPC itself propagate unchanged.
        """
        request = oip.ModelInferRequest(
            model_name=model_name,
            inputs=[
                _bytes_tensor(InputName.QUERY.value, [query] * len(passages)),
                _bytes_tensor(InputName.PASSAGE.value, passages),
            ],
            parameters={"truncate": _string_param(truncate)},
        )
        # The channel is only needed for the RPC; decoding happens after it closes.
        async with grpc.aio.insecure_channel(self._endpoint) as channel:
            client = oip_grpc.GRPCInferenceServiceStub(channel)
            response = await client.ModelInfer(request)
        return RankingResponse(
            index=_int32_output(response, OutputName.INDEX),
            logit=_fp32_output(response, OutputName.LOGIT),
            token_count=_int32_output(response, OutputName.TOKEN_COUNT),
        )
# Create a client for the default endpoint, localhost:8001.
client = RankingClient()
# NOTE: a top-level ``await`` is only valid in an async-aware REPL (for example
# a Jupyter notebook or ``python -m asyncio``), which also echoes the returned
# RankingResponse. In a regular script, await this call inside a coroutine.
# ``truncate`` is passed here as a plain string rather than a TruncateValue member.
await client.compute(
model_name="nvidia_llama_nemotron_rerank_1b_v2",
query="hello",
passages=["world"],
truncate="none",
)
The result should look similar to the following.
RankingResponse(index=array([[0]], dtype=int32), logit=array([[-4.3242188]], dtype=float32), token_count=array([8], dtype=int32))