Source code for polygraphy.backend.trt.util

#
# SPDX-FileCopyrightText: Copyright (c) 1993-2022 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import contextlib
import json

from polygraphy import config, mod, util
from polygraphy.common import TensorMetadata
from polygraphy.logger import G_LOGGER, LogMode

trt = mod.lazy_import("tensorrt")
np = mod.lazy_import("numpy")


TRT_LOGGER = None


@mod.export()
def get_trt_logger():
    """
    Get the global TensorRT logger created by Polygraphy.

    The logger is created on first use and cached in the module-level
    ``TRT_LOGGER`` variable, so later calls return the same object.

    Returns:
        trt.Logger: The TensorRT logger.
    """
    global TRT_LOGGER
    if TRT_LOGGER is None:
        # Created lazily so that merely importing this module does not touch TensorRT.
        TRT_LOGGER = trt.Logger()
    return TRT_LOGGER
def fail_unavailable(what):
    """
    Report that a feature is not available in the installed TensorRT version.

    Logs a backtrace followed by a critical message through ``G_LOGGER``.

    Args:
        what (str): Description of the unavailable feature; used verbatim in the message.
    """
    G_LOGGER.backtrace()
    G_LOGGER.critical(f"{what} is not available on TensorRT version {trt.__version__}.")


def check_onnx_parser_errors(parser, success):
    """
    Report errors recorded by an ONNX parser.

    Each recorded parser error is logged as an error, followed by a critical message.
    A critical message is also logged if ``success`` is falsy.

    Args:
        parser: The parser; must expose ``num_errors`` and ``get_error(index)``.
        success (bool): The result reported by the parse call.
    """
    if parser.num_errors > 0:
        for index in range(parser.num_errors):
            G_LOGGER.error(parser.get_error(index))
        G_LOGGER.critical("Could not parse ONNX correctly")

    if not success:
        G_LOGGER.critical("Failed to parse ONNX model. Does the model file exist and contain a valid ONNX model?")


def get_layer_class_mapping():
    """
    Build a mapping from ``trt.LayerType`` values to the corresponding layer classes.

    Entries whose layer type or layer class cannot be found in the installed
    TensorRT module are skipped (a warning is logged for them only when
    ``config.INTERNAL_CORRECTNESS_CHECKS`` is enabled).

    Returns:
        dict: Maps ``trt.LayerType`` members to classes looked up on the ``trt`` module.
    """
    layer_class_mapping = {}

    def try_add(layer_type, layer_cls):
        try:
            layer_type = getattr(trt.LayerType, layer_type)
            layer_cls = getattr(trt, layer_cls)
        except AttributeError:
            if config.INTERNAL_CORRECTNESS_CHECKS:
                G_LOGGER.warning(f"Could not find layer type: {layer_type} or layer class: {layer_cls}")
        else:
            layer_class_mapping[layer_type] = layer_cls

    # (LayerType attribute name, layer class name) pairs, in registration order.
    layer_type_class_names = (
        ("CONVOLUTION", "IConvolutionLayer"),
        ("FULLY_CONNECTED", "IFullyConnectedLayer"),
        ("ACTIVATION", "IActivationLayer"),
        ("POOLING", "IPoolingLayer"),
        ("LRN", "ILRNLayer"),
        ("SCALE", "IScaleLayer"),
        ("SOFTMAX", "ISoftMaxLayer"),
        ("DECONVOLUTION", "IDeconvolutionLayer"),
        ("CONCATENATION", "IConcatenationLayer"),
        ("ELEMENTWISE", "IElementWiseLayer"),
        ("PLUGIN", "IPluginLayer"),
        ("UNARY", "IUnaryLayer"),
        ("PADDING", "IPaddingLayer"),
        ("SHUFFLE", "IShuffleLayer"),
        ("REDUCE", "IReduceLayer"),
        ("TOPK", "ITopKLayer"),
        ("GATHER", "IGatherLayer"),
        ("MATRIX_MULTIPLY", "IMatrixMultiplyLayer"),
        ("RAGGED_SOFTMAX", "IRaggedSoftMaxLayer"),
        ("CONSTANT", "IConstantLayer"),
        ("RNN", "IRNNLayer"),
        ("RNN_V2", "IRNNv2Layer"),
        ("IDENTITY", "IIdentityLayer"),
        ("PLUGIN_V2", "IPluginV2Layer"),
        ("SLICE", "ISliceLayer"),
        ("SHAPE", "IShapeLayer"),
        ("PARAMETRIC_RELU", "IParametricReLULayer"),
        ("RESIZE", "IResizeLayer"),
        ("TRIP_LIMIT", "ITripLimitLayer"),
        ("RECURRENCE", "IRecurrenceLayer"),
        ("ITERATOR", "IIteratorLayer"),
        ("LOOP_OUTPUT", "ILoopOutputLayer"),
        ("SELECT", "ISelectLayer"),
        ("FILL", "IFillLayer"),
        ("QUANTIZE", "IQuantizeLayer"),
        ("DEQUANTIZE", "IDequantizeLayer"),
        ("CONDITION", "IConditionLayer"),
        ("CONDITIONAL_INPUT", "IIfConditionalInputLayer"),
        ("CONDITIONAL_OUTPUT", "IIfConditionalOutputLayer"),
        ("ASSERTION", "IAssertionLayer"),
        ("SCATTER", "IScatterLayer"),
        ("EINSUM", "IEinsumLayer"),
    )
    for layer_type_name, layer_cls_name in layer_type_class_names:
        try_add(layer_type_name, layer_cls_name)

    return layer_class_mapping


