# Copyright (c) 2024-2026 Justin Davis (davisjustin302@gmail.com)
#
# MIT License
# mypy: disable-error-code="import-untyped"
from __future__ import annotations
import contextlib
import ctypes
import numpy as np
import nvtx
with contextlib.suppress(Exception):
from trtutils.compat._libs import cudart
from trtutils._flags import FLAGS
from trtutils._log import LOG
from ._cuda import cuda_call
[docs]
def memcpy_host_to_device(device_ptr: int, host_arr: np.ndarray) -> None:
    """
    Copy a numpy array to a device pointer with error checking.

    The transfer is issued with ``cudart.cudaMemcpy`` and the result is
    passed through ``cuda_call``.

    Parameters
    ----------
    device_ptr : int
        The device pointer to copy to.
    host_arr : np.ndarray
        The numpy array to copy.

    Notes
    -----
    ``host_arr.nbytes`` raw bytes are read starting at ``host_arr.ctypes.data``.
    NOTE(review): this presumably requires a contiguous array - confirm that
    callers never pass strided views.

    """
    # read the flag once so push/pop stay paired even if it is toggled mid-call
    nvtx_enabled = FLAGS.NVTX_ENABLED
    if nvtx_enabled:
        nvtx.push_range("core::memcpy_host_to_device")
    try:
        nbytes = host_arr.nbytes
        # LOG.debug(f"MemcpyHtoD: {device_ptr} with size: {nbytes}")
        cuda_call(
            cudart.cudaMemcpy(
                device_ptr,
                host_arr.ctypes.data,
                nbytes,
                cudart.cudaMemcpyKind.cudaMemcpyHostToDevice,
            ),
        )
    finally:
        # close the range even when the copy raises, so NVTX ranges stay balanced
        if nvtx_enabled:
            nvtx.pop_range()
[docs]
def memcpy_device_to_host(host_arr: np.ndarray, device_ptr: int) -> None:
    """
    Copy a device pointer to a numpy array with error checking.

    The transfer is issued with ``cudart.cudaMemcpy`` and the result is
    passed through ``cuda_call``.

    Parameters
    ----------
    host_arr : np.ndarray
        The numpy array to copy to.
    device_ptr : int
        The device pointer to copy.

    Notes
    -----
    ``host_arr.nbytes`` raw bytes are written starting at
    ``host_arr.ctypes.data``.
    NOTE(review): this presumably requires a contiguous, writable array -
    confirm that callers never pass strided views.

    """
    # read the flag once so push/pop stay paired even if it is toggled mid-call
    nvtx_enabled = FLAGS.NVTX_ENABLED
    if nvtx_enabled:
        nvtx.push_range("core::memcpy_device_to_host")
    try:
        nbytes = host_arr.nbytes
        # LOG.debug(f"MemcpyDtoH: {device_ptr} with size: {nbytes}")
        cuda_call(
            cudart.cudaMemcpy(
                host_arr.ctypes.data,
                device_ptr,
                nbytes,
                cudart.cudaMemcpyKind.cudaMemcpyDeviceToHost,
            ),
        )
    finally:
        # close the range even when the copy raises, so NVTX ranges stay balanced
        if nvtx_enabled:
            nvtx.pop_range()
[docs]
def memcpy_host_to_device_async(
    device_ptr: int,
    host_arr: np.ndarray,
    stream: cudart.cudaStream_t,
) -> None:
    """
    Copy a numpy array to a device pointer asynchronously with error checking.

    The transfer is enqueued on ``stream`` with ``cudart.cudaMemcpyAsync`` and
    the result is passed through ``cuda_call``. This function does not
    synchronize the stream itself.

    Parameters
    ----------
    device_ptr : int
        The device pointer to copy to.
    host_arr : np.ndarray
        The numpy array to copy.
    stream : cudart.cudaStream_t
        The stream to utilize.

    Notes
    -----
    ``host_arr.nbytes`` raw bytes are read starting at ``host_arr.ctypes.data``.
    NOTE(review): this presumably requires a contiguous array that stays alive
    until the stream has finished the copy - confirm against callers.

    """
    # read the flag once so push/pop stay paired even if it is toggled mid-call
    nvtx_enabled = FLAGS.NVTX_ENABLED
    if nvtx_enabled:
        nvtx.push_range("core::memcpy_host_to_device_async")
    try:
        nbytes = host_arr.nbytes
        # LOG.debug(f"MemcpyHtoD_Async: {device_ptr} with size: {nbytes}")
        cuda_call(
            cudart.cudaMemcpyAsync(
                device_ptr,
                host_arr.ctypes.data,
                nbytes,
                cudart.cudaMemcpyKind.cudaMemcpyHostToDevice,
                stream,
            ),
        )
    finally:
        # close the range even when the enqueue raises, so NVTX ranges stay balanced
        if nvtx_enabled:
            nvtx.pop_range()
