# Copyright (c) 2024 Justin Davis (davisjustin302@gmail.com)
#
# MIT License
from __future__ import annotations
import cv2
import numpy as np
from cv2ext.bboxes import nms
from cv2ext.image import letterbox, rescale, resize_linear
from trtutils._jit import register_jit
from trtutils._log import LOG
from trtutils.impls.common import decode_efficient_nms, postprocess_efficient_nms
# EfficientNMS has 4 outputs; `postprocess` and `get_detections` compare
# len(outputs) against this value to pick the EfficientNMS path over the V10 path.
_EFF_NUM_OUTPUTS = 4
def preprocess(
    image: np.ndarray,
    input_shape: tuple[int, int],
    dtype: np.dtype,
    input_range: tuple[float, float] = (0.0, 1.0),
    method: str = "letterbox",
    *,
    verbose: bool | None = None,
) -> tuple[np.ndarray, tuple[float, float], tuple[float, float]]:
    """
    Preprocess inputs for a YOLO network.

    The image is resized to ``input_shape``, converted from BGR to RGB,
    rescaled into ``input_range``, given a leading batch axis, moved to
    channels-first order and returned as a C-contiguous array of ``dtype``.

    Parameters
    ----------
    image : np.ndarray
        The image to be preprocessed. It is passed through
        ``cv2.COLOR_BGR2RGB``, so BGR channel order is expected.
    input_shape : tuple[int, int]
        The shape to resize the image to.
    dtype : np.dtype
        The datatype of the inputs to the network.
    input_range : tuple[float, float]
        The range the model expects for its inputs.
        By default, [0.0, 1.0] (divide input by 255.0)
    method : str
        The method by which to resize the image.
        By default letterbox will be used.
        Options are [letterbox, linear]
    verbose : bool, optional
        Whether or not to log additional information.

    Returns
    -------
    tuple[np.ndarray, tuple[float, float], tuple[float, float]]
        The preprocessed tensor, the resize ratios and the padding reported
        by the resize step. Padding is (0.0, 0.0) when ``method`` is 'linear'.

    Raises
    ------
    ValueError
        If the method for resizing is not 'letterbox' or 'linear'

    """
    if verbose:
        LOG.debug(f"Preprocess input shape: {image.shape}, output: {input_shape}")

    if method == "letterbox":
        tensor, ratios, padding = letterbox(image, new_shape=input_shape)
    elif method == "linear":
        tensor, ratios = resize_linear(image, new_shape=input_shape)
        # linear resizing adds no border, so there is no padding to undo later
        padding = (0.0, 0.0)
    else:
        err_msg = (
            "Unknown method for image resizing. Options are ['letterbox', 'linear']"
        )
        raise ValueError(err_msg)

    tensor = cv2.cvtColor(tensor, cv2.COLOR_BGR2RGB)
    tensor = rescale(tensor, input_range)

    # add the batch axis, then move channels ahead of the spatial axes
    tensor = np.transpose(tensor[np.newaxis, :], (0, 3, 1, 2))

    # The transpose leaves a strided view. Assembling the contiguous array is
    # the expensive step, so the dtype cast is folded into that same pass
    # instead of making a contiguous copy and then a second astype() copy.
    tensor = np.ascontiguousarray(tensor, dtype=dtype)

    if verbose:
        LOG.debug(f"Ratios: {ratios}")
        LOG.debug(f"Padding: {padding}")

    return tensor, ratios, padding
def _postprocess_v_10(
    outputs: list[np.ndarray],
    ratios: tuple[float, float],
    padding: tuple[float, float],
    conf_thres: float | None = None,
    *,
    no_copy: bool | None = None,
    verbose: bool | None = None,
) -> list[np.ndarray]:
    """
    Postprocess V10-style outputs, optionally logging the raw output shape.

    Logging is kept in this wrapper; the actual work is delegated to
    ``_postprocess_v_10_core``, which receives every argument except
    ``verbose`` unchanged.
    """
    if verbose:
        LOG.debug(f"V10 postprocess, output shape: {outputs[0].shape}")

    processed = _postprocess_v_10_core(
        outputs,
        ratios,
        padding,
        conf_thres=conf_thres,
        no_copy=no_copy,
    )
    return processed
