Source code for trtutils.core._bindings

# Copyright (c) 2024-2026 Justin Davis (davisjustin302@gmail.com)
#
# MIT License
# mypy: disable-error-code="import-untyped"
from __future__ import annotations

import contextlib
from dataclasses import dataclass
from typing import TYPE_CHECKING

import numpy as np
import nvtx

from trtutils._flags import FLAGS
from trtutils._log import LOG
from trtutils.compat._libs import cudart, trt

from ._cuda import cuda_call
from ._memory import allocate_pinned_memory, cuda_malloc, get_ptr_pair

if TYPE_CHECKING:
    from typing_extensions import Self


@dataclass
class Binding:
    """
    Small wrapper for a host/device allocation pair.

    The binding owns its allocations: :meth:`free` releases them and is also
    invoked when the object is garbage collected.
    """

    # index of the binding
    index: int
    # name of the binding
    name: str
    # element dtype of the host allocation
    dtype: np.dtype
    # shape of the tensor, one entry per dimension
    shape: list[int]
    # True for an input binding, False for an output binding
    is_input: bool
    # device-side pointer, released with cudaFree unless it aliases unified host memory
    allocation: int
    # host-side buffer, released with cudaFreeHost when it is pagelocked
    host_allocation: np.ndarray
    # tensor format of the binding
    tensor_format: trt.TensorFormat
    # whether the host allocation is pagelocked memory
    pagelocked_mem: bool
    # whether the binding was requested with unified memory
    unified_mem: bool

    def free(self: Self) -> None:
        """
        Free the memory of the binding.

        The call is idempotent: only the first invocation releases memory,
        every later one (including the implicit one from ``__del__``) is a
        no-op. Handing the same pointer to CUDA twice is unsafe because the
        address may already belong to a newer allocation.
        """
        if getattr(self, "_freed", False):
            return
        # mark before releasing so a failure part-way through can never lead
        # to a second release attempt on the same pointers
        self._freed = True

        if self.pagelocked_mem:
            cuda_call(cudart.cudaFreeHost(self.host_allocation))
            # unified pagelocked memory maps a host allocation to a device-visible alias pointer;
            # only the host allocation should be freed.
            if self.unified_mem:
                return
        cuda_call(cudart.cudaFree(self.allocation))

    def __del__(self: Self) -> None:
        # RuntimeError: best-effort cleanup, a failing CUDA call must not escape the finalizer
        # AttributeError: __init__ may have failed before every field was assigned
        with contextlib.suppress(RuntimeError, AttributeError):
            self.free()
def create_binding(
    array: np.ndarray,
    bind_id: int = 0,
    name: str = "binding",
    tensor_format: trt.TensorFormat = trt.TensorFormat.LINEAR,
    *,
    use_array_data: bool | None = None,
    is_input: bool | None = None,
    pagelocked_mem: bool | None = None,
    unified_mem: bool | None = None,
) -> Binding:
    """
    Build a :class:`Binding` sized after a NumPy array.

    Parameters
    ----------
    array : np.ndarray
        Template array; its shape and dtype determine the allocations.
    bind_id : int, optional
        Index stored on the binding. Defaults to 0.
    name : str, optional
        Name stored on the binding. Defaults to "binding".
    tensor_format : trt.TensorFormat, optional
        Tensor format stored on the binding. Defaults to LINEAR.
    use_array_data : bool, optional
        When truthy, the contents of ``array`` are copied into the host
        allocation. By default None, which leaves the host allocation as
        allocated (no copy).
    is_input : bool, optional
        Marks the binding as an input. None is stored as False.
    pagelocked_mem : bool, optional
        Whether the host allocation should be pagelocked.
        By default None, which means pagelocked memory is used.
    unified_mem : bool, optional
        Whether the system has unified memory. Only honoured together with
        pagelocked memory: the pinned host allocation is then requested with
        ``unified_mem`` set and its device pointer is taken from
        ``get_ptr_pair`` instead of a separate device allocation.

    Returns
    -------
    Binding
        The binding holding the host/device memory.

    """
    use_pinned = True if pagelocked_mem is None else pagelocked_mem

    # geometry of the template array
    dims = tuple(array.shape)
    elem_type = array.dtype
    num_bytes = array.nbytes  # itemsize multiplied by every dimension

    if use_pinned and unified_mem:
        # one pinned host buffer, the device pointer comes from the same allocation
        host_buf = allocate_pinned_memory(num_bytes, elem_type, dims, unified_mem=unified_mem)
        _, device_ptr = get_ptr_pair(host_buf)
    else:
        # independent device and host allocations
        device_ptr = cuda_malloc(num_bytes)
        host_buf = (
            allocate_pinned_memory(num_bytes, elem_type, dims)
            if use_pinned
            else np.zeros(dims, dtype=elem_type)
        )

    # optionally seed the host buffer with the caller's data
    if use_array_data:
        np.copyto(host_buf, array)

    return Binding(
        index=bind_id,
        name=name,
        dtype=elem_type,
        shape=list(dims),
        is_input=bool(is_input),
        allocation=device_ptr,
        host_allocation=host_buf,
        tensor_format=tensor_format,
        pagelocked_mem=use_pinned,
        unified_mem=bool(unified_mem),
    )