def np_dtype_from_trt(trt_dtype):
    """
    Convert a TensorRT data type to the equivalent NumPy dtype.

    Args:
        trt_dtype: The TensorRT data type, as accepted by ``trt.nptype``.

    Returns:
        np.dtype: The corresponding NumPy data type.
    """
    # trt.nptype uses NumPy, so to make autoinstall work, we need to trigger it before that.
    mod.autoinstall(np)
    return np.dtype(trt.nptype(trt_dtype))


def get_network_input_names_meta(network):
    """
    Collect the names and metadata of all network inputs.

    Args:
        network (trt.INetworkDefinition): The network.

    Returns:
        Tuple[List[str], TensorMetadata]: Input names in index order, and their dtypes/shapes.
    """
    names = []
    meta = TensorMetadata()
    for i in range(network.num_inputs):
        tensor = network.get_input(i)
        names.append(tensor.name)
        meta.add(name=tensor.name, dtype=np_dtype_from_trt(tensor.dtype), shape=tensor.shape)
    return names, meta


def get_network_output_names_meta(network):
    """
    Collect the names and metadata of all network outputs.

    Args:
        network (trt.INetworkDefinition): The network.

    Returns:
        Tuple[List[str], TensorMetadata]: Output names in index order, and their dtypes/shapes.
    """
    names = []
    meta = TensorMetadata()
    for i in range(network.num_outputs):
        tensor = network.get_output(i)
        names.append(tensor.name)
        meta.add(name=tensor.name, dtype=np_dtype_from_trt(tensor.dtype), shape=tensor.shape)
    return names, meta


def get_layer_input_names_meta(layer):
    """
    Collect the names and metadata of a layer's inputs.

    Inputs for which ``layer.get_input`` returns a falsy value are skipped.

    Args:
        layer (trt.ILayer): The layer.

    Returns:
        Tuple[List[str], TensorMetadata]: Input names in index order, and their dtypes/shapes.
    """
    names = []
    meta = TensorMetadata()
    for i in range(layer.num_inputs):
        inp = layer.get_input(i)
        if inp:
            names.append(inp.name)
            meta.add(inp.name, np_dtype_from_trt(inp.dtype), inp.shape)
    return names, meta


def get_layer_output_names_meta(layer):
    """
    Collect the names and metadata of a layer's outputs.

    Outputs for which ``layer.get_output`` returns a falsy value are skipped.

    Args:
        layer (trt.ILayer): The layer.

    Returns:
        Tuple[List[str], TensorMetadata]: Output names in index order, and their dtypes/shapes.
    """
    names = []
    meta = TensorMetadata()
    for i in range(layer.num_outputs):
        out = layer.get_output(i)
        if out:
            names.append(out.name)
            meta.add(out.name, np_dtype_from_trt(out.dtype), out.shape)
    return names, meta