@register_jit(nogil=True)
def _postprocess_v_10_core(
    outputs: list[np.ndarray],
    ratios: tuple[float, float],
    padding: tuple[float, float],
    conf_thres: float | None = None,
    *,
    no_copy: bool | None = None,
) -> list[np.ndarray]:
    """
    Split a raw V10 output into boxes, scores and class ids.

    The boxes are mapped back to the original image by removing the
    preprocessing padding and dividing by the resize ratios, then clipped so
    that no coordinate is negative (no upper bound is applied).

    Parameters
    ----------
    outputs : list[np.ndarray]
        The network outputs; only ``outputs[0]`` is read. It is never
        modified by this function.
    ratios : tuple[float, float]
        The (width, height) resize ratios used during preprocessing.
    padding : tuple[float, float]
        The (x, y) padding added during preprocessing.
    conf_thres : float, optional
        If given, entries with a score below this value are dropped.
    no_copy : bool, optional
        If True, the arrays are returned without the final copy. When
        ``conf_thres`` is None the scores array is then a view of
        ``outputs[0]``.

    Returns
    -------
    list[np.ndarray]
        ``[bboxes, scores, class_ids]``

    """
    # V10 outputs (1, 300, 6)
    # each final entry is (bbox (4 parts), score, classid)
    ratio_width, ratio_height = ratios
    pad_x, pad_y = padding

    output = outputs[0]
    bboxes: np.ndarray = output[0, :, :4]
    scores: np.ndarray = output[0, :, 4]
    class_ids: np.ndarray = output[0, :, 5].astype(int)

    # pre-filter by the confidence threshold
    if conf_thres is not None:
        mask = scores >= conf_thres
        bboxes = bboxes[mask]
        scores = scores[mask]
        class_ids = class_ids[mask]

    # each bounding box is x1, y1, x2, y2 in network-input coordinates.
    # Without a threshold `bboxes` is still a view of outputs[0], so the
    # rescaled values go into a fresh array; writing in place would corrupt
    # the caller's buffer and double-adjust the boxes on a repeated call.
    adjusted_bboxes = np.empty_like(bboxes)
    adjusted_bboxes[:, 0] = (bboxes[:, 0] - pad_x) / ratio_width  # x1
    adjusted_bboxes[:, 1] = (bboxes[:, 1] - pad_y) / ratio_height  # y1
    adjusted_bboxes[:, 2] = (bboxes[:, 2] - pad_x) / ratio_width  # x2
    adjusted_bboxes[:, 3] = (bboxes[:, 3] - pad_y) / ratio_height  # y2

    # Clip the bounding boxes to ensure they're within valid ranges
    adjusted_bboxes = np.clip(adjusted_bboxes, 0, None)

    if no_copy:
        return [adjusted_bboxes, scores, class_ids]
    return [adjusted_bboxes.copy(), scores.copy(), class_ids.copy()]
def postprocess(
    outputs: list[np.ndarray],
    ratios: tuple[float, float] = (1.0, 1.0),
    padding: tuple[float, float] = (0.0, 0.0),
    conf_thres: float | None = None,
    *,
    no_copy: bool | None = None,
    verbose: bool | None = None,
) -> list[np.ndarray]:
    """
    Postprocess outputs from a YOLO network.

    Outputs holding exactly ``_EFF_NUM_OUTPUTS`` arrays are handled by
    ``postprocess_efficient_nms``; anything else goes through the V10 path.
    Both handlers receive the same arguments.

    Parameters
    ----------
    outputs : list[np.ndarray]
        The outputs from a YOLO network.
    ratios : tuple[float, float]
        The ratio of original image to preprocessed shape
    padding : tuple[float, float]
        The amount of padding added during preprocessing
    conf_thres : float, optional
        The confidence score below which detections will be thrown out.
    no_copy : bool, optional
        If True, the outputs will not be copied out
        from the cuda allocated host memory. Instead,
        the host memory will be returned directly.
        This memory WILL BE OVERWRITTEN INPLACE
        by future inference calls.
    verbose : bool, optional
        Whether or not to log additional information.

    Returns
    -------
    list[np.ndarray]
        The postprocessed outputs.

    """
    # pick the handler by output count, then make a single shared call
    handler = (
        postprocess_efficient_nms
        if len(outputs) == _EFF_NUM_OUTPUTS
        else _postprocess_v_10
    )
    return handler(
        outputs,
        ratios,
        padding,
        conf_thres=conf_thres,
        no_copy=no_copy,
        verbose=verbose,
    )