[docs]
def memcpy_device_to_host_async(
    host_arr: np.ndarray,
    device_ptr: int,
    stream: cudart.cudaStream_t,
) -> None:
    """
    Copy a device pointer to a numpy array asynchronously with error checking.

    The transfer is enqueued on ``stream`` with ``cudart.cudaMemcpyAsync`` and
    the result is passed through ``cuda_call``. This function does not
    synchronize the stream itself.

    Parameters
    ----------
    host_arr : np.ndarray
        The numpy array to copy to.
    device_ptr : int
        The device pointer to copy.
    stream : cudart.cudaStream_t
        The stream to utilize.

    Notes
    -----
    ``host_arr.nbytes`` raw bytes are written starting at
    ``host_arr.ctypes.data``.
    NOTE(review): this presumably requires a contiguous array that stays alive
    until the stream has finished the copy - confirm against callers.

    """
    # read the flag once so push/pop stay paired even if it is toggled mid-call
    nvtx_enabled = FLAGS.NVTX_ENABLED
    if nvtx_enabled:
        nvtx.push_range("core::memcpy_device_to_host_async")
    try:
        nbytes = host_arr.nbytes
        # LOG.debug(f"MemcpyDtoH_Async: {device_ptr} with size: {nbytes}")
        cuda_call(
            cudart.cudaMemcpyAsync(
                host_arr.ctypes.data,
                device_ptr,
                nbytes,
                cudart.cudaMemcpyKind.cudaMemcpyDeviceToHost,
                stream,
            ),
        )
    finally:
        # close the range even when the enqueue raises, so NVTX ranges stay balanced
        if nvtx_enabled:
            nvtx.pop_range()
[docs]
def memcpy_device_to_device(
    dst_ptr: int,
    src_ptr: int,
    nbytes: int,
) -> None:
    """
    Copy from one device pointer to another with error checking.

    The transfer is issued with ``cudart.cudaMemcpy`` and the result is
    passed through ``cuda_call``.

    Parameters
    ----------
    dst_ptr : int
        The destination device pointer.
    src_ptr : int
        The source device pointer.
    nbytes : int
        The number of bytes to copy.

    """
    # read the flag once so push/pop stay paired even if it is toggled mid-call
    nvtx_enabled = FLAGS.NVTX_ENABLED
    if nvtx_enabled:
        nvtx.push_range("core::memcpy_device_to_device")
    try:
        cuda_call(
            cudart.cudaMemcpy(
                dst_ptr,
                src_ptr,
                nbytes,
                cudart.cudaMemcpyKind.cudaMemcpyDeviceToDevice,
            ),
        )
    finally:
        # close the range even when the copy raises, so NVTX ranges stay balanced
        if nvtx_enabled:
            nvtx.pop_range()
[docs]
def memcpy_device_to_device_async(
    dst_ptr: int,
    src_ptr: int,
    nbytes: int,
    stream: cudart.cudaStream_t,
) -> None:
    """
    Copy from one device pointer to another asynchronously.

    The transfer is enqueued on ``stream`` with ``cudart.cudaMemcpyAsync`` and
    the result is passed through ``cuda_call``. This function does not
    synchronize the stream itself.

    Parameters
    ----------
    dst_ptr : int
        The destination device pointer.
    src_ptr : int
        The source device pointer.
    nbytes : int
        The number of bytes to copy.
    stream : cudart.cudaStream_t
        The stream to utilize.

    """
    # read the flag once so push/pop stay paired even if it is toggled mid-call
    nvtx_enabled = FLAGS.NVTX_ENABLED
    if nvtx_enabled:
        nvtx.push_range("core::memcpy_device_to_device_async")
    try:
        cuda_call(
            cudart.cudaMemcpyAsync(
                dst_ptr,
                src_ptr,
                nbytes,
                cudart.cudaMemcpyKind.cudaMemcpyDeviceToDevice,
                stream,
            ),
        )
    finally:
        # close the range even when the enqueue raises, so NVTX ranges stay balanced
        if nvtx_enabled:
            nvtx.pop_range()