def str_from_layer(layer, index):
    """
    Build a string describing a single network layer via ``util.str_from_layer``.

    Args:
        layer (trt.ILayer): The layer.
        index (int): The index of the layer to display.

    Returns:
        str
    """
    input_names, input_meta = get_layer_input_names_meta(layer)
    output_names, output_meta = get_layer_output_names_meta(layer)
    return util.str_from_layer(
        "Layer", index, layer.name, layer.type, input_names, input_meta, output_names, output_meta
    )


def get_layer_attribute_names(layer):
    """
    List the layer-specific attribute names of a layer.

    Dunder attributes, attributes that also exist on ``trt.ILayer``, and a few
    attributes that are filtered out for certain layer configurations are excluded.

    Args:
        layer (trt.ILayer): The layer, expected to already be of its concrete layer class.

    Returns:
        List[str]: The attribute names, in ``dir(layer)`` order.
    """

    def is_special_attribute(attr):
        return attr.startswith("__") and attr.endswith("__")

    def is_valid_attribute(attr, layer):
        if (
            type(layer) == trt.IPoolingLayer
            or type(layer) == trt.IConvolutionLayer
            or type(layer) == trt.IDeconvolutionLayer
        ):
            if len(layer.get_input(0).shape) > 4:
                # 3D pooling uses padding_nd
                return attr not in ["padding", "stride", "window_size"]
        if type(layer) == trt.IResizeLayer:
            if layer.num_inputs > 1:
                return attr not in ["scales"]
        if type(layer) == trt.ISliceLayer:
            if layer.num_inputs > 1:
                return attr not in ["shape", "start", "stride"]
        return True

    return [
        attr
        for attr in dir(layer)
        if not is_special_attribute(attr) and not hasattr(trt.ILayer, attr) and is_valid_attribute(attr, layer)
    ]


def str_from_network(network, show_layers=None, show_attrs=None, show_weights=None):
    """
    Converts a TensorRT network to a human-readable representation

    Args:
        network (trt.INetworkDefinition): The network.
        show_layers (bool): Whether to display per-layer information.
        show_attrs (bool): Whether to display per-layer attributes.
        show_weights (bool): Whether to display the value of weights.

    Returns:
        str
    """
    show_layers = util.default(show_layers, False)
    show_attrs = util.default(show_attrs, False)
    show_weights = util.default(show_weights, False)

    LAYER_TYPE_CLASS_MAPPING = get_layer_class_mapping()

    network_str = f"Name: {network.name} | {'Implicit' if hasattr(network, 'has_implicit_batch_dimension') and network.has_implicit_batch_dimension else 'Explicit'} Batch Network{' with Explicit Precision ' if hasattr(network, 'has_explicit_precision') and network.has_explicit_precision else ''}\n"
    network_str += "\n"

    _, input_metadata = get_network_input_names_meta(network)
    network_str += f"---- {len(input_metadata)} Network Input(s) ----\n{input_metadata}\n\n"
    _, output_metadata = get_network_output_names_meta(network)
    network_str += f"---- {len(output_metadata)} Network Output(s) ----\n{output_metadata}\n\n"
    network_str += f"---- {network.num_layers} Layer(s) ----\n"
    if show_layers:
        for index, layer in enumerate(network):
            # Re-class the layer as its concrete type so layer-specific attributes are accessible.
            if layer.type in LAYER_TYPE_CLASS_MAPPING:
                layer.__class__ = LAYER_TYPE_CLASS_MAPPING[layer.type]

            network_str += str_from_layer(layer, index)

            if show_attrs:
                # Exclude special attributes, as well as any attributes of the base layer class (those can be displayed above).
                attrs = get_layer_attribute_names(layer)
                if attrs:
                    network_str += util.indent_block("---- Attributes ----") + "\n"
                for attr in attrs:
                    with G_LOGGER.verbosity():
                        val = getattr(layer, attr)
                    if show_weights or not isinstance(val, np.ndarray):
                        attr_str = ""
                        if layer.name:
                            attr_str += f"{layer.name}."
                        network_str += util.indent_block(f"{attr_str}{attr} = {val}") + "\n"
            network_str += "\n"

    return util.indent_block(network_str, level=0)


def _get_network_outputs(network):
    """Return the names of all tensors currently marked as network outputs, in index order."""
    return [network.get_output(index).name for index in range(network.num_outputs)]


