# Copyright (c) 2024 Justin Davis (davisjustin302@gmail.com)
#
# MIT License
# mypy: disable-error-code="import-untyped"
from __future__ import annotations
import contextlib
from queue import Empty, Queue
from threading import Thread
from typing import TYPE_CHECKING
from ._engine import TRTEngine
if TYPE_CHECKING:
from collections.abc import Callable, Sequence
from pathlib import Path
import numpy as np
from typing_extensions import Self
with contextlib.suppress(ImportError):
try:
import cuda.bindings.runtime as cudart
except (ImportError, ModuleNotFoundError):
from cuda import cudart
def _identity(data: list[np.ndarray]) -> list[np.ndarray]:
return data
[docs]
class TRTModel:
    """
    A wrapper around a TensorRT engine that handles the device memory.

    It is thread and process safe to create multiple TRTModels.
    It is valid to create a TRTModel in one thread and use it in another.
    Each TRTModel has its own CUDA context and there are no safeguards
    implemented in the class against data races. As such, a single
    TRTModel should not be used in multiple threads or processes.
    """

    def __init__(
        self: Self,
        engine_path: Path | str,
        preprocess: Callable[[list[np.ndarray]], list[np.ndarray]] = _identity,
        postprocess: Callable[[list[np.ndarray]], list[np.ndarray]] = _identity,
        warmup_iterations: int = 5,
        engine_type: type[TRTEngine] | None = None,
        *,
        warmup: bool | None = None,
    ) -> None:
        """
        Use to initialize the TRTModel.

        Parameters
        ----------
        engine_path : Path, str
            The path to the serialized engine file
        preprocess : Callable[[list[np.ndarray]], list[np.ndarray]]
            The function to preprocess the inputs.
            Default is identity function.
        postprocess : Callable[[list[np.ndarray]], list[np.ndarray]]
            The function to postprocess the outputs.
            Default is identity function.
        warmup_iterations : int, optional
            The number of warmup iterations to do, by default 5
        engine_type : type[TRTEngine], optional
            An alternative engine class to instantiate in place of
            TRTEngine, by default None (use TRTEngine)
        warmup : bool, optional
            Whether to do warmup iterations, by default None.
            Forwarded to the engine unchanged; if None, warmup
            will be set to False.

        """
        # Fall back to the stock engine unless the caller supplied another class.
        engine_cls: type[TRTEngine] = TRTEngine if engine_type is None else engine_type
        self._engine = engine_cls(
            engine_path=engine_path,
            warmup_iterations=warmup_iterations,
            warmup=warmup,
        )
        self._preprocess: Callable[[list[np.ndarray]], list[np.ndarray]] = preprocess
        self._postprocess: Callable[[list[np.ndarray]], list[np.ndarray]] = postprocess

    @property
    def engine(self: Self) -> TRTEngine:
        """Access the underlying TRTEngine."""
        return self._engine

    @property
    def stream(self: Self) -> cudart.cudaStream_t:
        """Access the underlying CUDA stream (taken from the engine)."""
        return self._engine.stream

    @property
    def preprocessor(self: Self) -> Callable[[list[np.ndarray]], list[np.ndarray]]:
        """The preprocessing function used in this model."""
        return self._preprocess

    @preprocessor.setter
    def preprocessor(
        self: Self,
        new_preprocess: Callable[[list[np.ndarray]], list[np.ndarray]],
    ) -> None:
        """
        Set the preprocessing function used in this model.

        Useful in case the preprocessor needs information which is only
        accessible after loading the engine.
        """
        self._preprocess = new_preprocess

    @property
    def postprocessor(self: Self) -> Callable[[list[np.ndarray]], list[np.ndarray]]:
        """The postprocessing function used in this model."""
        return self._postprocess

    @postprocessor.setter
    def postprocessor(
        self: Self,
        new_postprocess: Callable[[list[np.ndarray]], list[np.ndarray]],
    ) -> None:
        """
        Set the postprocessing function used in this model.

        Mirrors the preprocessor setter: useful when the postprocessor
        needs information which is only accessible after loading the engine.
        """
        self._postprocess = new_postprocess

    def __call__(
        self: Self,
        inputs: list[np.ndarray],
        *,
        preprocessed: bool | None = None,
        postprocess: bool | None = None,
    ) -> list[np.ndarray]:
        """
        Execute the model with the given inputs.

        Equivalent to calling ``run`` with the same arguments.

        Parameters
        ----------
        inputs : list[np.ndarray]
            The inputs to the model
        preprocessed : bool, optional
            Whether the inputs are already preprocessed, by default None
            If None, the inputs will be preprocessed
        postprocess : bool, optional
            Whether or not to postprocess the outputs, by default None
            If None, the outputs will be postprocessed

        Returns
        -------
        list[np.ndarray]
            The outputs of the model

        """
        return self.run(inputs, preprocessed=preprocessed, postprocess=postprocess)

    def mock_run(
        self: Self,
        data: list[np.ndarray] | None = None,
    ) -> list[np.ndarray]:
        """
        Execute the model with random inputs.

        Both the preprocessing and postprocessing hooks are skipped.

        Parameters
        ----------
        data : list[np.ndarray], optional
            The inputs to the model, by default None
            If None, random inputs will be generated by the engine

        Returns
        -------
        list[np.ndarray]
            The raw outputs of the model

        """
        if data is None:
            data = self._engine.get_random_input()
        return self.run(data, preprocessed=True, postprocess=False)

    def preprocess(self: Self, inputs: list[np.ndarray]) -> list[np.ndarray]:
        """
        Preprocess the inputs.

        Parameters
        ----------
        inputs : list[np.ndarray]
            The inputs to preprocess

        Returns
        -------
        list[np.ndarray]
            The preprocessed inputs

        """
        return self._preprocess(inputs)

    def postprocess(self: Self, outputs: list[np.ndarray]) -> list[np.ndarray]:
        """
        Postprocess the outputs.

        Parameters
        ----------
        outputs : list[np.ndarray]
            The outputs to postprocess

        Returns
        -------
        list[np.ndarray]
            The postprocessed outputs

        """
        return self._postprocess(outputs)

    def run(
        self: Self,
        inputs: list[np.ndarray],
        *,
        preprocessed: bool | None = None,
        postprocess: bool | None = None,
    ) -> list[np.ndarray]:
        """
        Execute the model with the given inputs.

        Parameters
        ----------
        inputs : list[np.ndarray]
            The inputs to the model
        preprocessed : bool, optional
            Whether the inputs are already preprocessed, by default None
            If None, the inputs will be preprocessed
        postprocess : bool, optional
            Whether or not to postprocess the outputs, by default None
            If None, the outputs will be postprocessed

        Returns
        -------
        list[np.ndarray]
            The outputs of the model

        """
        # None means "use the default": preprocess inputs, postprocess outputs.
        if preprocessed is None:
            preprocessed = False
        if postprocess is None:
            postprocess = True
        if not preprocessed:
            inputs = self._preprocess(inputs)
        outputs = self._engine.execute(inputs)
        if postprocess:
            outputs = self._postprocess(outputs)
        return outputs
