# Copyright (c) 2024, NVIDIA CORPORATION. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import typing
from pathlib import Path
import numpy as np
import torch
from nemo_export.tarutils import TarPath
from nemo_export_deploy_common.import_utils import MISSING_PIL_MSG, MISSING_TRITON_MSG, UnavailableError
try:
from PIL import Image
HAVE_PIL = True
except (ImportError, ModuleNotFoundError):
HAVE_PIL = False
from typing import Any
from unittest.mock import MagicMock
Image = MagicMock()
Image.Image = Any
try:
from pytriton.model_config import Tensor
HAVE_TRITON = True
except (ImportError, ModuleNotFoundError):
HAVE_TRITON = False
NEMO2 = "NEMO 2.0"
NEMO1 = "NEMO 1.0"
[docs]
def typedict2tensor(
typedict_class,
overwrite_kwargs: typing.Optional[typing.Dict[str, typing.Any]] = None,
defaults: typing.Optional[typing.Dict[str, typing.Any]] = None,
):
"""Converts a type dictionary class into a tuple of PyTriton Tensor objects.
This function takes a class with type hints and converts each typed field into a PyTriton
Tensor specification, handling nested list types and mapping Python types to numpy dtypes.
Args:
typedict_class: A class with type hints that will be converted to Tensor specs
overwrite_kwargs: Optional dictionary of kwargs to override default Tensor parameters
defaults: Optional dictionary of default values (unused)
Returns:
tuple: A tuple of PyTriton Tensor objects, one for each typed field in the input class
Raises:
Exception: If an unsupported type is encountered during type mapping
"""
if not HAVE_TRITON:
raise UnavailableError(MISSING_TRITON_MSG)
def _map_type(type_):
if type_ is int:
return np.int32
elif type_ is float:
return np.float32
elif type_ is bool:
return np.bool_
elif type_ is str:
return bytes
else:
raise Exception(f"Unknown type {type_}")
def _get_tensor_params(type_):
count = 0
while typing.get_origin(type_) is list:
type_ = typing.get_args(type_)[0]
count += 1
count -= 1 # we don't want to count the last dimension
shape = (-1,) * count if count > 1 else (1,)
return {"shape": shape, "dtype": _map_type(type_)}
overwrite_kwargs = overwrite_kwargs or {}
return tuple(
Tensor(name=name, **_get_tensor_params(type_), **overwrite_kwargs)
for name, type_ in typing.get_type_hints(typedict_class).items()
)
[docs]
def nemo_checkpoint_version(path: str) -> str:
"""Determines the version of a NeMo checkpoint from its file structure.
Examines the provided checkpoint path to determine if it follows the NeMo 2.0
or NeMo 1.0 format based on the presence of 'context' and 'weights' directories.
Args:
path (str): Path to the NeMo checkpoint file or directory
Returns:
str: Version string - either NEMO2 or NEMO1 constant indicating the checkpoint version
"""
if os.path.isdir(path):
path = Path(path)
else:
path = TarPath(path)
if (path / "context").exists() and (path / "weights").exists():
return NEMO2
else:
return NEMO1
[docs]
def str_list2numpy(str_list: typing.List[str]) -> np.ndarray:
"""Converts a list of strings to a numpy array of UTF-8 encoded bytes.
Takes a list of strings and converts it to a numpy array with an additional
dimension, then encodes the strings as UTF-8 bytes.
Args:
str_list (List[str]): List of strings to convert
Returns:
np.ndarray: Numpy array of UTF-8 encoded bytes with shape (N, 1) where N is
the length of the input list
"""
str_ndarray = np.array(str_list)[..., np.newaxis]
return np.char.encode(str_ndarray, "utf-8")
[docs]
def str_ndarray2list(str_ndarray: np.ndarray) -> typing.List[str]:
"""Converts a numpy array of UTF-8 encoded bytes back to a list of strings.
Takes a numpy array of UTF-8 encoded bytes and decodes it back to strings,
removing any extra dimensions, and returns the result as a Python list.
Args:
str_ndarray (np.ndarray): Numpy array of UTF-8 encoded bytes, typically
with shape (N, 1) where N is the length of the resulting list
Returns:
List[str]: List of decoded strings
"""
str_ndarray = str_ndarray.astype("bytes")
str_ndarray = np.char.decode(str_ndarray, encoding="utf-8")
str_ndarray = str_ndarray.squeeze(axis=-1)
return str_ndarray.tolist()
[docs]
def ndarray2img(img_ndarray: np.ndarray) -> typing.List[Image.Image]:
"""Converts a numpy array of images to a list of PIL Image objects.
Takes a numpy array containing one or more images and converts each image
to a PIL Image object using Image.fromarray().
Args:
img_ndarray (np.ndarray): Numpy array of images, where each image is a 2D or 3D array
representing pixel values
Returns:
List[Image.Image]: List of PIL Image objects created from the input array
"""
if not HAVE_PIL:
raise UnavailableError(MISSING_PIL_MSG)
img_list = [Image.fromarray(i) for i in img_ndarray]
return img_list
[docs]
def cast_output(data, required_dtype):
"""Casts input data to a numpy array with the required dtype.
Takes input data that may be a torch.Tensor, numpy array, or other sequence type
and converts it to a numpy array with the specified dtype. For string dtypes,
the data is encoded as UTF-8 bytes. The output array is ensured to have at least
2 dimensions.
Args:
data: Input data to cast. Can be a torch.Tensor, numpy array, or sequence type
that can be converted to a numpy array.
required_dtype: The desired numpy dtype for the output array.
Returns:
np.ndarray: A numpy array containing the input data cast to the required dtype,
with at least 2 dimensions.
"""
if not HAVE_TRITON:
raise UnavailableError(MISSING_TRITON_MSG)
if isinstance(data, torch.Tensor):
data = data.cpu().numpy()
elif not isinstance(data, np.ndarray):
data = np.array(data)
data_is_str = required_dtype in (object, np.object_, bytes, np.bytes_)
if data_is_str:
data = np.char.encode(data, "utf-8")
if data.ndim < 2:
data = data[..., np.newaxis]
return data.astype(required_dtype)
[docs]
def broadcast_list(data, src=0, group=None):
"""Broadcasts a list of text data to all processes.
Args:
data (list): List of strings to broadcast.
src (int, optional): Source rank. Defaults to 0.
group (ProcessGroup, optional): The process group to work on. If None, the default process group will be used.
"""
if not torch.distributed.is_initialized():
raise RuntimeError("Distributed environment is not initialized.")
object_list = [data] if torch.distributed.get_rank() == src else [None]
torch.distributed.broadcast_object_list(object_list, src=src, group=group)
return object_list[0]