def check_outputs_not_found(not_found, available_outputs):
    """
    Report requested outputs that could not be found.

    Does nothing when ``not_found`` is empty; otherwise logs a critical message
    listing the missing names and the available tensors.

    Args:
        not_found (Collection[str]): Names that were requested but not found.
        available_outputs (Sequence[str]): Names to suggest in the message.
    """
    if not_found:
        available_outputs = util.unique_list(available_outputs)
        sep = "\n\t"
        G_LOGGER.critical(
            f"The following outputs were not found: {not_found}.\n"
            f"Note: Available tensors:{sep}{sep.join(available_outputs)}"
        )


def mark_outputs(network, outputs):
    """
    Mark the specified outputs as network outputs.

    Args:
        network (trt.INetworkDefinition): The network in which to mark outputs.
        outputs (Sequence[str]): The names of tensors to mark as outputs.
    """
    outputs = set(outputs)
    all_outputs = []
    for layer in network:
        for index in range(layer.num_outputs):
            tensor = layer.get_output(index)
            all_outputs.append(tensor.name)
            # Clear all old outputs
            if tensor.is_network_output:
                network.unmark_output(tensor)

            if tensor.name in outputs:
                if not tensor.is_network_output:
                    G_LOGGER.ultra_verbose(f"Marking {tensor.name} as an output")
                    network.mark_output(tensor)

    marked_outputs = set(_get_network_outputs(network))
    not_found = outputs - marked_outputs
    check_outputs_not_found(not_found, all_outputs)


def mark_layerwise(network):
    """
    Mark the outputs of every eligible layer as network outputs.

    Shape and constant layers are excluded, as are layers encountered between a
    loop-start layer and a loop-end layer in iteration order.

    Args:
        network (trt.INetworkDefinition): The network in which to mark outputs.
    """
    # Layers within loops cannot be marked as network outputs.
    LOOP_START_NAMES = ["TRIP_LIMIT", "ITERATOR", "RECURRENCE"]
    LOOP_END_NAMES = ["LOOP_OUTPUT"]
    LOOP_START_LAYERS = [getattr(trt.LayerType, attr) for attr in LOOP_START_NAMES if hasattr(trt.LayerType, attr)]
    LOOP_END_LAYERS = [getattr(trt.LayerType, attr) for attr in LOOP_END_NAMES if hasattr(trt.LayerType, attr)]
    EXCLUDE_LAYERS = [trt.LayerType.SHAPE, trt.LayerType.CONSTANT]
    outputs = []
    in_loop = False
    for layer in network:
        if layer.type in LOOP_START_LAYERS:
            G_LOGGER.warning(
                "Loop detected. Please ensure the network is topologically sorted so that layers within "
                "the loop body are not marked as network outputs in layerwise mode",
                mode=LogMode.ONCE,
            )
            in_loop = True
        elif layer.type in LOOP_END_LAYERS:
            in_loop = False

        should_mark_layer = not in_loop and layer.type not in EXCLUDE_LAYERS
        if should_mark_layer:
            for index in range(layer.num_outputs):
                tensor = layer.get_output(index)
                outputs.append(tensor.name)

    G_LOGGER.verbose(f"Marking {len(outputs)} tensors as outputs")
    mark_outputs(network, outputs)


def unmark_outputs(network, outputs):
    """
    Unmark the specified tensors as network outputs.

    Args:
        network (trt.INetworkDefinition): The network in which to unmark outputs.
        outputs (Sequence[str]): The names of tensors to unmark.
    """
    outputs = set(outputs)

    unmarked_outputs = set()
    for layer in network:
        for index in range(layer.num_outputs):
            tensor = layer.get_output(index)
            if tensor.is_network_output and tensor.name in outputs:
                network.unmark_output(tensor)
                unmarked_outputs.add(tensor.name)
    not_found = outputs - unmarked_outputs
    check_outputs_not_found(not_found, _get_network_outputs(network))