[docs]
class QueuedTRTModel:
    """
    Interact with a TRTModel over a background Thread and a pair of Queues.

    The TRTModel is constructed inside the worker thread, which pulls
    submitted inputs from an input queue, runs the model on them, and
    pushes each result onto an output queue. The worker loop only exits
    once ``stop`` is called.
    """

    def __init__(
        self: Self,
        engine_path: Path | str,
        preprocess: Callable[[list[np.ndarray]], list[np.ndarray]] = _identity,
        postprocess: Callable[[list[np.ndarray]], list[np.ndarray]] = _identity,
        warmup_iterations: int = 5,
        engine_type: type[TRTEngine] | None = None,
        *,
        warmup: bool | None = None,
    ) -> None:
        """
        Create a QueuedTRTModel and start its worker thread.

        Parameters
        ----------
        engine_path : Path, str
            The Path to the compiled TensorRT Engine.
        preprocess : Callable[[list[np.ndarray]], list[np.ndarray]]
            The function to preprocess the inputs.
            Default is identity function.
        postprocess : Callable[[list[np.ndarray]], list[np.ndarray]]
            The function to postprocess the outputs.
            Default is identity function.
        warmup_iterations : int
            The number of warmup iterations to perform.
            By default 5
        engine_type : type[TRTEngine], optional
            The type of TRTEngine to utilize.
        warmup : bool, optional
            Whether or not to perform the warmup iterations.

        """
        self._stopped = False  # flag for if user stopped thread
        self._model: TRTModel | None = None  # set by the worker thread once built
        self._input_queue: Queue[tuple[list[np.ndarray], bool | None]] = Queue()
        self._output_queue: Queue[list[np.ndarray]] = Queue()
        self._thread = Thread(
            target=self._run,
            kwargs={
                "engine_path": engine_path,
                "preprocess": preprocess,
                "postprocess": postprocess,
                "warmup_iterations": warmup_iterations,
                "engine_type": engine_type,
                "warmup": warmup,
            },
        )
        # The worker must actually be running: otherwise submitted inputs are
        # never consumed, retrieve() blocks forever and stop() cannot join.
        # Started last so every attribute the worker reads already exists.
        self._thread.start()

    def stop(
        self: Self,
    ) -> None:
        """
        Stop the thread containing the TRTEngine.

        Sets the stop flag and waits for the worker thread to finish.
        Inputs still queued when the worker notices the flag are left
        unprocessed.
        """
        self._stopped = True
        self._thread.join()

    def submit(
        self: Self,
        data: list[np.ndarray],
        *,
        preprocessed: bool | None = None,
    ) -> None:
        """
        Put data in the input queue.

        Parameters
        ----------
        data : list[np.ndarray]
            The data to have the engine run.
        preprocessed : bool, optional
            Whether or not the input is already preprocessed.
            If None or False, the model's preprocess function is applied.

        """
        self._input_queue.put((data, preprocessed))

    def retrieve(
        self: Self,
        timeout: float | None = None,
    ) -> list[np.ndarray] | None:
        """
        Get an output from the engine thread.

        Parameters
        ----------
        timeout : float, optional
            Timeout in seconds for waiting for data.
            If None, block until an output is available.

        Returns
        -------
        list[np.ndarray] | None
            The output from the engine, or None if the timeout expired
            before an output became available.

        """
        with contextlib.suppress(Empty):
            return self._output_queue.get(timeout=timeout)
        return None

    def _run(
        self: Self,
        engine_path: Path | str,
        preprocess: Callable[[list[np.ndarray]], list[np.ndarray]],
        postprocess: Callable[[list[np.ndarray]], list[np.ndarray]],
        warmup_iterations: int,
        engine_type: type[TRTEngine] | None,
        *,
        warmup: bool | None = None,
    ) -> None:
        """Worker loop: build the model in this thread, then serve the input queue."""
        model = TRTModel(
            engine_path=engine_path,
            preprocess=preprocess,
            postprocess=postprocess,
            warmup_iterations=warmup_iterations,
            engine_type=engine_type,
            warmup=warmup,
        )
        self._model = model
        while not self._stopped:
            # Short timeout so the loop re-checks the stop flag regularly.
            try:
                inputs, preprocessed = self._input_queue.get(timeout=0.1)
            except Empty:
                continue
            result = model.run(inputs, preprocessed=preprocessed)
            self._output_queue.put(result)
