python opencv multiprocessing pipelining

How to implement Pipelining in Python?


I have a program that processes a live video of some markers.

It is divided into:

  1. Import next image of video
  2. Convert Image to readable form
  3. Detection of Markers
  4. Tracking of Markers
  5. Draw UI

This works pretty well on my PC, but it needs to work on a Raspberry Pi as well, so using just one core the whole time won't cut it.

That's why I want to introduce pipelining. In my computer architecture course at university I learned about hardware pipelining, so I was wondering whether something like that can be implemented in Python:

So instead of doing Import -> Conversion -> Processing -> Tracking -> Draw -> ...

I want to do it like this:

-1----2----3----4-----5----...
Imp--Imp--Imp--Imp---Imp---...
-----Conv-Conv-Conv--Conv--...
----------Pro--Pro---Pro---...
---------------Track-Track-...
---------------------Draw--...

So that every "clock cycle" an image is ready and not only every 5th.

I was thinking about using Python's multiprocessing library for this, but apart from some simple test programs I have no experience with it, so I'm not sure what would best suit this use case, i.e. Queue, Pool, Manager, ...
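To make the diagram above concrete, here is a minimal sketch of the same idea using only the standard library: one process per stage, connected by `multiprocessing` queues. The names `run_pipeline`, `_stage_worker`, `convert`, `detect` and `draw` are hypothetical stand-ins, not part of the original program, and the sketch assumes the `fork` start method (available on Linux, e.g. Raspberry Pi OS, but not on Windows).

```python
import multiprocessing as mp

_SENTINEL = None  # marks the end of the input stream


def _stage_worker(func, inbox, outbox):
    """Apply func to every item from inbox and forward the result to outbox."""
    while True:
        item = inbox.get()
        if item is _SENTINEL:
            outbox.put(_SENTINEL)  # pass the shutdown signal downstream
            break
        outbox.put(func(item))


def run_pipeline(funcs, items):
    """Run items through funcs, one process per stage, and return the results."""
    ctx = mp.get_context("fork")  # assumption: a Unix-like system
    # queues[i] feeds stage i; the last queue collects the final results.
    queues = [ctx.Queue() for _ in range(len(funcs) + 1)]
    workers = [
        ctx.Process(target=_stage_worker, args=(func, queues[i], queues[i + 1]))
        for i, func in enumerate(funcs)
    ]
    for worker in workers:
        worker.start()

    for item in items:
        queues[0].put(item)
    queues[0].put(_SENTINEL)

    # Drain the output queue before joining so no worker blocks on a full pipe.
    results = []
    while True:
        result = queues[-1].get()
        if result is _SENTINEL:
            break
        results.append(result)
    for worker in workers:
        worker.join()
    return results


# Hypothetical stand-ins for the real conversion / detection / drawing steps.
def convert(x):
    return x * 2


def detect(x):
    return x + 1


def draw(x):
    return f"frame:{x}"
```

Calling `run_pipeline([convert, detect, draw], frames)` lets stage 1 work on frame n+1 while stage 2 is still busy with frame n. With a single worker per stage the output order matches the input order; several workers per stage would need extra bookkeeping to keep frames in order.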

SOLVED:

This can be done with mpipe, a pipelining toolkit for Python: http://vmlaker.github.io/mpipe/

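import mpipe

# cap, frame_counter and multi_tracker are set up earlier in the program.
while True:
    # Each stage runs its function in 3 worker processes; OrderedStage
    # returns results in the same order as the inputs.
    stage1 = mpipe.OrderedStage(conversion, 3)
    stage2 = mpipe.OrderedStage(processing, 3)
    stage3 = mpipe.OrderedStage(tracking, 3)
    stage4 = mpipe.OrderedStage(draw_squares, 3)
    stage5 = mpipe.OrderedStage(ui, 3)

    # Chain the stages: conversion -> processing -> tracking -> draw_squares -> ui
    pipe = mpipe.Pipeline(stage1.link(stage2.link(stage3.link(stage4.link(stage5)))))

    # Read a batch of 3 frames, retrying until each read succeeds.
    images = []
    while len(images) < 3:
        ret = False
        while not ret:
            ret, image = cap.read()
        images.append(image)

    # Feed the batch into the pipeline together with the extra state.
    for i in images:
        t = (i, frame_counter, multi_tracker)
        pipe.put(t)

    # None tells the pipeline that no more input is coming.
    pipe.put(None)

    # Collect and display the finished frames.
    for result in pipe.results():
        image, multi_tracker, frame_counter = result
        Show.show_win("video", image)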

As @r_e suggested, I read multiple images at the start and fill the pipeline with them. Every stage of the calculation now starts multiple worker processes, so that each one can work on a separate image.

Since some additional information has to be passed along with the image, each stage returns the image together with that information as a tuple, and the next stage unpacks it again.

At the moment I have had to disable the tracking, so I am not able to compare it to the old version. Currently it is a bit slower (tracking would improve speed, as I would not need to detect objects in every frame but only in every 30th). I'll post an update if I get it to work.
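The "detect only on every 30th frame, track in between" idea mentioned above could look roughly like the following sketch. `detect_markers` and `track_markers` are hypothetical placeholders for the real (expensive) detector and (cheap) tracker, and `process_stream` is an illustrative name, not a function from the original program.

```python
DETECT_EVERY = 30  # from the post: full detection only on every 30th frame


def detect_markers(frame):
    """Hypothetical expensive detector; returns a fake marker list."""
    return [("marker", frame)]


def track_markers(frame, markers):
    """Hypothetical cheap tracker; moves the known markers to the new frame."""
    return [(name, frame) for name, _ in markers]


def process_stream(frames, detect_every=DETECT_EVERY):
    """Return the final markers and how often each code path was taken."""
    markers = []
    calls = {"detect": 0, "track": 0}
    for index, frame in enumerate(frames):
        # Run the full detector periodically, or whenever nothing is tracked.
        if index % detect_every == 0 or not markers:
            markers = detect_markers(frame)
            calls["detect"] += 1
        else:
            markers = track_markers(frame, markers)
            calls["track"] += 1
    return markers, calls
```

Each tracking step here depends on the markers from the previous frame, so this loop is sequential by nature. That dependency is one plausible reason why tracking is harder to fit into a pipeline with several workers per stage.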


Solution

  • With the help of r_e I found a neat toolkit called mpipe, which can be used for pipelining in Python.

    While testing I found out that importing and displaying images is a lot faster than conversion, processing and drawing the UI, so I am only using a three-stage pipeline.

    It is pretty easy to use:

    def conversion(input_data):
        original, frame_counter = input_data
        ...
        return con.gbg(con.down_sample(con.copy_img(original))), frame_counter
    
    def processing(input_data):
        image, frame_counter = input_data
        ...
        return markers, frame_counter
    
    
    def ui(input_data):
        markers, frame_counter = input_data
        ...
        return image, frame_counter, triangle
    
    
    def main():
        ...
        while True:
            stage1 = mpipe.OrderedStage(conversion, 3)
    
            stage2 = mpipe.OrderedStage(processing, 3)
    
            stage3 = mpipe.OrderedStage(ui, 3)
    
            pipe = mpipe.Pipeline(stage1.link(stage2.link(stage3)))
    
            images = []
            while len(images) < 3:
                ret = False
                while not ret:
                    ret, image = cap.read()
                images.append(image)
    
            for i in images:
                t = (i, frame_counter)
                pipe.put(t)
    
            pipe.put(None)
    
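            for result in pipe.results():
                image, frame_counter, triangle = result
                if not triangle:
                    # No triangle found: only refresh the window
                    # once t_count has passed 6, then reset it.
                    if t_count > 6:
                        Show.show_win("video", image)
                        t_count = 0
                    else:
                        t_count += 1
                else:
                    # Triangle found: always show the frame.
                    Show.show_win("video", image)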
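The decision to keep only three stages rests on the observation that importing and displaying are much cheaper than the other steps. One way to check this kind of claim is to time each stage on a sample input before deciding what goes into the pipeline. The sketch below is a minimal example; `time_stages`, `fake_import` and `fake_conversion` are hypothetical names, and the sleep only simulates an expensive stage.

```python
import time


def time_stages(stages, item, repeats=5):
    """Return {stage name: average seconds per call}.

    stages is a list of (name, function) pairs; the output of each
    function is fed into the next one, as in the real program.
    """
    totals = {name: 0.0 for name, _ in stages}
    for _ in range(repeats):
        data = item
        for name, func in stages:
            start = time.perf_counter()
            data = func(data)
            totals[name] += time.perf_counter() - start
    return {name: total / repeats for name, total in totals.items()}


# Hypothetical stand-ins: a cheap stage and an artificially slow one.
def fake_import(x):
    return x


def fake_conversion(x):
    time.sleep(0.01)  # simulate expensive work
    return x
```

Stages whose average time is small compared to the others gain little from their own worker processes, since handing data between processes has a cost of its own.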