# Business Logic Scripting
Triton's ensemble feature supports many use cases where multiple models are composed into a pipeline (or, more generally, a DAG, directed acyclic graph). However, many other use cases are not supported because, as part of the model pipeline, they require loops, conditionals (if-then-else), data-dependent control flow, and other custom logic to be intermixed with model execution. We call this combination of custom logic and model executions Business Logic Scripting (BLS).
Starting from 21.08, you can implement BLS in your Python model. A new set of utility functions allows you to execute inference requests on other models being served by Triton as a part of executing your Python model. Note that BLS should only be used inside the `execute` function and is not supported in the `initialize` or `finalize` methods. The example below shows how to use this feature:
```python
import triton_python_backend_utils as pb_utils


class TritonPythonModel:
    ...

    def execute(self, requests):
        ...
        # Create an InferenceRequest object. `model_name`,
        # `requested_output_names`, and `inputs` are the required arguments and
        # must be provided when constructing an InferenceRequest object. Make
        # sure to replace `inputs` argument with a list of `pb_utils.Tensor`
        # objects.
        inference_request = pb_utils.InferenceRequest(
            model_name='model_name',
            requested_output_names=['REQUESTED_OUTPUT_1', 'REQUESTED_OUTPUT_2'],
            inputs=[<pb_utils.Tensor object>])

        # `pb_utils.InferenceRequest` supports request_id, correlation_id,
        # model version, timeout and preferred_memory in addition to the
        # arguments described above.
        # Note: Starting from the 24.03 release, the `correlation_id` parameter
        # supports both string and unsigned integer values.
        # These arguments are optional. An example containing all the arguments:
        # inference_request = pb_utils.InferenceRequest(model_name='model_name',
        #   requested_output_names=['REQUESTED_OUTPUT_1', 'REQUESTED_OUTPUT_2'],
        #   inputs=[<list of pb_utils.Tensor objects>],
        #   request_id="1", correlation_id=4, model_version=1, flags=0, timeout=5,
        #   preferred_memory=pb_utils.PreferredMemory(
        #     pb_utils.TRITONSERVER_MEMORY_GPU, # or pb_utils.TRITONSERVER_MEMORY_CPU
        #     0))

        # Execute the inference_request and wait for the response
        inference_response = inference_request.exec()

        # Check if the inference response has an error
        if inference_response.has_error():
            raise pb_utils.TritonModelException(
                inference_response.error().message())
        else:
            # Extract the output tensors from the inference response.
            output1 = pb_utils.get_output_tensor_by_name(
                inference_response, 'REQUESTED_OUTPUT_1')
            output2 = pb_utils.get_output_tensor_by_name(
                inference_response, 'REQUESTED_OUTPUT_2')

            # Decide the next steps for model execution based on the received
            # output tensors. It is possible to use the same output tensors
            # for the final inference response too.
```
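For example, the received tensors can drive data-dependent control flow. Below is a minimal sketch, assuming `REQUESTED_OUTPUT_1` resides in CPU memory; the 0.5 threshold is a hypothetical decision point:

```python
# A minimal sketch, assuming `output1` is in CPU memory; the 0.5 threshold is
# a hypothetical decision point, not part of any real model.
if output1.is_cpu():
    scores = output1.as_numpy()
    if scores.max() > 0.5:
        # e.g. issue a follow-up BLS request here, or reuse `output1` and
        # `output2` to build the final InferenceResponse for this request.
        ...
```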
In addition to the `inference_request.exec` function that allows you to execute blocking inference requests, `inference_request.async_exec` allows you to perform async inference requests. This can be useful when you do not need the result of the inference immediately. Using the `async_exec` function, it is possible to have multiple in-flight inference requests and wait for the responses only when needed. The example below shows how to use `async_exec`:
```python
import triton_python_backend_utils as pb_utils
import asyncio


class TritonPythonModel:
    ...

    # You must add the Python 'async' keyword to the beginning of `execute`
    # function if you want to use `async_exec` function.
    async def execute(self, requests):
        ...
        # Create an InferenceRequest object. `model_name`,
        # `requested_output_names`, and `inputs` are the required arguments and
        # must be provided when constructing an InferenceRequest object. Make
        # sure to replace `inputs` argument with a list of `pb_utils.Tensor`
        # objects.
        inference_request = pb_utils.InferenceRequest(
            model_name='model_name',
            requested_output_names=['REQUESTED_OUTPUT_1', 'REQUESTED_OUTPUT_2'],
            inputs=[<pb_utils.Tensor object>])

        infer_response_awaits = []
        for i in range(4):
            # async_exec function returns an
            # [Awaitable](https://docs.python.org/3/library/asyncio-task.html#awaitables)
            # object.
            infer_response_awaits.append(inference_request.async_exec())

        # Wait for all of the inference requests to complete.
        infer_responses = await asyncio.gather(*infer_response_awaits)

        for infer_response in infer_responses:
            # Check if the inference response has an error
            if infer_response.has_error():
                raise pb_utils.TritonModelException(
                    infer_response.error().message())
            else:
                # Extract the output tensors from the inference response.
                output1 = pb_utils.get_output_tensor_by_name(
                    infer_response, 'REQUESTED_OUTPUT_1')
                output2 = pb_utils.get_output_tensor_by_name(
                    infer_response, 'REQUESTED_OUTPUT_2')

                # Decide the next steps for model execution based on the
                # received output tensors.
```
A complete example for sync and async BLS in Python backend is included in the Examples section.
## Using BLS with Decoupled Models
Starting from the 23.03 release, you can execute inference requests on decoupled models in both default mode and decoupled mode. By setting the `decoupled` parameter to `True`, the `exec` and `async_exec` functions will return an iterator of the inference responses returned by a decoupled model. If the `decoupled` parameter is set to `False`, the `exec` and `async_exec` functions will return a single response, as shown in the example above. In addition, you can set the request timeout in microseconds via the `timeout` parameter in the `InferenceRequest` constructor. If the request times out, the request will respond with an error. The default value of `timeout` is 0, which indicates that the request has no timeout.
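For instance, a five-second timeout could be set as follows (a minimal sketch; the model name, output name, and `input_tensor` are placeholders):

```python
# A minimal sketch of a per-request timeout on a BLS call to a decoupled
# model. The model name, output name, and `input_tensor` are placeholders.
inference_request = pb_utils.InferenceRequest(
    model_name='decoupled_model',
    requested_output_names=['OUTPUT'],
    inputs=[input_tensor],
    timeout=5000000)  # 5 seconds, expressed in microseconds

# A timed-out request surfaces as an error on the corresponding response(s).
for inference_response in inference_request.exec(decoupled=True):
    if inference_response.has_error():
        raise pb_utils.TritonModelException(
            inference_response.error().message())
```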
Additionally, starting from the 23.04 release, you have the flexibility to select a specific device to receive output tensors from BLS calls. This can be achieved by setting the optional `preferred_memory` parameter within the `InferenceRequest` constructor. To do this, create a `PreferredMemory` object and specify the `preferred_memory_type` as either `TRITONSERVER_MEMORY_GPU` or `TRITONSERVER_MEMORY_CPU`, as well as the `preferred_device_id` as an integer, to indicate the memory type and device ID on which you wish to receive output tensors. If you do not specify the `preferred_memory` parameter, the output tensors will be allocated on the same device on which the output tensors were received from the model to which the BLS call is made.
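A minimal sketch of the `preferred_memory` argument on its own, requesting output tensors in CPU memory on device 0 (the model name, output name, and `input_tensor` are placeholders):

```python
# A minimal sketch: request BLS output tensors in CPU memory on device 0.
# The model name, output name, and `input_tensor` are placeholders.
preferred_memory = pb_utils.PreferredMemory(pb_utils.TRITONSERVER_MEMORY_CPU, 0)
inference_request = pb_utils.InferenceRequest(
    model_name='model_name',
    requested_output_names=['REQUESTED_OUTPUT_1'],
    inputs=[input_tensor],
    preferred_memory=preferred_memory)
```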
The example below shows how to use these features when calling a decoupled model:
```python
import triton_python_backend_utils as pb_utils


class TritonPythonModel:
    ...

    def execute(self, requests):
        ...
        # Create an InferenceRequest object. `model_name`,
        # `requested_output_names`, and `inputs` are the required arguments and
        # must be provided when constructing an InferenceRequest object. Make
        # sure to replace `inputs` argument with a list of `pb_utils.Tensor`
        # objects.
        inference_request = pb_utils.InferenceRequest(
            model_name='model_name',
            requested_output_names=['REQUESTED_OUTPUT_1', 'REQUESTED_OUTPUT_2'],
            inputs=[<pb_utils.Tensor object>])

        # `pb_utils.InferenceRequest` supports request_id, correlation_id,
        # model version, timeout and preferred_memory in addition to the
        # arguments described above.
        # Note: Starting from the 24.03 release, the `correlation_id` parameter
        # supports both string and unsigned integer values.
        # These arguments are optional. An example containing all the arguments:
        # inference_request = pb_utils.InferenceRequest(model_name='model_name',
        #   requested_output_names=['REQUESTED_OUTPUT_1', 'REQUESTED_OUTPUT_2'],
        #   inputs=[<list of pb_utils.Tensor objects>],
        #   request_id="1", correlation_id="ex-4", model_version=1, flags=0, timeout=5,
        #   preferred_memory=pb_utils.PreferredMemory(
        #     pb_utils.TRITONSERVER_MEMORY_GPU, # or pb_utils.TRITONSERVER_MEMORY_CPU
        #     0))

        # Execute the inference_request and wait for the response. Here we are
        # running a BLS request on a decoupled model, hence setting the
        # parameter 'decoupled' to 'True'.
        inference_responses = inference_request.exec(decoupled=True)

        for inference_response in inference_responses:
            # Check if the inference response has an error
            if inference_response.has_error():
                raise pb_utils.TritonModelException(
                    inference_response.error().message())

            # For some models, it is possible that the last response is empty
            if len(inference_response.output_tensors()) > 0:
                # Extract the output tensors from the inference response.
                output1 = pb_utils.get_output_tensor_by_name(
                    inference_response, 'REQUESTED_OUTPUT_1')
                output2 = pb_utils.get_output_tensor_by_name(
                    inference_response, 'REQUESTED_OUTPUT_2')

                # Decide the next steps for model execution based on the
                # received output tensors. It is possible to use the same
                # output tensors for the final inference response too.
```
In addition to the `inference_request.exec(decoupled=True)` function that allows you to execute blocking inference requests on decoupled models, `inference_request.async_exec(decoupled=True)` allows you to perform async inference requests. This can be useful when you do not need the result of the inference immediately. Using the `async_exec` function, it is possible to have multiple in-flight inference requests and wait for the responses only when needed. The example below shows how to use `async_exec`:
```python
import triton_python_backend_utils as pb_utils
import asyncio


class TritonPythonModel:
    ...

    # You must add the Python 'async' keyword to the beginning of `execute`
    # function if you want to use `async_exec` function.
    async def execute(self, requests):
        ...
        # Create an InferenceRequest object. `model_name`,
        # `requested_output_names`, and `inputs` are the required arguments and
        # must be provided when constructing an InferenceRequest object. Make
        # sure to replace `inputs` argument with a list of `pb_utils.Tensor`
        # objects.
        inference_request = pb_utils.InferenceRequest(
            model_name='model_name',
            requested_output_names=['REQUESTED_OUTPUT_1', 'REQUESTED_OUTPUT_2'],
            inputs=[<pb_utils.Tensor object>])

        infer_response_awaits = []
        for i in range(4):
            # async_exec function returns an
            # [Awaitable](https://docs.python.org/3/library/asyncio-task.html#awaitables)
            # object.
            infer_response_awaits.append(
                inference_request.async_exec(decoupled=True))

        # Wait for all of the inference requests to complete.
        async_responses = await asyncio.gather(*infer_response_awaits)

        for infer_responses in async_responses:
            for infer_response in infer_responses:
                # Check if the inference response has an error
                if infer_response.has_error():
                    raise pb_utils.TritonModelException(
                        infer_response.error().message())

                # For some models, it is possible that the last response is
                # empty
                if len(infer_response.output_tensors()) > 0:
                    # Extract the output tensors from the inference response.
                    output1 = pb_utils.get_output_tensor_by_name(
                        infer_response, 'REQUESTED_OUTPUT_1')
                    output2 = pb_utils.get_output_tensor_by_name(
                        infer_response, 'REQUESTED_OUTPUT_2')

                    # Decide the next steps for model execution based on the
                    # received output tensors.
```
A complete example for sync and async BLS for decoupled models is included in the Examples section.
Starting from the 22.04 release, the lifetime of the BLS output tensors has been improved such that if a tensor is no longer needed in your Python model, it will be automatically deallocated. This can increase the number of BLS requests that you can execute in your model without running into out-of-memory errors for GPU or shared memory.
Note: Async BLS is not supported on Python 3.6 or lower due to the `async` keyword and `asyncio.run` being introduced in Python 3.7.
## Model Loading API
Starting from the 23.07 release, you can use the model loading API to load models required by your BLS model. The model loading API is equivalent to the Triton C API for loading models, which is documented in tritonserver.h. Below is an example of how to use the model loading API:
```python
import triton_python_backend_utils as pb_utils


class TritonPythonModel:
    def initialize(self, args):
        self.model_name = "onnx_model"
        # Check if the model is ready, and load the model if it is not ready.
        # You can specify the model version in string format. The version is
        # optional, and if not provided, the server will choose a version based
        # on the model and internal policy.
        if not pb_utils.is_model_ready(model_name=self.model_name,
                                       model_version="1"):
            # Load the model from the model repository
            pb_utils.load_model(model_name=self.model_name)

            # Load the model with an optional override model config in JSON
            # representation. If provided, this config will be used for
            # loading the model.
            config = "{\"backend\":\"onnxruntime\", \"version_policy\":{\"specific\":{\"versions\":[1]}}}"
            pb_utils.load_model(model_name=self.model_name, config=config)

            # Load the model with optional override files. The override files
            # are specified as a dictionary where the key is the file path
            # (with "file:" prefix) and the value is the file content as bytes.
            # The files will form the model directory that the model will be
            # loaded from. If specified, 'config' must be provided to be the
            # model configuration of the override model directory.
            with open('models/onnx_int32_int32_int32/1/model.onnx', 'rb') as file:
                data = file.read()
            files = {"file:1/model.onnx": data}
            pb_utils.load_model(model_name=self.model_name,
                                config=config, files=files)

    def execute(self, requests):
        # Execute the model
        ...
        # If the model is no longer needed, you can unload it. You can also
        # specify whether the dependents of the model should also be unloaded
        # by setting the 'unload_dependents' parameter to True. The default
        # value is False. Need to be careful when unloading the model as it can
        # affect other model instances or other models that depend on it.
        pb_utils.unload_model(model_name=self.model_name,
                              unload_dependents=True)
```
Note that the model loading API is only supported if the server is running in explicit model control mode. Additionally, the model loading API should only be used after the server is already running, which means that the BLS model should not be loaded during server startup. You can use different client endpoints to load the model after the server has been started. The model loading API is currently not supported during the `auto_complete_config` and `finalize` functions.
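For instance, if the server was started with `--model-control-mode=explicit`, the BLS model could be loaded afterwards through a client endpoint. A minimal sketch using the standard `tritonclient` HTTP client (the URL and model name are placeholders):

```python
# A minimal sketch: load the BLS model through the HTTP endpoint after the
# server has started. The URL and model name are placeholders.
import tritonclient.http as httpclient

client = httpclient.InferenceServerClient(url="localhost:8000")
client.load_model("bls_model")
```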
## Using BLS with Stateful Models
Stateful models require setting additional flags in the inference request to indicate the start and end of a sequence. The `flags` argument in the `pb_utils.InferenceRequest` object can be used to indicate whether the request is the first or last request in the sequence. An example indicating that the request is starting the sequence:
```python
inference_request = pb_utils.InferenceRequest(
    model_name='model_name',
    requested_output_names=['REQUESTED_OUTPUT_1', 'REQUESTED_OUTPUT_2'],
    inputs=[<list of pb_utils.Tensor objects>],
    request_id="1", correlation_id=4,
    flags=pb_utils.TRITONSERVER_REQUEST_FLAG_SEQUENCE_START)
```
To indicate the end of the sequence, you can use the `pb_utils.TRITONSERVER_REQUEST_FLAG_SEQUENCE_END` flag. If the request both starts and ends a sequence at the same time (i.e. the sequence has only a single request), you can use the bitwise OR operator to enable both flags:
```python
flags = pb_utils.TRITONSERVER_REQUEST_FLAG_SEQUENCE_START | pb_utils.TRITONSERVER_REQUEST_FLAG_SEQUENCE_END
```
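Putting this together, a whole sequence can be driven from inside `execute` by sending every request with the same `correlation_id` and marking only the first and last requests. A minimal sketch, where the model name, output name, and `sequence_tensors` list are placeholders:

```python
# A minimal sketch of driving a multi-request sequence on a stateful model via
# BLS. The model name, output name, and `sequence_tensors` are placeholders.
start_flag = pb_utils.TRITONSERVER_REQUEST_FLAG_SEQUENCE_START
end_flag = pb_utils.TRITONSERVER_REQUEST_FLAG_SEQUENCE_END

for i, tensor in enumerate(sequence_tensors):
    flags = 0
    if i == 0:
        flags |= start_flag
    if i == len(sequence_tensors) - 1:
        flags |= end_flag
    inference_request = pb_utils.InferenceRequest(
        model_name='stateful_model',
        requested_output_names=['REQUESTED_OUTPUT'],
        inputs=[tensor],
        correlation_id=4,  # every request in the sequence shares this id
        flags=flags)
    inference_response = inference_request.exec()
```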
## Limitation
- You need to make sure that the inference requests performed as a part of your model do not create a circular dependency. For example, if model A performs an inference request on itself and there are no more model instances ready to execute the inference request, the model will block on the inference execution forever.
- Async BLS is not supported when running a Python model in decoupled mode.