def str_from_config(config):
    """
    Build a human-readable summary of a TensorRT builder configuration.

    Flags and APIs whose lookup raises ``AttributeError`` inside the guarded
    sections below are omitted from the summary.

    Args:
        config (trt.IBuilderConfig): The builder configuration.
                Note that this parameter shadows the module-level ``config`` import within this function.

    Returns:
        str
    """
    config_str = (
        f"{'Workspace':20} | {config.max_workspace_size} bytes ({config.max_workspace_size / 1024.0 ** 2:.2f} MiB)"
    )
    config_str += f"\n{'Precision':20} | "
    with contextlib.suppress(AttributeError):
        config_str += f"TF32: {config.get_flag(trt.BuilderFlag.TF32)}, "
    config_str += f"FP16: {config.get_flag(trt.BuilderFlag.FP16)}, INT8: {config.get_flag(trt.BuilderFlag.INT8)}, "
    with contextlib.suppress(AttributeError):
        config_str += f"Obey Precision Constraints: {config.get_flag(trt.BuilderFlag.OBEY_PRECISION_CONSTRAINTS)}, "
    config_str += f"Strict Types: {config.get_flag(trt.BuilderFlag.STRICT_TYPES)}"

    with contextlib.suppress(AttributeError):
        source_vals = [
            val.name for val in trt.TacticSource.__members__.values() if (1 << int(val)) & config.get_tactic_sources()
        ]
        config_str += f"\n{'Tactic Sources':20} | {source_vals}"

    with contextlib.suppress(AttributeError):
        config_str += f"\n{'Safety Restricted':20} | {config.get_flag(trt.BuilderFlag.SAFETY_SCOPE)}"

    if config.default_device_type == trt.DeviceType.DLA:
        config_str += (
            f"\n{'DLA':20} | Core: {config.DLA_core}, GPU Fallback: {config.get_flag(trt.BuilderFlag.GPU_FALLBACK)}"
        )

    if config.int8_calibrator:
        config_str += f"\n{'Calibrator':20} | {config.int8_calibrator}"

    config_str += f"\n{'Profiles':20} | {config.num_optimization_profiles} profile(s)"
    return config_str


def check_profile(profile):
    """
    Check that a profile is truthy, logging a critical message if it is not.

    Args:
        profile: The profile to check.

    Returns:
        The same ``profile`` object that was passed in.
    """
    if not bool(profile):
        G_LOGGER.critical(f"Profile is not valid, please provide profile data.\nNote: profile was: {profile}")
    return profile


def str_from_tensor(tensor, is_shape_tensor):
    """
    Build a one-line description of an input tensor.

    Args:
        tensor: An object exposing ``name``, ``dtype`` and ``shape``.
        is_shape_tensor (bool): Whether to describe the tensor as a shape-tensor.

    Returns:
        str
    """
    ret = "Input "
    if is_shape_tensor:
        ret += "shape-tensor"
    else:
        ret += "tensor"
    ret += f": {tensor.name} (dtype={tensor.dtype}, shape={tensor.shape})"
    return ret


def get_input_metadata_from_profile(profile, network):
    """
    Returns metadata about the inputs based on the OPT values set in a profile.

    Args:
        profile (trt.IOptimizationProfile):
                The profile from which to retrieve input metadata.
        network (trt.INetworkDefinition):
                The network the profile applies to.

    Returns:
        TensorMetadata:
                A mapping of input names to their types and shapes.
                Shapes are retrieved from the OPT values in the profile.
    """
    input_metadata = TensorMetadata()
    for index in range(network.num_inputs):
        tensor = network.get_input(index)
        if tensor.is_shape_tensor:
            shapes = profile.get_shape_input(tensor.name)
        else:
            shapes = profile.get_shape(tensor.name)

        # `shapes` is indexed as (min, opt, max).
        if tuple(shapes[0]) != tuple(shapes[2]):
            G_LOGGER.warning(
                "Will use `opt` shapes from profile 0 for calibration. "
                "Note that even though `min` != `max` in this profile, calibration "
                "will use fixed input shapes (this is not necessarily an issue)."
            )
        # Always use opt shape
        input_metadata.add(name=tensor.name, dtype=np_dtype_from_trt(tensor.dtype), shape=shapes[1])
    return input_metadata


def add_binding_to_metadata(engine, binding, metadata, name_binding):
    """
    Add an engine binding's dtype and shape to ``metadata``.

    Args:
        engine (trt.ICudaEngine): The engine.
        binding (int): The binding index from which dtype and shape are read.
        metadata (TensorMetadata): The metadata to add to; modified in place.
        name_binding (int): The binding index from which the name is read.
    """
    # name_binding always comes from profile 0, since that's where we
    # get all binding names in the runner
    metadata.add(
        name=engine[name_binding],
        dtype=np_dtype_from_trt(engine.get_binding_dtype(binding)),
        shape=list(engine.get_binding_shape(binding)),
    )


