Search code examples
pythonopencvvideogstreamerpython-multiprocessing

OpenCV GStreamer VideoWriter write hangs in a Python multiprocessing Process


I'm working on real-time image processing using a Raspberry Pi camera. To optimize processing times, I'm trying to separate some image transformations and video encoding in a separate Process:

import time
from multiprocessing import Event as ProcessEvent
from multiprocessing import Process, shared_memory

import cv2
import numpy as np
from picamera2 import MappedArray, Picamera2
from picamera2.request import CompletedRequest


class VideoEncoder(Process):
    def __init__(self):
        super(VideoEncoder, self).__init__()

        self.loop = ProcessEvent()
        self.loop.set()
        self.new_frame = ProcessEvent()
        self.new_frame.clear()

        # Doesn't work
        self.video_writer = cv2.VideoWriter(
            "my_very_long_gstreamer_pipeline",
            fourcc=0,
            fps=30,
            frameSize=(960, 720),
            isColor=True,
        )

        # # Works
        # self.video_writer = cv2.VideoWriter(
        #     "filename.avi",
        #     cv2.VideoWriter_fourcc(*"MJPG"),
        #     fps=30,
        #     frameSize=(960, 720),
        # )

        data = np.zeros((1232, 1640, 3), dtype=np.uint8).flatten()
        self.frame = shared_memory.SharedMemory(create=True, size=data.nbytes)

    def run(self):
        while self.loop.is_set():
            if self.new_frame.wait():
                self.process_frame()
                self.new_frame.clear()

        self.video_writer.release()

    def write(self, frame: np.ndarray):
        sha = np.ndarray(frame.shape, dtype=frame.dtype, buffer=self.frame.buf)
        sha[:] = frame[:]
        self.new_frame.set()
        print("Requested new frame")

    def process_frame(self):
        frame = np.ndarray((1232, 1640, 3), dtype=np.uint8, buffer=self.frame.buf)

        # Transform image in different ways...
        # Draw some stuff on image...
        # Takes some time...

        frame = cv2.resize(frame, (960, 720))

        print("Writing frame to video writer...")
        self.video_writer.write(frame)
        print(" > Written frame!")

    def stop(self):
        self.loop.clear()
        self.new_frame.set()


encoder = VideoEncoder()
encoder.start()


def cam_callback(request: CompletedRequest):
    with MappedArray(request, "main") as m:
        # Do some image processing...
        # Takes some time...

        # Once finished, send file to video stream
        encoder.write(m.array)


if __name__ == "__main__":
    picam = Picamera2(1)

    picam.configure(
        picam.create_video_configuration(
            main={"format": "RGB888", "size": (1640, 1232)}
        )
    )
    picam.pre_callback = cam_callback
    picam.start()

    time.sleep(10)

    picam.stop()
    encoder.stop()

However, it works fine writing to video files, but fails when writing to a GStreamer pipeline.

Here's the result for:

  • GStreamer pipeline, it hangs at self.video_writer.write(frame). Note that it works just fine outside of a Process and I don't get any GStreamer warnings or errors:
Requested new frame
Writing frame to video writer...
 > Written frame!
Requested new frame
Writing frame to video writer...
Requested new frame
Requested new frame
Requested new frame
Requested new frame
Requested new frame
Requested new frame
...
  • Video file, works file!
Requested new frame
Writing frame to video writer...
deprecated pixel format used, make sure you did set range correctly
Requested new frame
Requested new frame
 > Written frame!
Requested new frame
Writing frame to video writer...
Requested new frame
 > Written frame!
Requested new frame
Writing frame to video writer...
Requested new frame
 > Written frame!
Requested new frame
Writing frame to video writer...
Requested new frame
 > Written frame!
Requested new frame
Writing frame to video writer...
Requested new frame
...

I'm not really sure on how I could debug this. Any suggestions are appreciated!


Solution

  • I'm not completely sure why, but I had to put the VideoWriter initializer in run() to be sure to have it initialized in the correct process context, and that's it!

        def run(self):
            self.video_writer = cv2.VideoWriter(
                "gstreamer_pipeline",
                fourcc=0,
                fps=self.input_framerate,
                frameSize=(960, 720),
                isColor=True,
            )
    
            while self.loop.is_set():
                if self.new_frame.wait():
                    self.process_frame()
                    self.new_frame.clear()
    
            self.video_writer.release()
            self.frame.unlink()