Source code for trtutils.core._memory

# Copyright (c) 2024-2026 Justin Davis (davisjustin302@gmail.com)
#
# MIT License
# mypy: disable-error-code="import-untyped"
from __future__ import annotations

import contextlib
import ctypes

import numpy as np
import nvtx

with contextlib.suppress(Exception):
    from trtutils.compat._libs import cudart

from trtutils._flags import FLAGS
from trtutils._log import LOG

from ._cuda import cuda_call


[docs] def memcpy_host_to_device(device_ptr: int, host_arr: np.ndarray) -> None: """ Copy a numpy array to a device pointer with error checking. Parameters ---------- device_ptr : int The device pointer to copy to. host_arr : np.ndarray The numpy array to copy. """ if FLAGS.NVTX_ENABLED: nvtx.push_range("core::memcpy_host_to_device") nbytes = host_arr.size * host_arr.itemsize # LOG.debug(f"MemcpyHtoD: {device_ptr} with size: {nbytes}") cuda_call( cudart.cudaMemcpy( device_ptr, host_arr.ctypes.data, nbytes, cudart.cudaMemcpyKind.cudaMemcpyHostToDevice, ), ) if FLAGS.NVTX_ENABLED: nvtx.pop_range()
[docs] def memcpy_device_to_host(host_arr: np.ndarray, device_ptr: int) -> None: """ Copy a device pointer to a numpy array with error checking. Parameters ---------- host_arr : np.ndarray The numpy array to copy to. device_ptr : int The device pointer to copy. """ if FLAGS.NVTX_ENABLED: nvtx.push_range("core::memcpy_device_to_host") nbytes = host_arr.size * host_arr.itemsize # LOG.debug(f"MemcpyDtoH: {device_ptr} with size: {nbytes}") cuda_call( cudart.cudaMemcpy( host_arr.ctypes.data, device_ptr, nbytes, cudart.cudaMemcpyKind.cudaMemcpyDeviceToHost, ), ) if FLAGS.NVTX_ENABLED: nvtx.pop_range()
[docs] def memcpy_host_to_device_async( device_ptr: int, host_arr: np.ndarray, stream: cudart.cudaStream_t, ) -> None: """ Copy a numpy array to a device pointer with error checking. Parameters ---------- device_ptr : int The device pointer to copy to. host_arr : np.ndarray The numpy array to copy. stream : cudart.cudaStream_t The stream to utilize. """ if FLAGS.NVTX_ENABLED: nvtx.push_range("core::memcpy_host_to_device_async") nbytes = host_arr.size * host_arr.itemsize # LOG.debug(f"MemcpyHtoD_Async: {device_ptr} with size: {nbytes}") cuda_call( cudart.cudaMemcpyAsync( device_ptr, host_arr.ctypes.data, nbytes, cudart.cudaMemcpyKind.cudaMemcpyHostToDevice, stream, ), ) if FLAGS.NVTX_ENABLED: nvtx.pop_range()
[docs] def memcpy_device_to_host_async( host_arr: np.ndarray, device_ptr: int, stream: cudart.cudaStream_t, ) -> None: """ Copy a device pointer to a numpy array with error checking. Parameters ---------- host_arr : np.ndarray The numpy array to copy to. device_ptr : int The device pointer to copy. stream : cudart.cudaStream_t The stream to utilize. """ if FLAGS.NVTX_ENABLED: nvtx.push_range("core::memcpy_device_to_host_async") nbytes = host_arr.size * host_arr.itemsize # LOG.debug(f"MemcpyDtoH_Async: {device_ptr} with size: {nbytes}") cuda_call( cudart.cudaMemcpyAsync( host_arr.ctypes.data, device_ptr, nbytes, cudart.cudaMemcpyKind.cudaMemcpyDeviceToHost, stream, ), ) if FLAGS.NVTX_ENABLED: nvtx.pop_range()
[docs] def memcpy_device_to_device( dst_ptr: int, src_ptr: int, nbytes: int, ) -> None: """ Copy from one device pointer to another with error checking. Parameters ---------- dst_ptr : int The destination device pointer. src_ptr : int The source device pointer. nbytes : int The number of bytes to copy. """ if FLAGS.NVTX_ENABLED: nvtx.push_range("core::memcpy_device_to_device") cuda_call( cudart.cudaMemcpy( dst_ptr, src_ptr, nbytes, cudart.cudaMemcpyKind.cudaMemcpyDeviceToDevice, ), ) if FLAGS.NVTX_ENABLED: nvtx.pop_range()
[docs] def memcpy_device_to_device_async( dst_ptr: int, src_ptr: int, nbytes: int, stream: cudart.cudaStream_t, ) -> None: """ Copy from one device pointer to another asynchronously. Parameters ---------- dst_ptr : int The destination device pointer. src_ptr : int The source device pointer. nbytes : int The number of bytes to copy. stream : cudart.cudaStream_t The stream to utilize. """ if FLAGS.NVTX_ENABLED: nvtx.push_range("core::memcpy_device_to_device_async") cuda_call( cudart.cudaMemcpyAsync( dst_ptr, src_ptr, nbytes, cudart.cudaMemcpyKind.cudaMemcpyDeviceToDevice, stream, ), ) if FLAGS.NVTX_ENABLED: nvtx.pop_range()
[docs] def memcpy_host_to_device_offset( device_ptr: int, host_arr: np.ndarray, offset_bytes: int, ) -> None: """ Copy a numpy array to a device pointer at a specific offset. Parameters ---------- device_ptr : int The base device pointer. host_arr : np.ndarray The numpy array to copy. offset_bytes : int The byte offset into the device buffer. """ if FLAGS.NVTX_ENABLED: nvtx.push_range("core::memcpy_host_to_device_offset") nbytes = host_arr.size * host_arr.itemsize cuda_call( cudart.cudaMemcpy( device_ptr + offset_bytes, host_arr.ctypes.data, nbytes, cudart.cudaMemcpyKind.cudaMemcpyHostToDevice, ), ) if FLAGS.NVTX_ENABLED: nvtx.pop_range()
[docs] def memcpy_host_to_device_offset_async( device_ptr: int, host_arr: np.ndarray, offset_bytes: int, stream: cudart.cudaStream_t, ) -> None: """ Copy a numpy array to a device pointer at a specific offset asynchronously. Parameters ---------- device_ptr : int The base device pointer. host_arr : np.ndarray The numpy array to copy. offset_bytes : int The byte offset into the device buffer. stream : cudart.cudaStream_t The stream to utilize. """ if FLAGS.NVTX_ENABLED: nvtx.push_range("core::memcpy_host_to_device_offset_async") nbytes = host_arr.size * host_arr.itemsize cuda_call( cudart.cudaMemcpyAsync( device_ptr + offset_bytes, host_arr.ctypes.data, nbytes, cudart.cudaMemcpyKind.cudaMemcpyHostToDevice, stream, ), ) if FLAGS.NVTX_ENABLED: nvtx.pop_range()
[docs] def cuda_malloc( nbytes: int, ) -> int: """ Perform a memory allocation using cudart.cudaMalloc. Parameters ---------- nbytes : int The number of bytes to allocate. Returns ------- int The pointer to the allocated memory. """ if FLAGS.NVTX_ENABLED: nvtx.push_range("core::cuda_malloc") device_ptr: int = cuda_call(cudart.cudaMalloc(nbytes)) LOG.debug(f"Allocated, device_ptr: {device_ptr}, size: {nbytes}") if FLAGS.NVTX_ENABLED: nvtx.pop_range() return device_ptr
[docs] def allocate_pinned_memory( nbytes: int, dtype: np.dtype, shape: tuple[int, ...] | None = None, *, unified_mem: bool | None = None, ) -> np.ndarray: """ Allocate pinned (page-locked) memory on the host, required for asynchronous memory transfers. The shape of the pagelocked memory is a 1D numpy array, so CPU side reshaping is required for some applications. If shape is passed, then the shape will not be 1D, but memory transfer may have complications. Parameters ---------- nbytes : int The number of bytes to allocate. dtype : np.dtype The data type for the allocated memory. shape : tuple[int, ...], optional An optional shape for the pagelocked memory array. If not provided, the array will be 1D. unified_mem : bool, optional If True, use cudaHostAllocMapped to take advantage of unified memory. Returns ------- np.ndarray A numpy array backed by pinned memory. """ if FLAGS.NVTX_ENABLED: nvtx.push_range("core::allocate_pinned_memory") flags = cudart.cudaHostAllocMapped if unified_mem else cudart.cudaHostAllocDefault # allocate pinned memory and get a pointer to it directly host_ptr = cuda_call(cudart.cudaHostAlloc(nbytes, flags)) # create the numpy array array_type = ctypes.c_byte * nbytes array: np.ndarray = np.ctypeslib.as_array(array_type.from_address(host_ptr)) # set datatype and shape array = array.view(dtype) shape = (nbytes // dtype.itemsize,) if shape is None else shape LOG.debug( f"Allocated-pagelocked, host_ptr: {host_ptr}, size: {nbytes}, shape: {shape}", ) if FLAGS.NVTX_ENABLED: nvtx.pop_range() return array.reshape(shape)
def get_ptr_pair(host_array: np.ndarray) -> tuple[int, int]: """ Get the pointer pairs (host/device) of a pagelocked allocation. Parameters ---------- host_array : np.ndarray A np.ndarray allocated by the allocate_pinned_memory function. Returns ------- tuple[int, int] The host and device pointer of the allocation. """ if FLAGS.NVTX_ENABLED: nvtx.push_range("core::get_ptr_pair") host_ptr = host_array.ctypes.data device_ptr = cuda_call(cudart.cudaHostGetDevicePointer(host_ptr, 0)) LOG.debug(f"Acquired pointers: (host: {host_ptr}, device: {device_ptr}) from ndarray") if FLAGS.NVTX_ENABLED: nvtx.pop_range() return host_ptr, device_ptr
[docs] def allocate_managed_memory( nbytes: int, stream: cudart.cudaStream_t | None = None, ) -> int: """ Allocate managed memory. Parameters ---------- nbytes : int The number of bytes to allocate. stream : cudart.cudaStream_t, optional The stream to utilize. Returns ------- int The pointer to the allocated memory. """ if FLAGS.NVTX_ENABLED: nvtx.push_range("core::allocate_managed_memory") device_ptr: int = cuda_call(cudart.cudaMallocManaged(nbytes, cudart.cudaMemAttachGlobal)) # if a stream is provided, we should attach the memory if stream is not None: cuda_call(cudart.cudaStreamAttachMemAsync(stream, device_ptr, 0, cudart.cudaMemAttachGlobal)) LOG.debug(f"Allocated-managed, device_ptr: {device_ptr}, size: {nbytes}") if FLAGS.NVTX_ENABLED: nvtx.pop_range() return device_ptr
[docs] def cuda_free(device_ptr: int) -> None: """ Free a CUDA device pointer. Parameters ---------- device_ptr : int The device pointer to free. """ cuda_call(cudart.cudaFree(device_ptr))
[docs] def cuda_host_free(host_ptr: int | np.ndarray) -> None: """ Free a CUDA host pointer. Parameters ---------- host_ptr : int The host pointer to free. """ if isinstance(host_ptr, np.ndarray): host_ptr = host_ptr.ctypes.data cuda_call(cudart.cudaFreeHost(host_ptr))
[docs] def free_device_ptrs(ptrs: list[int]) -> None: """ Free a list of CUDA device pointers. Parameters ---------- ptrs : list[int] The device pointers to free. """ for p in ptrs: cuda_free(p)
[docs] def allocate_to_device( data: list[np.ndarray], ) -> list[int]: """ Allocate device memory for each numpy array and copy the data over. Parameters ---------- data : list[np.ndarray] The numpy arrays to copy. Returns ------- list[int] The device pointers to the allocated memory. """ if FLAGS.NVTX_ENABLED: nvtx.push_range("core::allocate_to_device") device_ptrs: list[int] = [] for arr in data: ptr = cuda_malloc(arr.nbytes) memcpy_host_to_device(ptr, arr) device_ptrs.append(ptr) if FLAGS.NVTX_ENABLED: nvtx.pop_range() return device_ptrs