Source code for trtutils.impls.common

# Copyright (c) 2024 Justin Davis (davisjustin302@gmail.com)
#
# MIT License
"""
Common implementations for TensorRT engines.

Functions
---------
:func:`decode_efficient_nms`
    Processes the output of a model with EfficientNMS plugin outputs.
:func:`postprocess_efficient_nms`
    Postprocesses the output of a model to reshape and scale based on preprocessing.
"""

from __future__ import annotations

import numpy as np
from cv2ext.bboxes import nms

from trtutils._jit import register_jit
from trtutils._log import LOG


[docs] def postprocess_efficient_nms( outputs: list[np.ndarray], ratios: tuple[float, float] = (1.0, 1.0), padding: tuple[float, float] = (0.0, 0.0), conf_thres: float | None = None, *, no_copy: bool | None = None, verbose: bool | None = None, ) -> list[np.ndarray]: """ Postprocess the output of the EfficientNMS plugin. Must be used before passing outputs to decode_efficient_nms since this will reshape the outputs. Parameters ---------- outputs : list[np.ndarray] The outputs from the TRTEngine using EfficientNMS output. ratios : tuple[float, float] The ratios used during preprocessing to resize the input. padding : tuple[float, float] The padding used during preprocessing to position the input. conf_thres : float, optional Optional confidence threshold to further filter detections by. Detections are already filtered by EfficientNMS parameters ahead of time. Should be used if EfficientNMS was given low-confidence and want to filter higher variably. no_copy : bool, optional If True, the outputs will not be copied out from the cuda allocated host memory. Instead, the host memory will be returned directly. This memory WILL BE OVERWRITTEN INPLACE by future preprocessing calls. verbose : bool, optional Whether or not to log additional information. Returns ------- list[np.ndarray] The postprocessed outputs, reshaped and scaled based on ratios/padding. """ if verbose: LOG.debug(f"EfficientNMS postprocess, bboxes shape: {outputs[1].shape}") return _postprocess_efficient_nms_core( outputs, ratios, padding, conf_thres, no_copy=no_copy, )
@register_jit(nogil=True) def _postprocess_efficient_nms_core( outputs: list[np.ndarray], ratios: tuple[float, float], padding: tuple[float, float], conf_thres: float | None = None, *, no_copy: bool | None = None, ) -> list[np.ndarray]: # efficient NMS postprocessor essentially # inputs are list[num_dets, bboxes, scores, classes] num_dets, bboxes, scores, class_ids = outputs ratio_width, ratio_height = ratios pad_x, pad_y = padding # throw out all detections not included in the num_dets num_det_id = int(outputs[0][0]) # needs to be integer bboxes = bboxes[:, :num_det_id] scores = scores[:, :num_det_id] class_ids = class_ids[:, :num_det_id] if conf_thres is not None: mask = scores >= conf_thres bboxes = np.where(mask[..., np.newaxis], bboxes, 0) scores = np.where(mask, scores, 0) class_ids = np.where(mask, class_ids, 0) adjusted_bboxes = bboxes adjusted_bboxes[:, :, 0] = (adjusted_bboxes[:, :, 0] - pad_x) / ratio_width # x1 adjusted_bboxes[:, :, 1] = (adjusted_bboxes[:, :, 1] - pad_y) / ratio_height # y1 adjusted_bboxes[:, :, 2] = (adjusted_bboxes[:, :, 2] - pad_x) / ratio_width # x2 adjusted_bboxes[:, :, 3] = (adjusted_bboxes[:, :, 3] - pad_y) / ratio_height # y2 adjusted_bboxes = np.clip(adjusted_bboxes, 0, None) if no_copy: return [num_dets, adjusted_bboxes, scores, class_ids] return [num_dets.copy(), adjusted_bboxes.copy(), scores.copy(), class_ids.copy()]
[docs] def decode_efficient_nms( outputs: list[np.ndarray], conf_thres: float | None = None, nms_iou_thres: float = 0.5, *, extra_nms: bool | None = None, agnostic_nms: bool | None = None, verbose: bool | None = None, ) -> list[tuple[tuple[int, int, int, int], float, int]]: """ Decode EfficientNMS plugin output. Must have called postprocess_efficient_nms before calling this function, due to the reshape stage needing to occur. Parameters ---------- outputs : list[np.ndarray] The outputs from a model with EfficientNMS output conf_thres : float A confidence value to threshold detctions by. By default None. nms_iou_thres : float The IOU threshold to use during the optional additional NMS operation. By default, 0.5 extra_nms : bool, optional Whether or not an additional CPU-side NMS operation should be conducted on final detections. agnostic_nms : bool, optional Whether or not to perform class-agnostic NMS during the optional additional operation. verbose : bool, optional Whether or not to log additional information. Returns ------- list[tuple[tuple[int, int, int, int], float, int]] The decoded outputs. Bounding box (x1, y1, x2, y2), confidence score, classid """ if verbose: LOG.debug(f"Generating detections for: {int(outputs[0][0])} bboxes") frame_dects = _decode_efficient_nms_core(outputs, conf_thres) if extra_nms: frame_dects = nms( frame_dects, iou_threshold=nms_iou_thres, agnostic=agnostic_nms, ) return frame_dects
@register_jit(nogil=True) def _decode_efficient_nms_core( outputs: list[np.ndarray], conf_thres: float | None = None, ) -> list[tuple[tuple[int, int, int, int], float, int]]: num_dects: int = int(outputs[0][0]) bboxes: np.ndarray = outputs[1][0] scores: np.ndarray = outputs[2][0] classes: np.ndarray = outputs[3][0] conf_thres = conf_thres or 0.0 frame_dects: list[tuple[tuple[int, int, int, int], float, int]] = [] for idx in range(num_dects): x1, y1, x2, y2 = bboxes[idx] score = float(scores[idx]) np_classid = classes[idx] if score >= conf_thres: entry = ((int(x1), int(y1), int(x2), int(y2)), score, int(np_classid)) frame_dects.append(entry) return frame_dects