#
# SPDX-FileCopyrightText: Copyright (c) 1993-2022 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import tensorflow as tf
from collections import deque
from typing import List
import os
import shutil
from tf2onnx import tf_loader, utils, convert
import copy
[docs]def ensure_and_clean_dir(dir_path, do_clean_dir=True) -> None:
"""Create a directory to save test logs
Args:
dir_path (str): directory to create / clean.
do_clean_dir (bool): boolean indicating whether to clean the directory if it already exists (remove+create).
Returns:
None
"""
if not os.path.exists(dir_path):
os.makedirs(dir_path)
elif do_clean_dir:
shutil.rmtree(dir_path)
os.makedirs(dir_path)
[docs]class Folder:
"""
Folder class that tracks all files for a single experiment.
"""
def __init__(self, folder_name) -> None:
self.base = folder_name
ensure_and_clean_dir(self.base)
self.fp32 = os.path.join(self.base, "fp32")
ensure_and_clean_dir(self.fp32)
self.fp32_saved_model = os.path.join(
self.fp32, "saved_model"
) # location of fp32 saved keras model
self.fp32_onnx_model = os.path.join(
self.fp32, "original.onnx"
) # location of fp32 onnx model
self.int8 = os.path.join(self.base, "int8")
ensure_and_clean_dir(self.int8)
self.int8_saved_model = os.path.join(
self.int8, "saved_model"
) # location of int8 saved keras model
self.int8_onnx_model = os.path.join(
self.int8, "quantized.onnx"
) # location of int8 onnx model
[docs]class CreateAssetsFolders:
"""Create empty folders to save the original and quantized TensorFlow models and their respective ONNX
models for each experiment.
The following directory structure is created: base_directory -> experiment_directory (created by `add_folder` method) -> (fp32 [saved_model, .onnx model]),
(int8 [saved_model, .onnx model]).
"""
def __init__(self, base_experiment_directory) -> None:
self.base = base_experiment_directory
if not os.path.exists(self.base):
os.mkdir(self.base)
[docs] def add_folder(self, folder_name: str) -> None:
"""
Create the experiment directory (sub-folder in the base directory passed to this class).
Args:
folder_name (str): name of folder
Returns:
None
"""
setattr(self, folder_name, Folder(os.path.join(self.base, folder_name)))
[docs]def convert_saved_model_to_onnx(
saved_model_dir: str, onnx_model_path: str, opset=13
) -> None:
"""Convert Keras saved model into ONNX format.
Works directly with CreateAssetsFolder object path.
Args:
saved_model_dir (str): Path to keras saved model.
onnx_model_path (str): Full path to ONNX model file.
Returns:
None
"""
# 1. Let TensorRT optimize QDQ nodes instead of TF
from tf2onnx.optimizer import _optimizers
updated_optimizers = copy.deepcopy(_optimizers)
del updated_optimizers["q_dq_optimizer"]
del updated_optimizers["const_dequantize_optimizer"]
# 2. Extract graph definition from SavedModel
graph_def, inputs, outputs = tf_loader.from_saved_model(
model_path=saved_model_dir,
input_names=None,
output_names=None,
tag="serve",
signatures=["serving_default"],
)
# 3. Convert tf2onnx and save onnx file
model_proto, _ = convert._convert_common(
graph_def,
opset=opset,
input_names=inputs,
output_names=outputs,
output_path=onnx_model_path,
optimizers=updated_optimizers,
)
utils.save_protobuf(onnx_model_path, model_proto)
print("ONNX conversion Done!")
[docs]def convert_keras_model_to_onnx(
keras_model: tf.keras.Model, onnx_model_path: str, opset=13
) -> None:
"""Convert in-memory Keras model into ONNX format.
Works directly with CreateAssetsFolder object path.
Args:
keras_model (tf.keras.Model): Keras model.
onnx_model_path (str): Full path to ONNX model file.
Returns:
None
"""
# 1. Let TensorRT optimize QDQ nodes instead of TF
from tf2onnx.optimizer import _optimizers
updated_optimizers = copy.deepcopy(_optimizers)
del updated_optimizers["q_dq_optimizer"]
del updated_optimizers["const_dequantize_optimizer"]
# 2. Convert keras model directly and save onnx file.
onnx_model_proto, _ = convert.from_keras(keras_model, opset=opset, optimizers=updated_optimizers)
utils.save_protobuf(onnx_model_path, onnx_model_proto)
[docs]class KerasModelTraveller:
"""
Utility class to travel Keras model and print out detailed layer information.
"""
def __init__(self, print_layer_config=False) -> None:
self._pc = print_layer_config
self.model_list = deque([])
# Used to filter which classes you want printed, by layer.__class__
self._filter_by_class = None
self._layer_names = []
self._print_basic_info = None
def _print_layer_info(self, layer):
assert isinstance(layer, tf.keras.layers.Layer)
if self._filter_by_class is None or layer.__class__ in self._filter_by_class:
self._layer_names.append(layer.name)
if self._print_basic_info:
print(
"layer name:{layer_name}, layer class:{layer_class}".format(
layer_name=layer.name, layer_class=layer.__class__
)
)
if self._pc:
print(layer.get_config())
if self._print_basic_info:
print("-----------------")
def _dissect(self):
if not self.model_list:
return
number_of_models = len(self.model_list)
for _ in range(number_of_models):
# Get a subclassed model
current_model = self.model_list.pop()
print("Keras Subclassed Model: {}".format(current_model.__class__.__name__))
assert isinstance(current_model, tf.keras.Model)
for l in current_model.layers:
if isinstance(l, tf.keras.Model):
# This is another subclassed model inside
# Add this model to model queue for further analysis
self.model_list.appendleft(l)
self._dissect()
else:
# This is a layer
self._print_layer_info(l)
def _travel(
self, keras_model: tf.keras.Model, filter_by_class=None, print_basic_info=False
):
"""Gets layer info by dissecting the model (need for multi-layered models)
Args:
keras_model (tf.keras.Model): Keras model
filter_by_class (str): None or array of layer.__class__ to print
Returns:
None
"""
self.filter_by_class = filter_by_class
self._print_basic_info = print_basic_info
assert isinstance(
keras_model, tf.keras.Model
), "Model passed is not Keras model"
self.model_list.appendleft(keras_model)
self._dissect()
self.filter_by_class = None
[docs] def get_layer_names(self, keras_model: tf.keras.Model, filter_by_class=None):
"""Get name of all layers in the model.
Args:
keras_model (tf.keras.Model): Keras model
filter_by_class (str): None or array of layer.__class__ to print
Returns:
None
"""
self._travel(keras_model=keras_model, filter_by_class=filter_by_class)
return self._layer_names
def _get_layer_info(layer: tf.keras.layers.Layer) -> dict:
"""
Returns the layer's class, module, and name
"""
return {
"class": layer.__class__.__name__,
"module": layer.__class__.__module__,
"name": layer.name,
"layer": layer,
}
def _get_previous_layers_class_and_module_and_name(
layer: tf.keras.layers.Layer,
) -> List[dict]:
"""
For a given layer return a dictionary with name, module and class information of all previous layers.
"""
r = []
if isinstance(layer.input, list):
for layer_input_tensor in layer.input:
ip_tensor_parent_layer = layer_input_tensor._keras_history.layer
r.append(_get_layer_info(ip_tensor_parent_layer))
else:
ip_tensor_parent_layer = layer.input._keras_history.layer
r.append(_get_layer_info(ip_tensor_parent_layer))
return r
[docs]def find_my_predecessors(model: tf.keras.Model, current_layer_name: str) -> List[dict]:
"""
Given a layer name, find all predecessors of that layer.
Args:
model (tf.keras.Model): Keras functional model
current_layer_name (str): name of a model layer for which predecessors has to be found.
Returns:
List[dict]: List of predecessors. Each dictionary has three keys as follows,
::
{'class':<pred_layer_class>, 'module':<pred_layer_module>, 'name':<pred_layer_name>}
Raises:
AssertionError: If model is subclassed or current_layer_name is not string.
"""
supported_model_classes = {"Functional", "Sequential"}
assert isinstance(current_layer_name, str), "current layer name should be passed."
assert (
model.__class__.__name__ in supported_model_classes
), "model should be Functional or Sequential."
for layer in model.layers:
if layer.name == current_layer_name:
return _get_previous_layers_class_and_module_and_name(layer)
[docs]def find_my_successors(model: tf.keras.Model, current_layer_name: str) -> List[dict]:
"""
Given a layer name, find all successors of that layer.
Args:
model (tf.keras.Model): Keras functional model
current_layer_name (str): name of a model layer for which successors has to be found.
Returns:
List[dict]: List of predecessors. Each dictionary has three keys as follows,
::
{'class':<pred_layer_class>, 'module':<pred_layer_module>, 'name':<pred_layer_name>}
Raises:
AssertionError: If model is subclassed or current_layer_name is not string.
"""
supported_model_classes = {"Functional", "Sequential"}
assert isinstance(current_layer_name, str), "current layer name should be passed."
assert (
model.__class__.__name__ in supported_model_classes
), "model should be Functional or Sequential."
def _check_all_next_layers_with_connection_to_current(
next_layers: List[tf.keras.layers.Layer],
current_layer_name: str,
current_layer_class: str,
):
successors = []
for layer in next_layers:
p_layers = _get_previous_layers_class_and_module_and_name(layer)
for p_layer in p_layers:
if (
p_layer["class"] == current_layer_class
and p_layer["name"] == current_layer_name
):
successors.append(_get_layer_info(layer))
return successors
all_layers = model.layers
for i, layer in enumerate(all_layers):
if layer.name == current_layer_name:
next_layers = all_layers[i + 1 :]
layer_info = _get_layer_info(layer)
return _check_all_next_layers_with_connection_to_current(
next_layers, layer_info["name"], layer_info["class"]
)