[docs]
class ParallelTRTModels:
    """Handle many TRTModels in parallel, each on its own QueuedTRTModel thread."""

    def __init__(
        self: Self,
        engine_paths: Sequence[Path | str],
        preprocess: Callable[[list[np.ndarray]], list[np.ndarray]]
        | list[Callable[[list[np.ndarray]], list[np.ndarray]]] = _identity,
        postprocess: Callable[[list[np.ndarray]], list[np.ndarray]]
        | list[Callable[[list[np.ndarray]], list[np.ndarray]]] = _identity,
        warmup_iterations: int = 5,
        engine_type: type[TRTEngine] | None = None,
        *,
        warmup: bool | None = None,
    ) -> None:
        """
        Create a ParallelTRTModels instance.

        Parameters
        ----------
        engine_paths : Sequence[Path | str]
            The Paths to the compiled engines to use.
        preprocess : Callable[[list[np.ndarray]], list[np.ndarray]] | list[Callable[[list[np.ndarray]], list[np.ndarray]]]
            The preprocessing function(s). A single callable is shared by
            every engine; otherwise a sequence with exactly one function
            per engine path.
        postprocess : Callable[[list[np.ndarray]], list[np.ndarray]] | list[Callable[[list[np.ndarray]], list[np.ndarray]]]
            The postprocessing function(s), with the same rules as
            ``preprocess``.
        warmup_iterations : int
            The number of iterations to perform warmup for.
            By default 5
        engine_type : type[TRTEngine], optional
            The type of TRTEngine each model should use, by default None.
        warmup : bool, optional
            Whether or not to run warmup iterations on the engines.

        Raises
        ------
        ValueError
            If a sequence of pre/postprocessing functions does not have
            one entry per engine path.

        """
        count = len(engine_paths)
        # Validate before building any model so no worker threads are
        # started for a configuration that is going to be rejected.
        preprocessors = self._expand_processors(preprocess, count, "preprocess")
        postprocessors = self._expand_processors(postprocess, count, "postprocess")
        self._engines: list[QueuedTRTModel] = [
            QueuedTRTModel(
                engine_path=epath,
                preprocess=pre,
                postprocess=post,
                warmup_iterations=warmup_iterations,
                engine_type=engine_type,
                warmup=warmup,
            )
            for epath, pre, post in zip(engine_paths, preprocessors, postprocessors)
        ]

    @staticmethod
    def _expand_processors(
        processor: Callable[[list[np.ndarray]], list[np.ndarray]]
        | Sequence[Callable[[list[np.ndarray]], list[np.ndarray]]],
        count: int,
        name: str,
    ) -> list[Callable[[list[np.ndarray]], list[np.ndarray]]]:
        """Broadcast one processor to ``count`` engines, or validate a per-engine sequence."""
        if callable(processor):
            return [processor] * count
        processors = list(processor)
        # zip() would silently truncate on a mismatch, dropping engines
        # or processors without any error, so reject it explicitly.
        if len(processors) != count:
            err_msg = f"Got {len(processors)} {name} functions for {count} engines."
            raise ValueError(err_msg)
        return processors

    def stop(self: Self) -> None:
        """Stop the underlying engine threads."""
        for engine in self._engines:
            engine.stop()

    def submit(
        self: Self,
        inputs: list[list[np.ndarray]],
        *,
        preprocessed: bool | None = None,
    ) -> None:
        """
        Submit data to be processed by the engines.

        Parameters
        ----------
        inputs : list[list[np.ndarray]]
            The inputs to pass to the engines, in engine order.
            Should be a list of the same length as the engines created.
        preprocessed : bool, optional
            Whether or not the inputs are already preprocessed.
            The same flag is applied to every engine.

        Raises
        ------
        ValueError
            If the inputs are not the same size as the engines.

        """
        if len(inputs) != len(self._engines):
            err_msg = (
                f"Cannot match {len(inputs)} inputs to {len(self._engines)} engines."
            )
            raise ValueError(err_msg)
        for data, engine in zip(inputs, self._engines):
            engine.submit(data, preprocessed=preprocessed)

    def retrieve(
        self: Self,
        timeout: float | None = None,
    ) -> list[list[np.ndarray] | None]:
        """
        Get the outputs from the engines.

        Parameters
        ----------
        timeout : float, optional
            Timeout for waiting for data. It is applied to each engine in
            turn, so the total wait can be up to ``len(engines) * timeout``.
            If None, block until every engine has produced an output.

        Returns
        -------
        list[list[np.ndarray] | None]
            One entry per engine, in engine order; an entry is None if
            that engine's wait timed out.

        """
        return [engine.retrieve(timeout=timeout) for engine in self._engines]