#
# SPDX-FileCopyrightText: Copyright (c) 1993-2024 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import ctypes
import os
import sys
from polygraphy import func, mod, util
from polygraphy.datatype import DataType
from polygraphy.logger import G_LOGGER
np = mod.lazy_import("numpy")
def void_ptr(val=None):
return ctypes.c_void_p(val)
@mod.export()
class MemcpyKind:
"""
Enumerates different kinds of copy operations.
"""
HostToHost = ctypes.c_int(0)
"""Copies from host memory to host memory"""
HostToDevice = ctypes.c_int(1)
"""Copies from host memory to device memory"""
DeviceToHost = ctypes.c_int(2)
"""Copies from device memory to host memory"""
DeviceToDevice = ctypes.c_int(3)
"""Copies from device memory to device memory"""
Default = ctypes.c_int(4)
"""Copy direction is inferred from the pointer values; requires unified virtual addressing"""
@mod.export()
class Cuda:
"""
Wrapper that exposes low-level CUDA functionality.
NOTE: Do *not* construct this class manually.
Instead, use the ``wrapper()`` function to get the global wrapper.
"""
def __init__(self):
self.handle = None
fallback_lib = None
if sys.platform.startswith("win"):
cuda_paths = [os.environ.get("CUDA_PATH", "")]
cuda_paths += os.environ.get("PATH", "").split(os.path.pathsep)
lib_pat = "cudart64_*.dll"
else:
cuda_paths = [
*os.environ.get("LD_LIBRARY_PATH", "").split(os.path.pathsep),
os.path.join("/", "usr", "local", "cuda", "lib64"),
os.path.join("/", "usr", "lib"),
os.path.join("/", "lib"),
]
lib_pat = "libcudart.so*"
fallback_lib = "libcudart.so"
cuda_paths = list(
filter(lambda x: x, cuda_paths)
) # Filter out empty paths (i.e. "")
candidates = util.find_in_dirs(lib_pat, cuda_paths)
if not candidates:
log_func = G_LOGGER.critical if fallback_lib is None else G_LOGGER.warning
log_func(
f"Could not find the CUDA runtime library.\nNote: Paths searched were:\n{cuda_paths}"
)
lib = fallback_lib
G_LOGGER.warning(f"Attempting to load: '{lib}' using default loader paths")
else:
G_LOGGER.verbose(f"Found candidate CUDA libraries: {candidates}")
lib = candidates[0]
self.handle = ctypes.CDLL(lib)
if not self.handle:
G_LOGGER.critical(
"Could not load the CUDA runtime library. Is it on your loader path?"
)
@func.constantmethod
def check(self, status):
if status != 0:
G_LOGGER.critical(
f"CUDA Error: {status}. To figure out what this means, refer to https://docs.nvidia.com/cuda/cuda-runtime-api/group__CUDART__TYPES.html#group__CUDART__TYPES_1g3f51e3575c2178246db0a94a430e0038"
)
@func.constantmethod
def create_stream(self):
# Signature: () -> int
ptr = void_ptr()
self.check(self.handle.cudaStreamCreate(ctypes.byref(ptr)))
return ptr.value
@func.constantmethod
def stream_synchronize(self, ptr):
# Signature: int -> None
self.check(self.handle.cudaStreamSynchronize(void_ptr(ptr)))
@func.constantmethod
def destroy_stream(self, ptr):
# Signature: int -> None
self.check(self.handle.cudaStreamDestroy(void_ptr(ptr)))
@func.constantmethod
def malloc(self, nbytes):
"""
Allocates memory on the GPU.
Args:
nbytes (int): The number of bytes to allocate.
Returns:
int: The memory address of the allocated region, i.e. a device pointer.
Raises:
PolygraphyException: If an error was encountered during the allocation.
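For example, a minimal sketch that pairs an allocation with a free (the size is arbitrary):
::
    cuda_wrapper = wrapper()
    dev_ptr = cuda_wrapper.malloc(1024)  # allocate 1024 bytes on the GPU
    cuda_wrapper.free(dev_ptr)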
"""
ptr = void_ptr()
nbytes = ctypes.c_size_t(nbytes) # Required to prevent overflow
self.check(self.handle.cudaMalloc(ctypes.byref(ptr), nbytes))
return ptr.value
@func.constantmethod
def free(self, ptr):
"""
Frees memory allocated on the GPU.
Args:
ptr (int): The memory address, i.e. a device pointer.
Raises:
PolygraphyException: If an error was encountered during the free.
"""
self.check(self.handle.cudaFree(void_ptr(ptr)))
@func.constantmethod
def memcpy(self, dst, src, nbytes, kind, stream_ptr=None):
"""
Copies data between host and device memory.
Args:
dst (int):
The memory address of the destination, i.e. a pointer.
src (int):
The memory address of the source, i.e. a pointer.
nbytes (int):
The number of bytes to copy.
kind (MemcpyKind):
The kind of copy to perform.
stream_ptr (int):
The memory address of a CUDA stream, i.e. a pointer.
If this is not provided, a synchronous copy is performed.
Raises:
PolygraphyException: If an error was encountered during the copy.
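For example, a minimal synchronous host-to-device copy (a sketch; ``np`` is NumPy and the array contents are arbitrary):
::
    import numpy as np
    host = np.ascontiguousarray(np.arange(4, dtype=np.float32))
    dev_ptr = wrapper().malloc(host.nbytes)
    wrapper().memcpy(dst=dev_ptr, src=host.ctypes.data, nbytes=host.nbytes, kind=MemcpyKind.HostToDevice)
    wrapper().free(dev_ptr)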
"""
nbytes = ctypes.c_size_t(nbytes) # Required to prevent overflow
if stream_ptr is not None:
self.check(
self.handle.cudaMemcpyAsync(
void_ptr(dst), void_ptr(src), nbytes, kind, void_ptr(stream_ptr)
)
)
else:
self.check(
self.handle.cudaMemcpy(void_ptr(dst), void_ptr(src), nbytes, kind)
)
G_CUDA = None
@mod.export()
def wrapper():
"""
Returns the global Polygraphy CUDA wrapper.
Returns:
Cuda: The global CUDA wrapper.
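For example, a minimal sketch that creates, synchronizes, and destroys a CUDA stream through the wrapper:
::
    cuda_wrapper = wrapper()
    stream_ptr = cuda_wrapper.create_stream()
    cuda_wrapper.stream_synchronize(stream_ptr)
    cuda_wrapper.destroy_stream(stream_ptr)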
"""
global G_CUDA
if G_CUDA is None:
G_CUDA = Cuda()
return G_CUDA
@mod.export()
class Stream:
"""
High-level wrapper for a CUDA stream.
"""
def __init__(self):
self.ptr = wrapper().create_stream()
"""int: The memory address of the underlying CUDA stream"""
def __enter__(self):
return self
def __exit__(self, exc_type, exc_value, traceback):
"""
Frees the underlying CUDA stream.
"""
self.free()
def free(self):
"""
Frees the underlying CUDA stream.
You can also use a context manager to manage the stream lifetime.
For example:
::
with Stream() as stream:
...
"""
wrapper().destroy_stream(self.ptr)
self.ptr = None  # Clear the pointer now that the underlying stream has been destroyed
def synchronize(self):
"""
Synchronizes the stream.
"""
wrapper().stream_synchronize(self.ptr)
def try_get_stream_handle(stream):
if stream is None:
return None
return stream.ptr
@mod.export()
class DeviceView:
"""
A read-only view of a GPU memory region.
"""
def __init__(self, ptr, shape, dtype):
"""
Args:
ptr (int): A pointer to the region of memory.
shape (Tuple[int]): The shape of the region.
dtype (DataType): The data type of the region.
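For example, a sketch that wraps an existing device allocation (the shape and data type are illustrative):
::
    dev_ptr = wrapper().malloc(4 * 4)  # enough room for 4 float32 elements
    view = DeviceView(ptr=dev_ptr, shape=(4,), dtype=DataType.FLOAT32)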
"""
self.ptr = int(ptr)
"""int: The memory address of the underlying GPU memory"""
self.shape = shape
"""Tuple[int]: The shape of the device buffer"""
self.itemsize = None
self.dtype = dtype
"""DataType: The data type of the device buffer"""
def _check_host_buffer(self, host_buffer, copying_from):
if util.array.dtype(host_buffer) != self._dtype:
G_LOGGER.error(
f"Host buffer type: {util.array.dtype(host_buffer)} does not match the type of this device buffer: {self._dtype}. This may cause CUDA errors!"
)
if not util.array.is_contiguous(host_buffer):
G_LOGGER.critical(
"Provided host buffer is not contiguous in memory.\n"
"Hint: Use `util.make_contiguous()` or `np.ascontiguousarray()` to make the array contiguous in memory."
)
# If the host buffer is an input, the device buffer should be large enough to accommodate it.
# Otherwise, the host buffer needs to be large enough to accommodate the device buffer.
if copying_from:
if util.array.nbytes(host_buffer) > self.nbytes:
G_LOGGER.critical(
f"Provided host buffer is larger than device buffer.\n"
f"Note: host buffer is {util.array.nbytes(host_buffer)} bytes but device buffer is only {self.nbytes} bytes.\n"
f"Hint: Use `resize()` to resize the device buffer to the correct shape."
)
else:
if util.array.nbytes(host_buffer) < self.nbytes:
G_LOGGER.critical(
f"Provided host buffer is smaller than device buffer.\n"
f"Note: host buffer is only {util.array.nbytes(host_buffer)} bytes but device buffer is {self.nbytes} bytes.\n"
f"Hint: Use `util.array.resize_or_reallocate()` to resize the host buffer to the correct shape."
)
@property
def dtype(self):
try:
# For backwards compatibility
mod.warn_deprecated(
"Using NumPy data types in DeviceView/DeviceArray",
use_instead=None,
remove_in="0.50.0",
)
G_LOGGER.warning(
f"In the future, you will need to use `DataType.from_dtype(device_view.dtype).numpy()` to retrieve the NumPy data type"
)
return DataType.to_dtype(self._dtype, "numpy")
except:
return self._dtype
@dtype.setter
def dtype(self, new):
self._dtype = DataType.from_dtype(new)
self.itemsize = self._dtype.itemsize
@property
def nbytes(self):
"""
The number of bytes in the memory region.
"""
return util.volume(self.shape) * self.itemsize
@func.constantmethod
def copy_to(self, host_buffer, stream=None):
"""
Copies from this device buffer to the provided host buffer.
Args:
host_buffer (Union[numpy.ndarray, torch.Tensor]):
The host buffer to copy into. The buffer must be contiguous in
memory (see np.ascontiguousarray or torch.Tensor.contiguous) and
large enough to accommodate the device buffer.
stream (Stream):
A Stream instance. Performs a synchronous copy if no stream is provided.
Returns:
np.ndarray: The host buffer
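For example, a sketch that copies into a preallocated NumPy buffer (``np`` is NumPy; ``device_buffer`` stands for an existing ``DeviceView`` or ``DeviceArray`` of the same shape and data type):
::
    import numpy as np
    host = np.empty((2, 2), dtype=np.float32)
    device_buffer.copy_to(host)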
"""
if not self.nbytes:
return host_buffer
self._check_host_buffer(host_buffer, copying_from=False)
wrapper().memcpy(
dst=util.array.data_ptr(host_buffer),
src=self.ptr,
nbytes=self.nbytes,
kind=MemcpyKind.DeviceToHost,
stream_ptr=try_get_stream_handle(stream),
)
return host_buffer
@func.constantmethod
def numpy(self):
"""
Create a new NumPy array containing the contents of this device buffer.
Returns:
np.ndarray: The newly created NumPy array.
"""
arr = np.empty(self.shape, dtype=DataType.to_dtype(self._dtype, "numpy"))
self.copy_to(arr)
return arr
def __str__(self):
return f"DeviceView[(dtype={self._dtype.name}, shape={self.shape}), ptr={hex(self.ptr)}]"
def __repr__(self):
return util.make_repr(
"DeviceView", ptr=self.ptr, shape=self.shape, dtype=self._dtype
)[0]
@mod.export()
class DeviceArray(DeviceView):
"""
An array on the GPU.
"""
def __init__(self, shape=None, dtype=None):
"""
Args:
shape (Tuple[int]): The initial shape of the buffer.
dtype (DataType): The data type of the buffer.
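For example, a minimal round-trip sketch (``np`` is NumPy; the values are arbitrary):
::
    import numpy as np
    host = np.ones((2, 3), dtype=np.float32)
    with DeviceArray(shape=host.shape, dtype=DataType.FLOAT32) as arr:
        arr.copy_from(host)
        result = arr.numpy()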
"""
super().__init__(
ptr=0,
shape=util.default(shape, tuple()),
dtype=util.default(dtype, DataType.FLOAT32),
)
self.allocated_nbytes = 0
self.resize(self.shape)
def __enter__(self):
return self
@staticmethod
def raw(shape=None):
"""
Creates an untyped device array of the specified shape.
Args:
shape (Tuple[int]):
The initial shape of the buffer. Since the array is untyped, each element is a single byte;
for example, a shape of ``(4, 4)`` would allocate a 16-byte array.
Returns:
DeviceArray: The raw device array.
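For example:
::
    scratch = DeviceArray.raw((16,))  # 16-byte untyped scratch buffer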
"""
return DeviceArray(shape=shape, dtype=DataType.UINT8)
def resize(self, shape):
"""
Resizes or reshapes the array to the specified shape.
If the allocated memory region is already large enough,
no reallocation is performed.
Args:
shape (Tuple[int]): The new shape.
Returns:
DeviceArray: self
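For example, a sketch that grows and then reshapes a buffer; only the growth triggers a reallocation:
::
    arr = DeviceArray(shape=(2, 2), dtype=DataType.FLOAT32)
    arr.resize((4, 4))  # 64 bytes > 16 bytes, so the buffer is reallocated
    arr.resize((2, 8))  # still 64 bytes, so the existing allocation is reused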
"""
nbytes = util.volume(shape) * self.itemsize
if nbytes > self.allocated_nbytes:
self.free()
self.ptr = wrapper().malloc(nbytes)
self.allocated_nbytes = nbytes
self.shape = shape
return self
def __exit__(self, exc_type, exc_value, traceback):
"""
Frees the underlying memory of this DeviceArray.
"""
self.free()
def free(self):
"""
Frees the GPU memory associated with this array.
You can also use a context manager to ensure that memory is freed. For example:
::
with DeviceArray(...) as arr:
...
"""
wrapper().free(self.ptr)
self.shape = tuple()
self.allocated_nbytes = 0
self.ptr = 0
def copy_from(self, host_buffer, stream=None):
"""
Copies from the provided host buffer into this device buffer.
Args:
host_buffer (Union[numpy.ndarray, torch.Tensor]):
The host buffer to copy from. The buffer must be contiguous in
memory (see np.ascontiguousarray or torch.Tensor.contiguous) and not
larger than this device buffer.
stream (Stream):
A Stream instance. Performs a synchronous copy if no stream is provided.
Returns:
DeviceArray: self
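For example, a sketch of an asynchronous copy on a stream (``np`` is NumPy; the values are arbitrary):
::
    import numpy as np
    host = np.zeros((2, 2), dtype=np.float32)
    with Stream() as stream, DeviceArray(shape=(2, 2), dtype=DataType.FLOAT32) as arr:
        arr.copy_from(host, stream=stream)
        stream.synchronize()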
"""
if not util.array.nbytes(host_buffer):
return self
self._check_host_buffer(host_buffer, copying_from=True)
wrapper().memcpy(
dst=self.ptr,
src=util.array.data_ptr(host_buffer),
nbytes=util.array.nbytes(host_buffer),
kind=MemcpyKind.HostToDevice,
stream_ptr=try_get_stream_handle(stream),
)
return self
def view(self, shape=None, dtype=None):
"""
Creates a read-only DeviceView from this DeviceArray.
Args:
shape (Sequence[int]):
The desired shape of the view.
Defaults to the shape of this array or view.
dtype (DataType):
The desired data type of the view.
Defaults to the data type of this array or view.
Returns:
DeviceView: A view of this array's data on the device.
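For example, a sketch that reinterprets a typed array as raw bytes (the shapes are illustrative):
::
    arr = DeviceArray(shape=(2, 2), dtype=DataType.FLOAT32)
    byte_view = arr.view(shape=(16,), dtype=DataType.UINT8)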
"""
shape = util.default(shape, self.shape)
dtype = util.default(dtype, self._dtype)
view = DeviceView(self.ptr, shape, dtype)
if view.nbytes > self.nbytes:
G_LOGGER.critical(
"A view cannot exceed the number of bytes of the original array.\n"
f"Note: Original array has shape: {self.shape} and dtype: {self._dtype}, which requires {self.nbytes} bytes, "
f"while the view has shape: {shape} and dtype: {dtype}, which requires {view.nbytes} bytes, "
)
return view
def __str__(self):
return f"DeviceArray[(dtype={self._dtype.name}, shape={self.shape}), ptr={hex(self.ptr)}]"
def __repr__(self):
return util.make_repr("DeviceArray", shape=self.shape, dtype=self._dtype)[0]