
Sharing asyncio.Queue with another thread or process


I've recently converted my old template matching program to asyncio and I have a situation where one of my coroutines relies on a blocking method (processing_frame).

I want to run that method in a separate thread or process whenever the coroutine that calls it (analyze_frame) gets an item from the shared asyncio.Queue().

I'm not sure whether that's possible, or worth it performance-wise, since I have very little experience with threading and multiprocessing.

import cv2
import datetime
import argparse
import os
import asyncio

#   Making CLI
if not os.path.exists("frames"):
    os.makedirs("frames")

t0 = datetime.datetime.now()
ap = argparse.ArgumentParser()
ap.add_argument("-v", "--video", required=True,
                help="path to our file")
args = vars(ap.parse_args())

threshold = .2
death_count = 0
was_found = False
template = cv2.imread('youdied.png')
vidcap = cv2.VideoCapture(args["video"])

loop = asyncio.get_event_loop()
frames_to_analyze = asyncio.Queue()


def main():
    length = int(vidcap.get(cv2.CAP_PROP_FRAME_COUNT))
    tasks = []
    for _ in range(int(length / 50)):
        tasks.append(loop.create_task(read_frame(50, frames_to_analyze)))
        tasks.append(loop.create_task(analyze_frame(threshold, template, frames_to_analyze)))
    final_task = asyncio.gather(*tasks)
    loop.run_until_complete(final_task)

    dt = datetime.datetime.now() - t0
    print("App exiting, total time: {:,.2f} sec.".format(dt.total_seconds()))

    print(f"Deaths registered: {death_count}")


async def read_frame(frames, frames_to_analyze):
    # Skip ahead with grab() and decode only the last frame of the batch.
    for _ in range(frames - 1):
        vidcap.grab()
    current_frame = vidcap.read()[1]
    print(f"Read {frames} frames")
    await frames_to_analyze.put(current_frame)


async def analyze_frame(threshold, template, frames_to_analyze):
    global vidcap
    global was_found
    global death_count
    frame = await frames_to_analyze.get()
    is_found = processing_frame(frame)
    if was_found and not is_found:
        death_count += 1
        await writing_to_file(death_count, frame)
    was_found = is_found


def processing_frame(frame):
    res = cv2.matchTemplate(frame, template, cv2.TM_CCOEFF_NORMED)
    max_val = cv2.minMaxLoc(res)[1]
    is_found = max_val >= threshold
    print(is_found)
    return is_found


async def writing_to_file(death_count, frame):
    cv2.imwrite(f"frames/frame{death_count}.jpg", frame)

if __name__ == '__main__':
    main()

I've tried using unsync, but without much success.
I would get an error along the lines of:

with self._rlock:
PermissionError: [WinError 5] Access is denied


Solution

  • If processing_frame is a blocking function, you should call it with await loop.run_in_executor(None, processing_frame, frame). That submits the function to a thread pool and lets the event loop carry on with other work until the called function completes.

    The same goes for calls such as cv2.imwrite. As written, writing_to_file is not truly asynchronous, despite being defined with async def. It doesn't await anything, so once its execution starts, it runs to the end without ever suspending. In that case you might as well make it a normal function in the first place, to make it obvious what's going on.
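
The advice above can be sketched roughly as follows. This is a minimal illustration and not the asker's actual program: processing_frame here is a hypothetical stand-in that sleeps instead of calling cv2.matchTemplate, frames are plain integers, the None sentinel and the run_demo helper are assumptions made for the demo, and writing_to_file appends to a list where the real version would call cv2.imwrite. Note that the asyncio.Queue is only ever touched from the event loop thread; only the blocking calls are handed to the thread pool.

```python
import asyncio
import time


def processing_frame(frame):
    # Hypothetical stand-in for the blocking template-matching work:
    # sleep briefly, then report a "match" for even frame numbers.
    time.sleep(0.05)
    return frame % 2 == 0


def writing_to_file(death_count, frame, written):
    # A plain function, not a coroutine. The real version would call
    # cv2.imwrite here; this one records the call so it can be inspected.
    written.append((death_count, frame))


async def analyze_frames(queue, written):
    loop = asyncio.get_running_loop()
    was_found = False
    death_count = 0
    while True:
        frame = await queue.get()
        if frame is None:  # assumed sentinel meaning "no more frames"
            break
        # Run the blocking call in the default thread pool, so the event
        # loop stays free to run other coroutines in the meantime.
        is_found = await loop.run_in_executor(None, processing_frame, frame)
        if was_found and not is_found:
            death_count += 1
            await loop.run_in_executor(
                None, writing_to_file, death_count, frame, written
            )
        was_found = is_found
    return death_count


async def run_demo(frames):
    # The queue is created and used only inside the event loop.
    queue = asyncio.Queue()
    written = []
    for frame in frames:
        await queue.put(frame)
    await queue.put(None)
    death_count = await analyze_frames(queue, written)
    return death_count, written


if __name__ == "__main__":
    print(asyncio.run(run_demo([2, 4, 1, 6, 3])))
```

Passing None as the first argument selects the loop's default executor, which is a thread pool. Whether this actually speeds up the original program depends on how much time the blocking calls spend outside the interpreter lock, so it is worth measuring before committing to it.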