[docs]
def memcpy_host_to_device_offset(
    device_ptr: int,
    host_arr: np.ndarray,
    offset_bytes: int,
) -> None:
    """
    Copy a numpy array to a device pointer at a specific offset.

    The destination address is ``device_ptr + offset_bytes``. The transfer is
    issued with ``cudart.cudaMemcpy`` and the result is passed through
    ``cuda_call``. No bounds check against the size of the device buffer is
    performed here.

    Parameters
    ----------
    device_ptr : int
        The base device pointer.
    host_arr : np.ndarray
        The numpy array to copy.
    offset_bytes : int
        The byte offset into the device buffer.

    Notes
    -----
    ``host_arr.nbytes`` raw bytes are read starting at ``host_arr.ctypes.data``.
    NOTE(review): this presumably requires a contiguous array - confirm that
    callers never pass strided views.

    """
    # read the flag once so push/pop stay paired even if it is toggled mid-call
    nvtx_enabled = FLAGS.NVTX_ENABLED
    if nvtx_enabled:
        nvtx.push_range("core::memcpy_host_to_device_offset")
    try:
        nbytes = host_arr.nbytes
        cuda_call(
            cudart.cudaMemcpy(
                device_ptr + offset_bytes,
                host_arr.ctypes.data,
                nbytes,
                cudart.cudaMemcpyKind.cudaMemcpyHostToDevice,
            ),
        )
    finally:
        # close the range even when the copy raises, so NVTX ranges stay balanced
        if nvtx_enabled:
            nvtx.pop_range()
[docs]
def memcpy_host_to_device_offset_async(
    device_ptr: int,
    host_arr: np.ndarray,
    offset_bytes: int,
    stream: cudart.cudaStream_t,
) -> None:
    """
    Copy a numpy array to a device pointer at a specific offset asynchronously.

    The destination address is ``device_ptr + offset_bytes``. The transfer is
    enqueued on ``stream`` with ``cudart.cudaMemcpyAsync`` and the result is
    passed through ``cuda_call``. This function does not synchronize the
    stream itself, and no bounds check against the size of the device buffer
    is performed here.

    Parameters
    ----------
    device_ptr : int
        The base device pointer.
    host_arr : np.ndarray
        The numpy array to copy.
    offset_bytes : int
        The byte offset into the device buffer.
    stream : cudart.cudaStream_t
        The stream to utilize.

    Notes
    -----
    ``host_arr.nbytes`` raw bytes are read starting at ``host_arr.ctypes.data``.
    NOTE(review): this presumably requires a contiguous array that stays alive
    until the stream has finished the copy - confirm against callers.

    """
    # read the flag once so push/pop stay paired even if it is toggled mid-call
    nvtx_enabled = FLAGS.NVTX_ENABLED
    if nvtx_enabled:
        nvtx.push_range("core::memcpy_host_to_device_offset_async")
    try:
        nbytes = host_arr.nbytes
        cuda_call(
            cudart.cudaMemcpyAsync(
                device_ptr + offset_bytes,
                host_arr.ctypes.data,
                nbytes,
                cudart.cudaMemcpyKind.cudaMemcpyHostToDevice,
                stream,
            ),
        )
    finally:
        # close the range even when the enqueue raises, so NVTX ranges stay balanced
        if nvtx_enabled:
            nvtx.pop_range()
[docs]
def cuda_malloc(
    nbytes: int,
) -> int:
    """
    Perform a memory allocation using cudart.cudaMalloc.

    The result of ``cudart.cudaMalloc`` is passed through ``cuda_call`` and
    the allocation is reported at debug log level.

    Parameters
    ----------
    nbytes : int
        The number of bytes to allocate.

    Returns
    -------
    int
        The pointer to the allocated memory.

    """
    # read the flag once so push/pop stay paired even if it is toggled mid-call
    nvtx_enabled = FLAGS.NVTX_ENABLED
    if nvtx_enabled:
        nvtx.push_range("core::cuda_malloc")
    try:
        device_ptr: int = cuda_call(cudart.cudaMalloc(nbytes))
        LOG.debug(f"Allocated, device_ptr: {device_ptr}, size: {nbytes}")
        return device_ptr
    finally:
        # close the range even when the allocation raises
        if nvtx_enabled:
            nvtx.pop_range()