def allocate_bindings(
    engine: trt.IEngine,
    context: trt.IExecutionContext,
    *,
    pagelocked_mem: bool | None = None,
    unified_mem: bool | None = None,
) -> tuple[list[Binding], list[Binding], list[int]]:
    """
    Allocate memory for the input and output tensors of a TensorRT engine.

    Inputs with a dynamic (negative) dimension are resolved by setting the
    *max* shape of optimization profile 0 on the execution context before
    allocating.

    Parameters
    ----------
    engine : trt.IEngine
        The TensorRT engine to allocate memory for.
    context : trt.IExecutionContext
        The execution context to use.
    pagelocked_mem : bool, optional
        Whether or not to use pagelocked memory for host allocations.
        By default None, which means pagelocked memory will be used.
    unified_mem : bool, optional
        Whether or not the system has unified memory.
        By default None, which is treated as False.

    Returns
    -------
    tuple[list[Binding], list[Binding], list[int]]
        A tuple containing the input bindings, output bindings, and gpu memory pointers.

    Raises
    ------
    RuntimeError
        If no optimization profiles are found.
        If the profile shape is not correct.
    ValueError
        If no input tensors are found.
        If no output tensors are found.
        If no memory allocations are found

    """
    # read the flag once so the push below is always matched by the pop in `finally`
    nvtx_enabled = FLAGS.NVTX_ENABLED
    if nvtx_enabled:
        nvtx.push_range("core::allocate_bindings")

    try:
        pagelocked_mem = pagelocked_mem if pagelocked_mem is not None else True
        unified_mem = unified_mem if unified_mem is not None else False

        # lists for allocations
        inputs: list[Binding] = []
        outputs: list[Binding] = []
        allocations: list[int] = []

        # a profile shape is expected to be the triple (min, opt, max)
        correct_profile_shape = 3

        # the named-tensor API and the legacy binding-index API are selected by FLAGS.TRT_10
        use_tensor_api = FLAGS.TRT_10
        num_tensors = range(engine.num_io_tensors) if use_tensor_api else range(engine.num_bindings)

        for i in num_tensors:
            if use_tensor_api:
                name = engine.get_tensor_name(i)
                is_input = bool(engine.get_tensor_mode(name) == trt.TensorIOMode.INPUT)
                dtype = np.dtype(trt.nptype(engine.get_tensor_dtype(name)))
                shape = context.get_tensor_shape(name)
                data_format = engine.get_tensor_format(name)
            else:
                name = engine.get_binding_name(i)
                is_input = bool(engine.binding_is_input(i))
                dtype = np.dtype(trt.nptype(engine.get_binding_dtype(i)))
                shape = context.get_binding_shape(i)
                data_format = engine.get_binding_format(i)

            # any negative dimension marks a dynamic input, not only the leading one;
            # `any` also copes with rank-0 shapes where indexing [0] would fail
            if is_input and any(dim < 0 for dim in shape):
                if not engine.num_optimization_profiles > 0:
                    err_msg = "No optimization profiles found. Ensure that the engine has at least one optimization profile."
                    raise RuntimeError(err_msg)

                if use_tensor_api:
                    profile_shape = engine.get_tensor_profile_shape(name, 0)
                else:
                    profile_shape = engine.get_profile_shape(0, name)

                # ensure that profile shape is min,opt,max
                if len(profile_shape) != correct_profile_shape:
                    err_msg = f"Profile shape for tensor '{name}' has {len(profile_shape)} elements, expected {correct_profile_shape}"
                    raise RuntimeError(err_msg)

                # Set the *max* profile as binding shape
                if use_tensor_api:
                    context.set_input_shape(name, profile_shape[2])
                    shape = context.get_tensor_shape(name)
                else:
                    context.set_binding_shape(i, profile_shape[2])
                    shape = context.get_binding_shape(i)

            LOG.debug(f"Allocating for I/O tensor: {name} - is_input: {is_input}")

            # allocate memory and create binding
            binding = create_binding(
                np.zeros(shape, dtype),
                bind_id=i,
                name=name,
                tensor_format=data_format,
                is_input=is_input,
                pagelocked_mem=pagelocked_mem,
                unified_mem=unified_mem,
            )
            allocations.append(binding.allocation)
            if is_input:
                inputs.append(binding)
            else:
                outputs.append(binding)

            input_str = "Input" if is_input else "Output"
            log_msg = (
                f"{input_str}-{i} '{binding.name}' with shape {binding.shape} and dtype {binding.dtype}"
            )
            LOG.debug(log_msg)

        if not inputs:
            err_msg = "No input tensors found. Ensure that the engine has at least one input tensor."
            raise ValueError(err_msg)
        if not outputs:
            err_msg = "No output tensors found. Ensure that the engine has at least one output tensor."
            raise ValueError(err_msg)
        # defensive: every binding appends to `allocations`, so this cannot
        # trigger once both lists above are non-empty; kept for the documented contract
        if not allocations:
            err_msg = "No memory allocations found. Ensure that the engine has at least one input and output tensor."
            raise ValueError(err_msg)

        return inputs, outputs, allocations
    finally:
        # close the NVTX range on every exit path, including unexpected
        # TensorRT/CUDA failures raised part-way through the loop
        if nvtx_enabled:
            nvtx.pop_range()