Source code for trtutils.impls.yolo._preprocessors._trt

# Copyright (c) 2024 Justin Davis (davisjustin302@gmail.com)
#
# MIT License
# mypy: disable-error-code="import-untyped"
from __future__ import annotations

import contextlib
import math
from typing import TYPE_CHECKING

import numpy as np

from trtutils._engine import TRTEngine
from trtutils._log import LOG
from trtutils.core._bindings import create_binding
from trtutils.core._kernels import Kernel
from trtutils.core._memory import (
    memcpy_device_to_host_async,
    memcpy_host_to_device_async,
)
from trtutils.core._stream import create_stream, destroy_stream, stream_synchronize
from trtutils.impls.kernels import LETTERBOX_RESIZE, LINEAR_RESIZE
from trtutils.impls.onnx_models import build_yolo_preproc

if TYPE_CHECKING:
    from typing_extensions import Self

    with contextlib.suppress(ImportError):
        try:
            import cuda.bindings.runtime as cudart
        except (ImportError, ModuleNotFoundError):
            from cuda import cudart

_COLOR_CHANNELS = 3


[docs] class TRTPreprocessor: """TRT-based preprocessor for YOLO.""" def __init__( self: Self, output_shape: tuple[int, int], output_range: tuple[float, float], dtype: np.dtype, resize: str = "letterbox", stream: cudart.cudaStream_t | None = None, threads: tuple[int, int, int] | None = None, tag: str | None = None, *, pagelocked_mem: bool | None = None, unified_mem: bool | None = None, ) -> None: """ Create a TRTPreprocessor for YOLO. Parameters ---------- output_shape : tuple[int, int] The shape of the image YOLO expects. In the form [width, height] output_range : tuple[float, float] The range of the image values YOLO expects. Examples: (0.0, 1.0), (0.0, 255.0) dtype : np.dtype The datatype of the image. Examples: np.float32, np.float16, np.uint8 resize : str, optional The default resize method to use. By default, letterbox resizing will be used. Options are: ['letterbox', 'linear'] stream : cudart.cudaStream_t, optional The CUDA stream to use for preprocessing execution. If not provided, the preprocessor will use its own stream. threads : tuple[int, int, int], optional The number of threads to use per-block of computation. Can be changed depending on GPU size. tag : str The tag to prefix to all logging statements made. By default, 'TRTPreprocessor' If used within a YOLO class, will be the YOLO tag. pagelocked_mem : bool, optional Whether or not to allocate output memory as pagelocked. By default, pagelocked memory will be used. unified_mem : bool, optional Whether or not the system has unified memory. If True, use cudaHostAllocMapped to take advantage of unified memory. By default None, which means the default host allocation will be used. Raises ------ ValueError If the resize method is not valid """ self._tag = "TRTPreprocessor" if tag is None else f"{tag}.TRTPreprocessor" LOG.debug( f"{self._tag}: Creating preprocessor: {output_shape}, {output_range}, {dtype}", ) # allocate static output sizes self._o_shape = output_shape self._o_range = output_range self._o_dtype = dtype self._pagelocked_mem = pagelocked_mem if pagelocked_mem is not None else True self._unified_mem = unified_mem # compute scale and offset self._scale: float = (self._o_range[1] - self._o_range[0]) / 255.0 self._offset: float = self._o_range[0] # resize methods self._valid_methods = ["letterbox", "linear"] if resize not in self._valid_methods: err_msg = f"{self._tag}: Unknown method for image resizing. Options are {self._valid_methods}" raise ValueError(err_msg) self._resize = resize # handle stream self._stream: cudart.cudaStream_t self._own_stream = False if stream is not None: self._stream = stream else: self._stream = create_stream() self._own_stream = True # allocate input, output binding # need input -> intermediate -> output # for now just allocate 1080p image, reallocate when needed # resize kernel input binding self._allocated_input_shape: tuple[int, int, int] = (1080, 1920, 3) dummy_input: np.ndarray = np.zeros( self._allocated_input_shape, dtype=np.uint8, ) self._input_binding = create_binding( dummy_input, pagelocked_mem=self._pagelocked_mem, unified_mem=self._unified_mem, ) dummy_intermediate: np.ndarray = np.zeros( (self._o_shape[1], self._o_shape[0], 3), dtype=np.uint8, ) self._intermediate_binding = create_binding( dummy_intermediate, pagelocked_mem=self._pagelocked_mem, unified_mem=self._unified_mem, ) # block and thread info self._num_threads: tuple[int, int, int] = threads or (32, 32, 1) self._num_blocks: tuple[int, int, int] = ( math.ceil(self._o_shape[0] / self._num_threads[0]), math.ceil(self._o_shape[1] / self._num_threads[1]), 1, ) # either letterbox or linear is used self._linear_kernel = Kernel(*LINEAR_RESIZE) self._letterbox_kernel = Kernel(*LETTERBOX_RESIZE) # allocate the scale/offset CUDA locations scale_arr: np.ndarray = np.array((self._scale,), dtype=np.float32) self._scale_binding = create_binding(scale_arr) memcpy_host_to_device_async( self._scale_binding.allocation, scale_arr, self._stream, ) offset_arr: np.ndarray = np.array((self._offset,), dtype=np.float32) self._offset_binding = create_binding(offset_arr) memcpy_host_to_device_async( self._offset_binding.allocation, offset_arr, self._stream, ) stream_synchronize(self._stream) # allocate the trtengine self._engine_path = build_yolo_preproc(self._o_shape, self._o_dtype) self._engine = TRTEngine( self._engine_path, stream=self._stream, warmup_iterations=1, warmup=True, pagelocked_mem=self._pagelocked_mem, unified_mem=self._unified_mem, ) self._engine_output_binding = self._engine.output_bindings[0] # pre-allocate the input pointer list for the engine self._gpu_pointers = [ self._intermediate_binding.allocation, self._scale_binding.allocation, self._offset_binding.allocation, ] def __del__(self: Self) -> None: with contextlib.suppress(AttributeError, RuntimeError): if self._own_stream: destroy_stream(self._stream) with contextlib.suppress(AttributeError): del self._input_binding with contextlib.suppress(AttributeError): del self._intermediate_binding with contextlib.suppress(AttributeError): del self._scale_binding with contextlib.suppress(AttributeError): del self._offset_binding with contextlib.suppress(AttributeError): del self._engine def _create_args( self: Self, height: int, width: int, method: str, *, verbose: bool | None = None, ) -> tuple[ Kernel, np.ndarray, tuple[float, float], tuple[float, float], ]: if verbose: LOG.debug(f"{self._tag}: create_args") # pre-compute the common potions o_width, o_height = self._o_shape scale_x = o_width / width scale_y = o_height / height if method == "letterbox": if verbose: LOG.debug(f"{self._tag}: Making letterbox args") scale = min(scale_x, scale_y) new_width = int(width * scale) new_height = int(height * scale) padding_x = int((o_width - new_width) / 2) padding_y = int((o_height - new_height) / 2) ratios = (scale, scale) padding = (padding_x, padding_y) # create args and assign kernel resize_kernel = self._letterbox_kernel resize_args = resize_kernel.create_args( self._input_binding.allocation, self._intermediate_binding.allocation, width, height, o_width, o_height, padding_x, padding_y, new_width, new_height, verbose=verbose, ) else: if verbose: LOG.debug(f"{self._tag}: Making linear args") o_width, o_height = self._o_shape scale_x = o_width / width scale_y = o_height / height ratios = (scale_x, scale_y) padding = (0, 0) # create args and assign kernel resize_kernel = self._linear_kernel resize_args = resize_kernel.create_args( self._input_binding.allocation, self._intermediate_binding.allocation, width, height, o_width, o_height, verbose=verbose, ) return resize_kernel, resize_args, ratios, padding def _reallocate_input( self: Self, image: np.ndarray, *, verbose: bool | None = None, ) -> None: if verbose: LOG.debug(f"{self._tag}: Reallocating input bindings") LOG.debug( f"{self._tag}: Reallocation -> new shape: {image.shape}, old shape: {self._input_binding.shape}", ) self._allocated_input_shape = image.shape # type: ignore[assignment] self._input_binding = create_binding( image, is_input=True, pagelocked_mem=self._pagelocked_mem, unified_mem=self._unified_mem, ) def _validate_input( self: Self, image: np.ndarray, resize: str | None = None, *, verbose: bool | None = None, ) -> str: if verbose: LOG.debug(f"{self._tag}: validate_input") # valid the method resize = resize if resize is not None else self._resize if resize not in self._valid_methods: err_msg = f"{self._tag}: Unknown method for image resizing. Options are {self._valid_methods}" raise ValueError(err_msg) img_shape: tuple[int, int, int] = image.shape # type: ignore[assignment] if verbose: LOG.debug( f"{self._tag}: Image shape: {img_shape}, Allocated shape: {self._allocated_input_shape}", ) # check if the image shape is the same as re have allocated with, if not update if img_shape != self._allocated_input_shape: if img_shape[2] != _COLOR_CHANNELS: err_msg = f"{self._tag}: Can only preprocess color images." raise ValueError(err_msg) self._reallocate_input(image, verbose=verbose) return resize
[docs] def warmup(self: Self) -> None: """ Warmup the CUDA preprocessor. Allocates all CUDA memory and enables future passes to be significantly faster. """ rand_data: np.ndarray = np.random.default_rng().integers( 0, 255, (*self._o_shape, 3), dtype=np.uint8, ) self.preprocess(rand_data, resize=self._resize, no_copy=True)
def __call__( self: Self, image: np.ndarray, resize: str | None = None, *, no_copy: bool | None = None, verbose: bool | None = None, ) -> tuple[np.ndarray, tuple[float, float], tuple[float, float]]: """ Preprocess an image for YOLO. Parameters ---------- image : np.ndarray The image to preprocess. resize : str, optional The method to resize the image with. Options are [letterbox, linear], will use method provided in constructor by default. no_copy : bool, optional If True, the outputs will not be copied out from the cuda allocated host memory. Instead, the host memory will be returned directly. This memory WILL BE OVERWRITTEN INPLACE by future preprocessing calls. verbose : bool, optional Whether or not to output additional information to stdout. If not provided, will default to overall engines verbose setting. Returns ------- tuple[np.ndarray, tuple[float, float], tuple[float, float]] The preprocessed image, ratios, and padding used for resizing. """ return self.preprocess(image, resize=resize, no_copy=no_copy, verbose=verbose)
[docs] def preprocess( self: Self, image: np.ndarray, resize: str | None = None, *, no_copy: bool | None = None, verbose: bool | None = None, ) -> tuple[np.ndarray, tuple[float, float], tuple[float, float]]: """ Preprocess an image for YOLO. Parameters ---------- image : np.ndarray The image to preprocess. resize : str, optional The method to resize the image with. Options are [letterbox, linear], will use method provided in constructor by default. no_copy : bool, optional If True, the outputs will not be copied out from the cuda allocated host memory. Instead, the host memory will be returned directly. This memory WILL BE OVERWRITTEN INPLACE by future preprocessing calls. verbose : bool, optional Whether or not to output additional information to stdout. If not provided, will default to overall engines verbose setting. Returns ------- tuple[np.ndarray, tuple[float, float], tuple[float, float]] The preprocessed image, ratios, and padding used for resizing. """ _, ratios, padding = self.direct_preproc( image, resize=resize, no_warn=True, verbose=verbose, ) if not self._unified_mem: memcpy_device_to_host_async( self._engine_output_binding.host_allocation, self._engine_output_binding.allocation, self._stream, ) stream_synchronize(self._stream) if no_copy: return self._engine_output_binding.host_allocation, ratios, padding return self._engine_output_binding.host_allocation.copy(), ratios, padding
[docs] def direct_preproc( self: Self, image: np.ndarray, resize: str | None = None, *, no_warn: bool | None = None, verbose: bool | None = None, ) -> tuple[int, tuple[float, float], tuple[float, float]]: """ Preprocess an image for YOLO. Parameters ---------- image : np.ndarray The image to preprocess. resize : str The method to resize the image with. By default letterbox, options are [letterbox, linear] no_warn : bool, optional If True, do not warn about usage. verbose : bool, optional Whether or not to output additional information to stdout. If not provided, will default to overall engines verbose setting. Returns ------- tuple[int, tuple[float, float], tuple[float, float]] The GPU pointer to preprocessed data, ratios, and padding used for resizing. """ if verbose: LOG.debug(f"{self._tag}: direct_preproc") if not no_warn: LOG.warning( "Calling direct_preproc is potentially dangerous. Outputs can be overwritten inplace!", ) # valid the method resize = self._validate_input(image, resize, verbose=verbose) # create the arguments height, width = image.shape[:2] resize_kernel, resize_args, ratios, padding = self._create_args( height, width, resize, verbose=verbose, ) if verbose: LOG.debug(f"Ratios: {ratios}") LOG.debug(f"Padding: {padding}") if self._pagelocked_mem and self._unified_mem: np.copyto(self._input_binding.host_allocation, image) else: memcpy_host_to_device_async( self._input_binding.allocation, image, self._stream, ) resize_kernel.call( self._num_blocks, self._num_threads, self._stream, resize_args, ) output_ptrs = self._engine.raw_exec(self._gpu_pointers, no_warn=True) return output_ptrs[0], ratios, padding