def get_input_metadata_from_engine(engine, start_binding, end_binding):
    """
    Collect metadata for the input bindings in ``[start_binding, end_binding)``.

    Args:
        engine (trt.ICudaEngine): The engine.
        start_binding (int): The first binding index (inclusive).
        end_binding (int): The last binding index (exclusive).

    Returns:
        TensorMetadata
    """
    inputs = TensorMetadata()
    for index, binding in enumerate(range(start_binding, end_binding)):
        if engine.binding_is_input(binding):
            add_binding_to_metadata(engine, binding, inputs, name_binding=index)
    return inputs


def get_output_metadata_from_engine(engine, start_binding, end_binding):
    """
    Collect metadata for the output bindings in ``[start_binding, end_binding)``.

    Args:
        engine (trt.ICudaEngine): The engine.
        start_binding (int): The first binding index (inclusive).
        end_binding (int): The last binding index (exclusive).

    Returns:
        TensorMetadata
    """
    outputs = TensorMetadata()
    for index, binding in enumerate(range(start_binding, end_binding)):
        if not engine.binding_is_input(binding):
            add_binding_to_metadata(engine, binding, outputs, name_binding=index)
    return outputs


def _names_meta_from_layer_info(layer_info, key):
    """
    Extract tensor names and metadata from one section of engine-inspector layer information.

    Args:
        layer_info (dict): The decoded JSON layer information.
        key (str): The section to read, e.g. "Inputs" or "Outputs".

    Returns:
        Tuple[List[str], TensorMetadata]:
                The tensor names and their metadata. Both are empty if ``key`` is missing.
    """
    names = []
    meta = TensorMetadata()
    info = layer_info.get(key)
    if info is None:
        # Callers unpack two values, so the empty case must return a pair as well.
        return names, meta
    for elem in info:
        names.append(elem["Name"])
        meta.add(name=elem["Name"], dtype=None, shape=elem["Dimensions"])
    return names, meta


