Search code examples
yolov8openvino

How to run YOLOv8 with OpenVINO in async mode?


from deep_sort_realtime.deepsort_tracker import DeepSort
from typing import Tuple
from ultralytics import YOLO
from typing import Literal, get_args, Any
from openvino.runtime import Core
from openvino.preprocess import PrePostProcessor
from openvino import Type, Layout, save_model
from ultralytics.utils import ops
import torch
import numpy as np
import cv2
import sys
import cvzone

class VehicleDetector(object):
    
    def __init__(self, device: str):        
        allowed_devices = get_args(Literal["CPU", "GPU"])
        if device not in allowed_devices:
            raise RuntimeError(f"Invalid device. Allowed values are: {allowed_devices}")            
        self.device = device        
        self.tracker = DeepSort()        
        self.counter = {}        
        try:        
            self.model = self.prepared_model("/home/vsevolod/transport/yolov8s_openvino_model/yolov8s.xml")
       
        except RuntimeError:
            print("Model with this path doesn't exists!") 
            sys.exit(0)     
            
    # --------------------------- Generation and statistical functions --------------------------------------------------------------------

    
    def yolo_toIR(self, yolo8s_path: str) -> None:
        """
        Export a YOLO model to the OpenVINO format.

        Parameters:
          yolo8s_path (str): path handed to the ultralytics YOLO constructor
        """
        YOLO(yolo8s_path).export(format="openvino")
        
    def preprocess(self, openvino_path: str, new_openvino_path: str) -> None:
        """
        Embed input/output tensor settings into the model and save the result.

        Reads "yolov8s.xml" from openvino_path, declares the input tensor as u8
        with NCHW layout, the model input layout as NCHW and the output tensor
        as f32, then saves the built model as "preprocessed_yolov8s.xml" in
        new_openvino_path.

        Parameters:
          openvino_path (str): directory that contains yolov8s.xml
          new_openvino_path (str): directory that receives preprocessed_yolov8s.xml
        """
        import os  # local import: only this method needs it

        # Join with the OS separator so directories work with or without a
        # trailing slash (plain string concatenation required the slash).
        source_xml = os.path.join(openvino_path, "yolov8s.xml")
        target_xml = os.path.join(new_openvino_path, "preprocessed_yolov8s.xml")

        ie = Core()
        ov_model = ie.read_model(model=source_xml)
        ppp = PrePostProcessor(ov_model)
        ppp.input().tensor() \
            .set_element_type(Type.u8) \
            .set_layout(Layout('NCHW'))
        ppp.input().model().set_layout(Layout('NCHW'))
        ppp.output().tensor().set_element_type(Type.f32)
        preprocessed_model = ppp.build()
        save_model(preprocessed_model, target_xml)
        
    
    def frame_info(self, frame: Any) -> None:
        """
        Print the shape and the data type of a frame.

        Parameters:
          frame (Any): object exposing `shape` and `dtype` attributes
        """
        # Sample values noted by the author: (640, 640, 3) and uint8
        shape_line = f"The original shape of the image is {frame.shape}"
        print(shape_line)
        dtype_line = f"The original data type of the image is {frame.dtype}"
        print(dtype_line)
        
    def model_shape_info(self, openvino_path: str) -> None:
        """
        Print the height and width taken from a model's input shape.

        Parameters:
          openvino_path (str): path handed to Core.read_model
        """
        model_input = Core().read_model(model=openvino_path).input()
        # The input shape is unpacked as exactly four dimensions; the last two
        # are reported as h and w. Sample values noted by the author: (640, 640)
        _, _, height, width = model_input.shape
        print(f"The original h of the model is {height}")
        print(f"The original w of the model is {width}")
        
    # --------------------------- Init and preprocess functions --------------------------------------------------------------------
    
    

    def letterbox(
        self,
        img: np.ndarray,
        new_shape: Tuple[int, int] = (640, 640),
        color: Tuple[int, int, int] = (114, 114, 114),
        auto: bool = False,
        scale_fill: bool = False,
        scaleup: bool = False,
        stride: int = 32,
    ):
        """
        Resize an image with its aspect ratio preserved and pad it with a constant color.

        Parameters:
          img (np.ndarray): image to resize; its first two dimensions are read as [height, width]
          new_shape (Tuple(int, int)): target size as [height, width]; a single int is used for both
          color (Tuple(int, int, int)): color of the padded border
          auto (bool): keep only the remainder of the padding modulo stride
          scale_fill (bool): stretch the image to new_shape without padding (ignored when auto is set)
          scaleup (bool): allow a scale factor above 1.0; when False the image is only scaled down
          stride (int): modulus applied to the padding when auto is set
        Returns:
          img (np.ndarray): resized and padded image
          ratio (Tuple(float, float)): width and height scaling ratios
          padding_size (Tuple(float, float)): (dw, dh), half of the total width and height padding
        """
        src_h, src_w = img.shape[:2]
        if isinstance(new_shape, int):
            new_shape = (new_shape, new_shape)
        dst_h, dst_w = new_shape[0], new_shape[1]

        # Scale factor (new / old), limited by the tighter of the two dimensions
        scale = min(dst_h / src_h, dst_w / src_w)
        if not scaleup:
            scale = min(scale, 1.0)

        ratio = (scale, scale)
        # cv2 sizes are (width, height)
        new_unpad = (int(round(src_w * scale)), int(round(src_h * scale)))
        pad_w = dst_w - new_unpad[0]
        pad_h = dst_h - new_unpad[1]
        if auto:
            pad_w, pad_h = np.mod(pad_w, stride), np.mod(pad_h, stride)
        elif scale_fill:
            pad_w, pad_h = 0.0, 0.0
            new_unpad = (dst_w, dst_h)
            ratio = (dst_w / src_w, dst_h / src_h)

        # Split the padding between the two opposite sides
        pad_w /= 2
        pad_h /= 2

        if (src_w, src_h) != new_unpad:
            img = cv2.resize(img, new_unpad, interpolation=cv2.INTER_LINEAR)

        # The -0.1 / +0.1 offsets send an odd leftover pixel to the bottom/right side
        top = int(round(pad_h - 0.1))
        bottom = int(round(pad_h + 0.1))
        left = int(round(pad_w - 0.1))
        right = int(round(pad_w + 0.1))
        img = cv2.copyMakeBorder(img, top, bottom, left, right, cv2.BORDER_CONSTANT, value=color)
        return img, ratio, (pad_w, pad_h)

    def preprocess_image(self, img0: np.ndarray):
        """
        Letterbox an image and reorder it from HWC to CHW.

        Parameters:
          img0 (np.ndarray): image for preprocessing
        Returns:
          img (np.ndarray): contiguous letterboxed image with the axes reordered as (2, 0, 1)
        """
        # Only the image is needed; the ratio and padding from letterbox are dropped
        letterboxed = self.letterbox(img0)[0]
        channels_first = letterboxed.transpose(2, 0, 1)
        return np.ascontiguousarray(channels_first)
    
    def image_to_tensor(self, image:np.ndarray):
        """
        Convert an image array to a float32 tensor scaled by 1/255.

        Parameters:
          image (np.ndarray): array to convert; it is not resized or transposed here
        Returns:
          input_tensor (np.ndarray): float32 copy of the input divided by 255.0,
            with a leading batch dimension added when the input has three dimensions
        """
        scaled = image.astype(np.float32) / 255.0  # 0 - 255 to 0.0 - 1.0

        if scaled.ndim != 3:
            return scaled
        # add batch dimension
        return np.expand_dims(scaled, 0)
    
    def postprocess(
            self,
            pred_boxes:np.ndarray,
            input_hw:Tuple[int, int],
            orig_img:np.ndarray,
            min_conf_threshold:float = 0.25,
            nms_iou_threshold:float = 0.7,
            agnosting_nms:bool = False,
            max_detections:int = 300,
            ):
        """
        Run non-maximum suppression on raw predictions and rescale the kept boxes.

        Parameters:
            pred_boxes (np.ndarray): raw model output, converted with torch.from_numpy before NMS
            input_hw (Tuple[int, int]): shape the boxes are currently expressed in, passed to ops.scale_boxes
            orig_img (np.ndarray | list): original image, or a list with one image per prediction
            min_conf_threshold (float, *optional*, 0.25): confidence threshold passed to NMS
            nms_iou_threshold (float, *optional*, 0.7): IoU threshold passed to NMS
            agnosting_nms (bool, *optional*, False): forwarded to NMS as `agnostic`
            max_detections (int, *optional*, 300): forwarded to NMS as `max_det`
        Returns:
           results (List[Dict]): one dictionary per prediction; "det" holds the NMS output with
             its first four columns rescaled and rounded, or [] (plus an empty "segment" list)
             when nothing was kept
        """
        suppressed = ops.non_max_suppression(
            torch.from_numpy(pred_boxes),
            min_conf_threshold,
            nms_iou_threshold,
            nc=80,
            agnostic=agnosting_nms,
            max_det=max_detections,
        )

        results = []
        for idx, pred in enumerate(suppressed):
            if isinstance(orig_img, list):
                target_shape = orig_img[idx].shape
            else:
                target_shape = orig_img.shape
            if len(pred):
                # Rescale the box columns in place to the original image shape
                pred[:, :4] = ops.scale_boxes(input_hw, pred[:, :4], target_shape).round()
                results.append({"det": pred})
            else:
                results.append({"det": [], "segment": []})

        return results
    
    def prepared_model(self, openvino_path: str) -> Any:
        """
        Read an OpenVINO model and compile it for self.device.

        Parameters:
          openvino_path (str): path handed to Core.read_model
        Returns:
          model (Any): result of Core.compile_model
        """
        core = Core()
        network = core.read_model(openvino_path)

        # The Winograd option is only set when the GPU was requested and is available
        gpu_selected = self.device == "GPU" and "GPU" in core.available_devices
        config = {"GPU_DISABLE_WINOGRAD_CONVOLUTION": "YES"} if gpu_selected else {}

        return core.compile_model(network, self.device, config)
    
    # --------------------------- Result functions --------------------------------------------------------------------

    
    def predict(self, frame: Any) -> Any:
        """
        Run the compiled model on one frame and return its detections.

        Parameters:
          frame (Any): image handed to preprocess_image and used as the original image in postprocess
        Returns:
          det (Any): the "det" entry of the first postprocess result
        """
        input_tensor = self.image_to_tensor(self.preprocess_image(frame))
        raw_output = self.model(input_tensor)[self.model.output(0)]
        per_image = self.postprocess(
            pred_boxes=raw_output,
            input_hw=input_tensor.shape[2:],
            orig_img=frame,
        )
        return per_image[0]["det"]
    
    
    def draw_bounding_boxes(self, frame: Any) -> Any:
        """
        Detect objects on a frame, update the tracker and draw the confirmed tracks.

        Parameters:
          frame (Any): image array; it is copied and only the copy is drawn on
        Returns:
          frame_copy (Any): copy of the frame with a corner rectangle and a
            "<class id> <track id>" label for every confirmed track
        """
        # Class ids that are kept — presumably the vehicle classes of the
        # model's label set; confirm against the labels of the model in use.
        accepted_class_ids = (1, 2, 3, 5, 7)

        frame_copy = frame.copy()
        detections = []
        for result in self.predict(frame):
            x1, y1, x2, y2 = map(int, result[:4])
            # postprocess feeds torch data to NMS, so rows may hold tensor
            # scalars; plain Python numbers keep the tracker input, the
            # self.counter values and the drawn label free of "tensor(...)".
            conf = float(result[4])
            class_id = int(result[5])
            if conf < 0.7:
                continue
            if class_id not in accepted_class_ids:
                continue

            w, h = x2 - x1, y2 - y1
            if w <= 0 or h <= 0:
                # An empty crop has no mean (NaN plus a runtime warning); such
                # boxes never passed the brightness check, so skip them early.
                continue

            # Keep only boxes whose crop has a mean pixel value above 50
            if frame_copy[y1:y2, x1:x2].mean() > 50:
                detections.append([[x1, y1, w, h], conf, class_id])

        tracked_objects = self.tracker.update_tracks(detections, frame=frame_copy)

        for track in tracked_objects:
            if not track.is_confirmed():
                continue

            track_id = track.track_id
            track_class_id = track.det_class

            # Remember the class of every confirmed track the first time it is seen
            self.counter.setdefault(track_id, track_class_id)

            x1, y1, x2, y2 = map(int, track.to_ltrb())
            w, h = x2 - x1, y2 - y1

            shape = cvzone.cornerRect(frame_copy, (x1, y1, w, h), l=9, rt=2)

            cv2.putText(shape, f"{track_class_id} {track_id}", (x1, y1 - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.9, (0, 0, 0))

        return frame_copy

    # TODO: run inference asynchronously instead of synchronously to speed up
    # execution; this is very important for performance.
    
    def run(self, video_path: str):
        """
        Play a video with the tracked detections drawn on every frame.

        Frames are read until the video ends or Esc (key code 27) is pressed.
        Returns immediately when the video cannot be opened.

        Parameters:
          video_path (str): path handed to cv2.VideoCapture
        """
        video = cv2.VideoCapture(video_path)
        if not video.isOpened():
            return

        try:
            while video.isOpened():
                ret, frame = video.read()
                if not ret:
                    break
                run_frame = self.draw_bounding_boxes(frame)
                cv2.imshow("Video", run_frame)
                # Mask to the low byte: some platforms set higher bits in the
                # key code, which would make a plain comparison with 27 fail.
                if cv2.waitKey(1) & 0xFF == 27:
                    break
        finally:
            # Release the capture and close the windows even if a frame fails
            video.release()
            cv2.destroyAllWindows()
    def get_count(self) -> int:
        
        return len(self.counter)
        
        

I am building a vehicle counter and want to speed up model inference, but I cannot find any practical examples of integrating AsyncInferQueue into code as involved as mine. How can I use AsyncInferQueue here without breaking my existing code?


Solution

  • I have now found a solution and want to share it:

    def callback(self, infer_request, info):
        """
        Post-process a finished inference request and store its detections.

        Parameters:
          infer_request: finished request; the data of its first output tensor is read
          info: (frame, input_hw) pair unpacked into the original frame and the model input size
        """
        original_frame, model_input_hw = info

        raw_output = infer_request.get_output_tensor(0).data

        per_image = self.postprocess(
            pred_boxes=raw_output,
            input_hw=model_input_hw,
            orig_img=original_frame,
        )

        # Only the "det" entry of the first result is stored
        self.det_storage.append(per_image[0]["det"])
        
            
    
    def draw_bounding_boxes(self, frame: Any) -> Any:
        
        frame_copy = frame.copy()
        
        fg_mask = self.back_sub.apply(frame_copy)
        
        preprocess_image = self.preprocess_image(frame_copy)
        
        input_tensor = self.image_to_tensor(preprocess_image)
        
        input_hw = input_tensor.shape[2:]
        
        infer_queue = AsyncInferQueue(self.model, 4)  
        
        infer_queue.set_callback(self.callback) 
        
        infer_queue.start_async(inputs={0: input_tensor}, userdata=(frame_copy, input_hw))
    
        infer_queue.wait_all()  
    
        
        model_detections = self.det_storage.pop(0)