Source code for polygraphy.backend.trt.runner

#
# SPDX-FileCopyrightText: Copyright (c) 1993-2024 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import time
from collections import OrderedDict

from polygraphy import cuda, mod, util
from polygraphy.backend.base import BaseRunner
from polygraphy.backend.trt import util as trt_util
from polygraphy.common import FormattedArray
from polygraphy.datatype import DataType
from polygraphy.logger import G_LOGGER

np = mod.lazy_import("numpy")
torch = mod.lazy_import("torch>=1.13.0")
trt = mod.lazy_import("tensorrt>=8.5")


def _make_debug_listener():
    class DebugTensorWriter(trt.IDebugListener):
        def __init__(self):
            trt.IDebugListener.__init__(self)
            self.debug_tensor_outputs = {}

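        # Called by TensorRT for each tensor that has been marked as a debug tensor.
        # This implementation copies the device buffer to a host NumPy array so the
        # debug tensor values can be surfaced alongside the regular network outputs.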
        def process_debug_tensor(self, addr, location, type, shape, name, stream):
            cuda.wrapper().stream_synchronize(stream)
            datatype = DataType.from_dtype(type)
            size = util.volume(shape)
            buffer = np.zeros(shape, dtype=DataType.to_dtype(datatype, "numpy"))
            buffer = util.array.resize_or_reallocate(buffer, size)
            cuda.wrapper().memcpy(
                dst=util.array.data_ptr(buffer),
                src=addr,
                nbytes=size * datatype.itemsize,
                kind=cuda.MemcpyKind.DeviceToHost,
                stream_ptr=stream,
            )
            cuda.wrapper().stream_synchronize(stream)
            self.debug_tensor_outputs[name] = util.array.resize_or_reallocate(buffer, shape)

    return DebugTensorWriter()


def _make_output_allocator():

    class OutputAllocator(trt.IOutputAllocator):
        def __init__(self):
            trt.IOutputAllocator.__init__(self)
            self.buffers = {}
            self.shapes = {}
            self.use_torch = False

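        # Called by TensorRT when it needs device memory for an output tensor (e.g. when
        # output shapes are only known at runtime). Returns the address of a buffer of at
        # least `size` bytes, reusing or reallocating the buffer cached for this tensor.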
        def reallocate_output(self, tensor_name, memory, size, alignment):
            shape = (size,)
            if tensor_name not in self.buffers:
                self.buffers[tensor_name] = (
                    cuda.DeviceArray.raw(shape)
                    if not self.use_torch
                    else torch.empty(shape, dtype=torch.uint8, device="cuda")
                )
            else:
                self.buffers[tensor_name] = util.array.resize_or_reallocate(self.buffers[tensor_name], shape)
            G_LOGGER.extra_verbose(f"Reallocated output tensor: {tensor_name} to: {self.buffers[tensor_name]}")
            return util.array.data_ptr(self.buffers[tensor_name])

        def notify_shape(self, tensor_name, shape):
            self.shapes[tensor_name] = tuple(shape)

        def set_use_torch(self, use_torch):
            self.use_torch = use_torch

    return OutputAllocator()


def _get_array_on_cpu(arr, name, host_buffers, stream, nbytes, use_torch):
    """
    Copies the provided array to CPU memory and returns it.
    If sufficient CPU memory has not been allocated for the array in
    ``host_buffers``, this function will allocate new memory.

    If the input is a `torch.Tensor`, then a `torch.Tensor` is returned.
    Otherwise, if the input is a `DeviceView`, a `NumPy` array is returned.

    Args:
        arr (Union[DeviceView, torch.Tensor]): The array.
        name (str): The name of the array.
        host_buffers (Dict[str, Union[numpy.ndarray, torch.Tensor]]):
                A mapping of names to host buffers.
        stream (cuda.Stream): The CUDA stream to use.
        nbytes (int): The number of bytes to copy. This may be smaller than the size of the GPU buffer.
        use_torch (bool): Whether to use PyTorch tensors instead of NumPy arrays.

    Returns:
        Union[numpy.ndarray, torch.Tensor]: The host buffer as a flat array of bytes.
    """
    if not util.array.is_on_gpu(arr):
        G_LOGGER.internal_error(f"_get_array_on_cpu() should only be called with input arrays on the GPU!")

    # The host buffer will always be a "raw" array, i.e. a flat array of bytes.
    shape = (nbytes,)
    dtype = DataType.UINT8
    # If we switch between torch tensors and DeviceViews between inferences, we need to reallocate the host buffer.
    if name not in host_buffers or util.array.is_torch(host_buffers[name]) != use_torch:
        host_buffers[name] = (
            np.empty(shape, dtype=DataType.to_dtype(dtype, "numpy"))
            if not use_torch
            else torch.empty(shape, dtype=DataType.to_dtype(dtype, "torch"), device="cpu")
        )

    host_buffers[name] = util.array.resize_or_reallocate(host_buffers[name], shape)
    cuda.wrapper().memcpy(
        dst=util.array.data_ptr(host_buffers[name]),
        src=util.array.data_ptr(arr),
        nbytes=nbytes,
        kind=cuda.MemcpyKind.DeviceToHost,
        stream_ptr=stream.ptr,
    )
    return host_buffers[name]


@mod.export()
class TrtRunner(BaseRunner):
    """
    Runs inference using TensorRT.

    Note that runners are not designed for production deployment and should generally
    be used only for prototyping, testing, and debugging.
    """

    def __init__(
        self,
        engine,
        name: str = None,
        optimization_profile: int = None,
        allocation_strategy: str = None,
        weight_streaming_budget: int = None,
        weight_streaming_percent: float = None,
    ):
        """
        Args:
            engine (Union[Union[trt.ICudaEngine, trt.IExecutionContext], Callable() -> Union[trt.ICudaEngine, trt.IExecutionContext]]):
                    A TensorRT engine or execution context or a callable that returns one.
                    If an engine is provided, the runner will create a context automatically.

            name (str):
                    The human-readable name prefix to use for this runner.
                    A runner count and timestamp will be appended to this prefix.
            optimization_profile (int):
                    The index of the optimization profile to set each time this runner is activated.
                    When this is not provided, the profile is not set explicitly and will default to the 0th profile.
                    You can also change the profile after the runner is active using the ``set_profile()`` method.
            allocation_strategy (str):
                    The way device memory (internal activation and scratch memory) is allocated
                    for the execution context. The value of this argument can be:

                    - "static": The default value. The execution context will pre-allocate a block of memory
                      that is sufficient for any possible input size across all profiles.
                    - "profile": Allocate enough device memory for the current profile, based on the profile's maximum shapes.
                    - "runtime": Allocate enough device memory for the current input shapes.
            weight_streaming_budget (int):
                    The amount of GPU memory that TensorRT can use for weights at runtime.
                    It can take on the following values:

                    - None or -2: Disables weight streaming at runtime.
                    - -1: TensorRT will decide the streaming budget automatically.
                    - >= 0: The maximum amount of GPU memory TensorRT is allowed to use for weights, in bytes.
            weight_streaming_percent (float):
                    The percentage of weights that TensorRT will keep on the GPU.
                    It can take on the following values:

                    - None or 100: Disables weight streaming at runtime.
                    - [0 to 100]: The percentage of weights TensorRT will stream. 0 will stream the maximum number of weights.
        """
        super().__init__(name=name, prefix="trt-runner")
        self._engine_or_context = engine
        self.optimization_profile = optimization_profile
        self.allocation_strategy = allocation_strategy
        self.weight_streaming_budget = weight_streaming_budget
        self.weight_streaming_percent = weight_streaming_percent

    @util.check_called_by("activate")
    def activate_impl(self):
        engine_or_context, _ = util.invoke_if_callable(self._engine_or_context)

        if isinstance(engine_or_context, trt.ICudaEngine):
            self.engine = engine_or_context
            self._set_weight_streaming_budget()

            allocation_strategy = util.default(self.allocation_strategy, "static")
            if allocation_strategy == "static":
                self.context = self.engine.create_execution_context()
            elif allocation_strategy in ["profile", "runtime"]:
                # Device memory will be managed by Polygraphy.
                self.context = self.engine.create_execution_context(
                    trt.ExecutionContextAllocationStrategy.USER_MANAGED
                )
            else:
                G_LOGGER.critical("Invalid allocation strategy specified.")

            if not self.context:
                G_LOGGER.critical("Invalid Context. See error log for details.")
        elif isinstance(engine_or_context, trt.IExecutionContext):
            self.context = engine_or_context
            self.engine = self.context.engine
            if self.allocation_strategy is not None:
                G_LOGGER.warning(
                    "An allocation strategy was specified. "
                    "Please ensure the provided execution context uses the same strategy."
                )
        else:
            G_LOGGER.critical(
                "Invalid Engine or Context. Please ensure the engine was built correctly. See error log for details."
            )

        self.device_input_buffers = OrderedDict()
        self.host_output_buffers = OrderedDict()
        self.stream = cuda.Stream()
        self.context_memory_buffer = None
        self.output_allocator = _make_output_allocator()

        if self.optimization_profile is not None:
            self.set_profile(self.optimization_profile)

    def set_profile(self, index: int):
        """
        Sets the active optimization profile for this runner.
        The runner must already be active (see ``__enter__()`` or ``activate()``).

        This only applies if your engine was built with multiple optimization profiles.

        In TensorRT 8.0 and newer, the profile will be set asynchronously
        using this runner's CUDA stream (``runner.stream``).

        By default, the runner uses the first profile (profile 0).

        Args:
            index (int): The index of the optimization profile to use.
        """
        if not hasattr(self, "context") or self.context is None:
            G_LOGGER.critical(f"{self.name:35} | Must be activated prior to calling set_profile()")

        try:
            self.context.set_optimization_profile_async
        except AttributeError:
            self.context.active_optimization_profile = index
        else:
            if not self.context.set_optimization_profile_async(index, self.stream.ptr):
                G_LOGGER.critical(f"Failed to set optimization profile to: {index}")
@util.check_called_by("get_input_metadata") def get_input_metadata_impl(self): return trt_util.get_metadata_from_engine(self.engine, self.context, mode=trt.TensorIOMode.INPUT) def _infer_impl(self, feed_dict, copy_outputs_to_host): def get_io(mode): for idx in range(self.engine.num_io_tensors): name = self.engine.get_tensor_name(idx) if self.engine.get_tensor_mode(name) == mode: yield name use_torch = False for name in get_io(trt.TensorIOMode.INPUT): # Set up input tensor shapes and copy from host memory if needed array = feed_dict[name] if not isinstance(array, FormattedArray): array = FormattedArray(array, shape=util.array.shape(array)) underlying_array = array.array use_torch = use_torch or util.array.is_torch(underlying_array) ptr = None if self.engine.is_shape_inference_io(name): if not util.array.is_on_cpu(underlying_array): G_LOGGER.critical( f"A {type(underlying_array).__name__} was provided for input: {name}, but since this is a shape tensor, " "it must reside in host memory. " ) ptr = util.array.data_ptr(underlying_array) else: ptr = trt_util._get_array_on_gpu(underlying_array, name, self.device_input_buffers, self.stream) # If the format is HWC, make sure array.shape is considered after transposing back to CHW if trt_util.get_tensor_format(self.engine, self.context, name) == trt.TensorFormat.HWC: array_shape = trt_util.get_chw_shape_from_hwc(array.shape, self.context.get_tensor_strides(name)) else: array_shape = array.shape # Only update the input shape/address if something has changed. Otherwise, we'd be # doing extra work unnecessarily. # We retrieve the semantic shape from the FormattedArray, *not* the underlying array. if self.context.get_tensor_shape(name) != array_shape: G_LOGGER.ultra_verbose(f"Setting {name} input shape to: {array_shape}") if not self.context.set_input_shape(name, array_shape): G_LOGGER.critical(f"For input: {name}, failed to set shape to: {array_shape}") if self.context.get_tensor_address(name) != ptr: if not self.context.set_tensor_address(name, ptr): G_LOGGER.critical(f"For input: {name}, failed to set tensor address to: {ptr}") try: self.context.set_all_tensors_debug_state except AttributeError: pass else: # Set up the debug listener before running inference. debug_listener = _make_debug_listener() self.context.set_all_tensors_debug_state(True) if not self.context.set_debug_listener(debug_listener): G_LOGGER.critical(f"Failed to set debug listener.") # Set up the output allocator before running inference. self.output_allocator.set_use_torch(use_torch and torch.cuda.is_available()) for name in get_io(trt.TensorIOMode.OUTPUT): if not self.context.set_output_allocator(name, self.output_allocator): G_LOGGER.critical(f"For output: {name}, failed to set output allocator") if self.allocation_strategy in ["profile", "runtime"]: if self.allocation_strategy == "profile": # Perform per-profile allocation. size_to_allocate = 0 if mod.version(trt.__version__) >= mod.version("10.1"): size_to_allocate = self.engine.get_device_memory_size_for_profile_v2( self.context.active_optimization_profile ) else: size_to_allocate = self.engine.get_device_memory_size_for_profile( self.context.active_optimization_profile ) elif self.allocation_strategy == "runtime": # Perform runtime allocation. 
size_to_allocate = self.context.update_device_memory_size_for_shapes() if self.context_memory_buffer is None: self.context_memory_buffer = cuda.DeviceArray.raw((size_to_allocate,)) self.context_memory_buffer.resize((size_to_allocate,)) if mod.version(trt.__version__) >= mod.version("10.1"): self.context.set_device_memory(self.context_memory_buffer.ptr, self.context_memory_buffer.allocated_nbytes) else: self.context.device_memory = self.context_memory_buffer.ptr if not self.context.execute_async_v3(self.stream.ptr): G_LOGGER.critical("`execute_async_v3()` failed. Please see the logging output above for details.") output_buffers = OrderedDict() for name in get_io(trt.TensorIOMode.OUTPUT): # If we're dealing with vectorized formats, we need to return a FormattedArray. # Otherwise, we create a view instead with the correct shape/dtype. raw_array = self.output_allocator.buffers[name] shape = self.output_allocator.shapes[name] # If the format is HWC, make sure the result is shaped accordingly tensor_format = trt_util.get_tensor_format(self.engine, self.context, name) if tensor_format == trt.TensorFormat.HWC: shape = trt_util.get_hwc_shape_from_chw(shape, self.context.get_tensor_strides(name)) using_vectorized_format = tensor_format != trt.TensorFormat.LINEAR and tensor_format != trt.TensorFormat.HWC dtype = DataType.from_dtype(self.engine.get_tensor_dtype(name), source_module="tensorrt") # The memory allocated by the output allocator may be larger than actually required. # If we're using a vectorized format, then we need to copy the whole thing. # Otherwise, we can determine how much we actually need. nbytes = util.array.nbytes(raw_array) if using_vectorized_format else (util.volume(shape) * dtype.itemsize) if copy_outputs_to_host: raw_array = _get_array_on_cpu( raw_array, name, self.host_output_buffers, self.stream, nbytes, use_torch=use_torch, ) if using_vectorized_format: array = FormattedArray(raw_array, shape=shape) else: array = util.array.view(raw_array, dtype, shape) output_buffers[name] = array self.stream.synchronize() try: self.context.set_all_tensors_debug_state except AttributeError: pass else: if debug_listener.debug_tensor_outputs: output_buffers.update(debug_listener.debug_tensor_outputs) return output_buffers
[docs] @util.check_called_by("infer") def infer_impl(self, feed_dict, copy_outputs_to_host=None): """ Implementation for running inference with TensorRT. Do not call this method directly - use ``infer()`` instead, which will forward unrecognized arguments to this method. Args: feed_dict (OrderedDict[str, Union[numpy.ndarray, DeviceView, torch.Tensor]]): A mapping of input tensor names to corresponding input NumPy arrays, Polygraphy DeviceViews, or PyTorch tensors. If PyTorch tensors are provided in the feed_dict, then this function will return the outputs also as PyTorch tensors. If the provided inputs already reside in GPU memory, no additional copies are made. copy_outputs_to_host (bool): Whether to copy inference outputs back to host memory. If this is False, PyTorch GPU tensors or Polygraphy DeviceViews are returned instead of PyTorch CPU tensors or NumPy arrays respectively. Defaults to True. Returns: OrderedDict[str, Union[numpy.ndarray, DeviceView, torch.Tensor]]: A mapping of output tensor names to corresponding output NumPy arrays, Polygraphy DeviceViews, or PyTorch tensors. """ copy_outputs_to_host = util.default(copy_outputs_to_host, True) start = time.time() output_buffers = self._infer_impl(feed_dict, copy_outputs_to_host) end = time.time() self.inference_time = end - start return output_buffers
@util.check_called_by("deactivate") def deactivate_impl(self): [buf.free() for buf in self.device_input_buffers.values()] if self.context_memory_buffer is not None: self.context_memory_buffer.free() self.stream.free() del ( self.engine, self.context, self.device_input_buffers, self.host_output_buffers, self.stream, self.context_memory_buffer, self.output_allocator, ) def _set_weight_streaming_budget(self): # Setup weight streaming if applicable if self.weight_streaming_budget != None and self.weight_streaming_percent != None: G_LOGGER.warning(f"Cannot specify the weight streaming budget both in bytes and percentage. Prioritizing the bytes value.") if self.weight_streaming_budget is not None: assert self.weight_streaming_budget == -2 or self.weight_streaming_budget == -1 or self.weight_streaming_budget >= 0 if mod.version(trt.__version__) >= mod.version("10.1"): self._set_weight_streaming_budget_v2() else: self._set_weight_streaming_budget_v1() def _set_weight_streaming_budget_v1(self): budget_bytes = None if self.weight_streaming_budget is not None: if self.weight_streaming_budget == -2: budget_bytes = 0 else: budget_bytes = self.weight_streaming_budget elif self.weight_streaming_percent is not None: assert 0 <= self.weight_streaming_percent <= 100 if self.weight_streaming_percent == 0: budget_bytes = 0 # Disable weight streaming else: min_budget = self.engine.minimum_weight_streaming_budget max_budget = self.engine.streamable_weights_size budget_bytes = (1 - self.weight_streaming_percent / 100.0) * (max_budget - min_budget) + min_budget if budget_bytes is not None: budget_bytes = int(budget_bytes) self.engine.weight_streaming_budget = budget_bytes if self.engine.weight_streaming_budget != budget_bytes: G_LOGGER.critical(f"Failed to set weight streaming budget to {budget_bytes}!") if budget_bytes == 0: G_LOGGER.info(f"Weight streaming is disabled.") elif budget_bytes == -1: G_LOGGER.info(f"Weight streaming is enabled with TensorRT automatically determiing the budget.") else: G_LOGGER.info(f"Weight streaming is enabled with a memory budget of {budget_bytes} bytes.") def _set_weight_streaming_budget_v2(self): budget_bytes = None if self.weight_streaming_budget is not None: # use V2 path assert self.weight_streaming_budget == -2 or self.weight_streaming_budget == -1 or self.weight_streaming_budget >= 0 if self.weight_streaming_budget == -2: budget_bytes = self.engine.streamable_weights_size elif self.weight_streaming_budget == -1: budget_bytes = self.engine.get_weight_streaming_automatic_budget() else: budget_bytes = self.weight_streaming_budget elif self.weight_streaming_percent is not None: assert 0 <= self.weight_streaming_percent <= 100 if self.weight_streaming_percent == 100: budget_bytes = self.engine.streamable_weights_size else: budget_bytes = self.weight_streaming_percent / 100.0 * (self.engine.streamable_weights_size) if budget_bytes is not None: budget_bytes = int(budget_bytes) self.engine.weight_streaming_budget_v2 = budget_bytes if self.engine.weight_streaming_budget_v2 != budget_bytes: G_LOGGER.critical(f"Failed to set weight streaming budget to {budget_bytes}!") if budget_bytes == self.engine.streamable_weights_size: G_LOGGER.info(f"Weight streaming is disabled.") else: G_LOGGER.info(f"Weight streaming is enabled with a memory budget of {budget_bytes} bytes.")