def str_from_engine(engine, show_layers=None, show_attrs=None):
    """
    Converts a TensorRT engine to a human-readable representation

    Args:
        engine (trt.ICudaEngine): The engine.
        show_layers (bool): Whether to display per-layer information.
        show_attrs (bool): Whether to display per-layer attributes (origin and tactic).

    Returns:
        str
    """
    show_layers = util.default(show_layers, False)
    show_attrs = util.default(show_attrs, False)

    bindings_per_profile = get_bindings_per_profile(engine)
    engine_str = f"Name: {engine.name} | {'Refittable ' if engine.refittable else ''}{'Implicit' if hasattr(engine, 'has_implicit_batch_dimension') and engine.has_implicit_batch_dimension else 'Explicit'} Batch Engine\n"
    engine_str += "\n"

    # Show metadata for the first profile (i.e. the dynamic shapes)
    input_metadata = get_input_metadata_from_engine(engine, 0, bindings_per_profile)
    engine_str += f"---- {len(input_metadata)} Engine Input(s) ----\n{input_metadata}\n\n"
    output_metadata = get_output_metadata_from_engine(engine, 0, bindings_per_profile)
    engine_str += f"---- {len(output_metadata)} Engine Output(s) ----\n{output_metadata}\n\n"

    engine_str += f"---- Memory ----\nDevice Memory: {engine.device_memory_size} bytes\n\n"

    engine_str += f"---- {engine.num_optimization_profiles} Profile(s) ({bindings_per_profile} Binding(s) Each) ----\n"

    # The column width does not depend on the profile, so compute it once.
    # `default=0` avoids a ValueError if iterating the engine yields no binding names.
    max_width = max((len(binding) for binding in engine), default=0) + 8
    for profile_index in range(engine.num_optimization_profiles):
        engine_str += f"- Profile: {profile_index}\n"

        for offset in range(bindings_per_profile):
            binding = profile_index * bindings_per_profile + offset
            name = f"[Name: {engine.get_binding_name(binding)}]"
            binding_type = "(Input) " if engine.binding_is_input(binding) else "(Output)"
            engine_str += util.indent_block(f"Binding Index: {binding} {binding_type} {name:<{max_width}}")

            if engine.binding_is_input(binding):
                if engine.is_shape_binding(binding):
                    min_shape, opt_shape, max_shape = engine.get_profile_shape_input(profile_index, binding)
                else:
                    min_shape, opt_shape, max_shape = engine.get_profile_shape(profile_index, binding)
                engine_str += f" | Shapes: min={min_shape}, opt={opt_shape}, max={max_shape}\n"
            else:
                engine_str += f" | Shape: {engine.get_binding_shape(binding)}\n"
        engine_str += "\n"

    layers_per_profile = engine.num_layers // engine.num_optimization_profiles
    engine_str += (
        f"---- {layers_per_profile} Layer(s){' Per Profile' if engine.num_optimization_profiles > 1 else ''} ----\n"
    )
    if show_layers:
        try:
            inspector = engine.create_engine_inspector()
        except AttributeError:
            G_LOGGER.warning(
                f"Cannot show layer information because IEngineInspector is not available in this version of TensorRT ({trt.__version__})"
            )
        else:
            for profile_idx in range(engine.num_optimization_profiles):
                indent_level = 0
                # NOTE(review): this condition is always true inside this loop; `> 1` may have
                # been intended (cf. ' Per Profile' above). Kept as-is to preserve the output.
                if engine.num_optimization_profiles >= 1:
                    indent_level = 1
                    engine_str += f"- Profile: {profile_idx}\n"

                offset = profile_idx * layers_per_profile
                for index in range(layers_per_profile):
                    layer_info = json.loads(
                        inspector.get_layer_information(offset + index, trt.LayerInformationFormat.JSON)
                    )

                    op = "Unknown"
                    input_names, input_meta = [], TensorMetadata()
                    output_names, output_meta = [], TensorMetadata()
                    origin = "Unknown"
                    tactic = "Unknown"
                    if engine.profiling_verbosity == trt.ProfilingVerbosity.DETAILED:
                        name = layer_info.get("Name", "Unknown")
                        op = layer_info.get("LayerType", "Unknown")

                        input_names, input_meta = _names_meta_from_layer_info(layer_info, "Inputs")
                        output_names, output_meta = _names_meta_from_layer_info(layer_info, "Outputs")
                        origin = layer_info.get("Origin", "Unknown")
                        tactic = layer_info.get("TacticValue", "Unknown")
                    else:
                        G_LOGGER.warning(
                            f"This engine was created with a profiling verbosity of: {engine.profiling_verbosity}. Some layer information may be missing. Try setting a higher profiling verbosity to see more detailed layer information. ",
                            mode=LogMode.ONCE,
                        )
                        # Without detailed verbosity, the decoded layer information is used directly as the name.
                        name = layer_info

                    engine_str += (
                        util.indent_block(
                            util.str_from_layer(
                                "Layer", index, name, op, input_names, input_meta, output_names, output_meta
                            ),
                            indent_level,
                        )
                        + "\n"
                    )

                    if show_attrs:
                        engine_str += util.indent_block("---- Attributes ----", indent_level + 1) + "\n"
                        engine_str += util.indent_block(f"Origin = {origin}", indent_level + 1) + "\n"
                        engine_str += util.indent_block(f"Tactic = {tactic}", indent_level + 1) + "\n"

                    engine_str += "\n"

    return util.indent_block(engine_str, level=0)


def get_bindings_per_profile(engine):
    """
    Compute the number of bindings belonging to each optimization profile.

    Args:
        engine (trt.ICudaEngine): The engine.

    Returns:
        int: ``engine.num_bindings`` floor-divided by ``engine.num_optimization_profiles``.
    """
    return engine.num_bindings // engine.num_optimization_profiles


def get_active_profile_bindings(context):
    """
    Gets the start and end binding indices for the active optimization profile.

    Args:
        context (trt.IExecutionContext):
                The context where the profile is currently set.
                The engine is obtained from ``context.engine``.

    Returns:
        Tuple[int, int]: The start and end bindings indices, in that order
    """
    active_profile = context.active_optimization_profile
    bindings_per_profile = get_bindings_per_profile(context.engine)

    start_binding = bindings_per_profile * active_profile
    end_binding = start_binding + bindings_per_profile

    G_LOGGER.ultra_verbose(
        f"Total # of Profiles: {context.engine.num_optimization_profiles}, Bindings Per Profile: {bindings_per_profile}, Active Profile: {active_profile}, Start Binding: {start_binding}, End Binding: {end_binding}"
    )
    return start_binding, end_binding