[docs]
def allocate_pinned_memory(
    nbytes: int,
    dtype: np.dtype,
    shape: tuple[int, ...] | None = None,
    *,
    unified_mem: bool | None = None,
) -> np.ndarray:
    """
    Allocate pinned (page-locked) memory on the host, required for asynchronous memory transfers.

    The shape of the pagelocked memory is a 1D numpy array, so CPU side reshaping
    is required for some applications. If shape is passed, then the shape will not
    be 1D, but memory transfer may have complications.

    The memory is obtained from ``cudart.cudaHostAlloc`` and wrapped, without
    copying, in a numpy array through ``ctypes``. If wrapping fails (for
    example ``nbytes`` is not a multiple of the dtype item size, or ``shape``
    does not match the element count) the pinned allocation is released
    before the error propagates.

    Parameters
    ----------
    nbytes : int
        The number of bytes to allocate.
    dtype : np.dtype
        The data type for the allocated memory. Anything accepted by
        ``np.dtype`` (e.g. ``np.float32``) is normalised to a dtype instance.
    shape : tuple[int, ...], optional
        An optional shape for the pagelocked memory array.
        If not provided, the array will be 1D.
    unified_mem : bool, optional
        If True, use cudaHostAllocMapped to take advantage of unified memory.
        Otherwise cudaHostAllocDefault is used.

    Returns
    -------
    np.ndarray
        A numpy array backed by pinned memory.

    """
    # read the flag once so push/pop stay paired even if it is toggled mid-call
    nvtx_enabled = FLAGS.NVTX_ENABLED
    if nvtx_enabled:
        nvtx.push_range("core::allocate_pinned_memory")
    try:
        # normalise first: scalar types such as np.float32 expose no usable
        # integer ``itemsize`` until they are wrapped in a dtype instance
        dtype = np.dtype(dtype)
        flags = cudart.cudaHostAllocMapped if unified_mem else cudart.cudaHostAllocDefault

        # allocate pinned memory and get a pointer to it directly
        host_ptr = cuda_call(cudart.cudaHostAlloc(nbytes, flags))

        try:
            # create the numpy array as a zero-copy view over the raw bytes
            array_type = ctypes.c_byte * nbytes
            array: np.ndarray = np.ctypeslib.as_array(array_type.from_address(host_ptr))

            # set datatype and shape
            array = array.view(dtype)
            shape = (nbytes // dtype.itemsize,) if shape is None else shape
            array = array.reshape(shape)
        except Exception:
            # the caller never receives the array, so nobody else can free
            # the allocation; release it and let the original error propagate
            with contextlib.suppress(Exception):
                cuda_call(cudart.cudaFreeHost(host_ptr))
            raise

        LOG.debug(
            f"Allocated-pagelocked, host_ptr: {host_ptr}, size: {nbytes}, shape: {shape}",
        )
        return array
    finally:
        # close the range even when allocation or wrapping raises
        if nvtx_enabled:
            nvtx.pop_range()
def get_ptr_pair(host_array: np.ndarray) -> tuple[int, int]:
    """
    Get the pointer pairs (host/device) of a pagelocked allocation.

    The host pointer is the array's data address; the device pointer is
    looked up with ``cudart.cudaHostGetDevicePointer`` and the result is
    passed through ``cuda_call``.

    Parameters
    ----------
    host_array : np.ndarray
        A np.ndarray allocated by the allocate_pinned_memory function.

    Returns
    -------
    tuple[int, int]
        The host and device pointer of the allocation.

    """
    # read the flag once so push/pop stay paired even if it is toggled mid-call
    nvtx_enabled = FLAGS.NVTX_ENABLED
    if nvtx_enabled:
        nvtx.push_range("core::get_ptr_pair")
    try:
        host_ptr = host_array.ctypes.data
        device_ptr = cuda_call(cudart.cudaHostGetDevicePointer(host_ptr, 0))
        LOG.debug(f"Acquired pointers: (host: {host_ptr}, device: {device_ptr}) from ndarray")
        return host_ptr, device_ptr
    finally:
        # close the range even when the lookup raises
        if nvtx_enabled:
            nvtx.pop_range()
[docs]
def allocate_managed_memory(
    nbytes: int,
    stream: cudart.cudaStream_t | None = None,
) -> int:
    """
    Allocate managed memory.

    The memory is obtained from ``cudart.cudaMallocManaged`` with the
    ``cudaMemAttachGlobal`` flag. When a stream is given, the allocation is
    additionally attached with ``cudart.cudaStreamAttachMemAsync``; if that
    attach fails the allocation is freed before the error propagates.

    Parameters
    ----------
    nbytes : int
        The number of bytes to allocate.
    stream : cudart.cudaStream_t, optional
        The stream to utilize.

    Returns
    -------
    int
        The pointer to the allocated memory.

    """
    # read the flag once so push/pop stay paired even if it is toggled mid-call
    nvtx_enabled = FLAGS.NVTX_ENABLED
    if nvtx_enabled:
        nvtx.push_range("core::allocate_managed_memory")
    try:
        device_ptr: int = cuda_call(cudart.cudaMallocManaged(nbytes, cudart.cudaMemAttachGlobal))

        # if a stream is provided, we should attach the memory
        if stream is not None:
            try:
                # NOTE(review): the attach uses cudaMemAttachGlobal rather than a
                # stream-specific flag - kept as-is, confirm this is intended
                cuda_call(
                    cudart.cudaStreamAttachMemAsync(stream, device_ptr, 0, cudart.cudaMemAttachGlobal),
                )
            except Exception:
                # the pointer is never returned on failure, so free it here
                # and let the original error propagate
                with contextlib.suppress(Exception):
                    cuda_call(cudart.cudaFree(device_ptr))
                raise

        LOG.debug(f"Allocated-managed, device_ptr: {device_ptr}, size: {nbytes}")
        return device_ptr
    finally:
        # close the range even when allocation or attach raises
        if nvtx_enabled:
            nvtx.pop_range()
[docs]
def cuda_free(device_ptr: int) -> None:
    """
    Release device memory through cudart.cudaFree.

    The result of the free call is passed through ``cuda_call``.

    Parameters
    ----------
    device_ptr : int
        Address of the device allocation to release.

    """
    free_result = cudart.cudaFree(device_ptr)
    cuda_call(free_result)
[docs]
def cuda_host_free(host_ptr: int | np.ndarray) -> None:
    """
    Release host memory through cudart.cudaFreeHost.

    The result of the free call is passed through ``cuda_call``.

    Parameters
    ----------
    host_ptr : int | np.ndarray
        Either the raw host address, or a numpy array whose data address
        (``ndarray.ctypes.data``) is the allocation to release.

    """
    # arrays are reduced to their underlying data address; ints pass through
    address = host_ptr.ctypes.data if isinstance(host_ptr, np.ndarray) else host_ptr
    free_result = cudart.cudaFreeHost(address)
    cuda_call(free_result)
[docs]
def free_device_ptrs(ptrs: list[int]) -> None:
    """
    Release every device allocation in a list, in order.

    Each pointer is handed to ``cuda_free``; the first failure propagates
    and the remaining pointers are not visited.

    Parameters
    ----------
    ptrs : list[int]
        Addresses of the device allocations to release.

    """
    for device_ptr in ptrs:
        cuda_free(device_ptr)
[docs]
def allocate_to_device(
    data: list[np.ndarray],
) -> list[int]:
    """
    Allocate device memory for each numpy array and copy the data over.

    Every array gets its own allocation of ``arr.nbytes`` bytes via
    ``cuda_malloc`` and is then copied with ``memcpy_host_to_device``.
    If any allocation or copy fails, the allocations made so far by this
    call are freed before the error propagates, so a failed call does not
    leak device memory.

    Parameters
    ----------
    data : list[np.ndarray]
        The numpy arrays to copy.

    Returns
    -------
    list[int]
        The device pointers to the allocated memory, in the order of ``data``.

    """
    # read the flag once so push/pop stay paired even if it is toggled mid-call
    nvtx_enabled = FLAGS.NVTX_ENABLED
    if nvtx_enabled:
        nvtx.push_range("core::allocate_to_device")
    try:
        device_ptrs: list[int] = []
        try:
            for arr in data:
                ptr = cuda_malloc(arr.nbytes)
                # record the pointer before copying so a failed copy still frees it
                device_ptrs.append(ptr)
                memcpy_host_to_device(ptr, arr)
        except Exception:
            # the caller never receives the list, so release what was allocated;
            # cleanup errors are suppressed to keep the original failure visible
            for ptr in device_ptrs:
                with contextlib.suppress(Exception):
                    cuda_free(ptr)
            raise
        return device_ptrs
    finally:
        # close the range even when an allocation or copy raises
        if nvtx_enabled:
            nvtx.pop_range()