def _get_detections_v_10(
    outputs: list[np.ndarray],
    conf_thres: float | None = None,
    nms_iou_thres: float = 0.5,
    *,
    extra_nms: bool | None = None,
    agnostic_nms: bool | None = None,
    verbose: bool | None = None,
) -> list[tuple[tuple[int, int, int, int], float, int]]:
    """
    Decode V10 outputs into detections, optionally applying extra NMS.

    Decoding is delegated to ``_get_detections_v_10_core``. When
    ``extra_nms`` is truthy, the decoded detections are additionally passed
    through ``nms`` with ``nms_iou_thres`` and ``agnostic_nms``.
    """
    if verbose:
        LOG.debug(f"Decoding: {outputs[0].shape[0]} bboxes")

    detections = _get_detections_v_10_core(outputs, conf_thres)
    if not extra_nms:
        return detections
    return nms(detections, iou_threshold=nms_iou_thres, agnostic=agnostic_nms)
@register_jit(nogil=True)
def _get_detections_v_10_core(
    outputs: list[np.ndarray],
    conf_thres: float | None = None,
) -> list[tuple[tuple[int, int, int, int], float, int]]:
    """
    Convert ``[bboxes, scores, class_ids]`` arrays into detection tuples.

    Each kept detection is ``((x1, y1, x2, y2), score, class_id)`` with the
    coordinates and class id cast to int and the score cast to float. Entries
    whose score is not ``>= conf_thres`` are skipped; a missing threshold is
    treated as 0.0.
    """
    # no threshold given: fall back to zero so that all bboxes are included
    if conf_thres is None:
        conf_thres = 0.0

    bboxes, scores, class_ids = outputs[0], outputs[1], outputs[2]

    detections: list[tuple[tuple[int, int, int, int], float, int]] = []
    for idx, bbox in enumerate(bboxes):
        score = scores[idx]
        # written as a negated >= so that NaN scores are dropped
        if not (score >= conf_thres):
            continue
        left, top, right, bottom = bbox
        detections.append(
            (
                (int(left), int(top), int(right), int(bottom)),
                float(score),
                int(class_ids[idx]),
            )
        )
    return detections
def get_detections(
    outputs: list[np.ndarray],
    conf_thres: float | None = None,
    nms_iou_thres: float = 0.5,
    *,
    extra_nms: bool | None = None,
    agnostic_nms: bool | None = None,
    verbose: bool | None = None,
) -> list[tuple[tuple[int, int, int, int], float, int]]:
    """
    Get the detections from the output of a YOLO network.

    Outputs holding exactly ``_EFF_NUM_OUTPUTS`` arrays are decoded with
    ``decode_efficient_nms``; anything else is decoded by the V10 path, which
    reads ``outputs`` as ``[bboxes, scores, class_ids]``.

    Parameters
    ----------
    outputs : list[np.ndarray]
        The outputs from a YOLO network.
    conf_thres : float, optional
        The confidence threshold to use for getting detections.
    nms_iou_thres : float
        The IOU threshold to use during the optional additional
        NMS operation. By default, 0.5
    extra_nms : bool, optional
        Whether or not an additional CPU-side NMS operation
        should be conducted on final detections.
    agnostic_nms : bool, optional
        Whether or not to perform class-agnostic NMS during the
        optional additional operation.
    verbose : bool, optional
        Whether or not to log additional information.

    Returns
    -------
    list[tuple[tuple[int, int, int, int], float, int]]
        The detections from the YOLO network.
        Each detection is a bounding box in form x1, y1, x2, y2, a confidence score and a class id.

    """
    if len(outputs) == _EFF_NUM_OUTPUTS:
        if verbose:
            LOG.debug("Using EfficientNMS decoding")
        # NOTE(review): verbose is not forwarded here because the signature of
        # decode_efficient_nms is not visible in this file - confirm and forward
        # it if that function accepts the flag.
        return decode_efficient_nms(
            outputs,
            conf_thres=conf_thres,
            nms_iou_thres=nms_iou_thres,
            extra_nms=extra_nms,
            agnostic_nms=agnostic_nms,
        )

    if verbose:
        LOG.debug("Using V10 decoding")
    # forward verbose so the helper's own debug logging is reachable
    return _get_detections_v_10(
        outputs,
        conf_thres=conf_thres,
        nms_iou_thres=nms_iou_thres,
        extra_nms=extra_nms,
        agnostic_nms=agnostic_nms,
        verbose=verbose,
    )