python, multithreading, asynchronous, python-multiprocessing, blocking

Handle blocking operations efficiently in python


I'm using Python and OpenCV to get video from an RTSP stream. I'm reading single frames from the stream and saving them to the file system.

I wrote a StreamingWorker that handles getting and saving frames. Additionally, there is a StreamPool that holds all the stream objects. Since a StreamingWorker would always be running, I thought there should be only one per core, in order to use as much of each core as possible. The StreamPool would then provide the VideoCapture objects to the available StreamingWorker.

The problem is that the script spends most of its running time blocked:

import os
import time
import threading
import cv2 as cv

class StreamingWorker(object):

    def __init__(self, stream_pool):
        self.stream_pool = stream_pool
        self.start_loop()

    def start_loop(self):
        while True:
            try:
                # getting a stream from the read_strategy
                stream_object = self.stream_pool.next()

                # getting an image from the stream
                _, frame = stream_object['stream'].read()

                # saving image to file system (imwrite takes the frame as its second argument)
                cv.imwrite(os.path.join('result', stream_object['feed'], '{}.jpg'.format(time.time())), frame)

            except ValueError as e:
                print('[error] {}'.format(e))

class StreamPool(object):

    def __init__(self, streams):
        self.streams = [{'feed': stream, 'stream': cv.VideoCapture(stream)} for stream in streams]
        self.current_stream = 0
        self.lock = threading.RLock()

    def next(self):
        # Advance round-robin to the next stream; the lock is released even if an error occurs
        with self.lock:
            self.current_stream = (self.current_stream + 1) % len(self.streams)
            return self.streams[self.current_stream]

def get_cores():
    # This function returns the number of available cores
    import multiprocessing
    return multiprocessing.cpu_count()


def start(stream_pool):
    StreamingWorker(stream_pool)

def divide_list(input_list, amount):
    # This function divides the whole list into list of lists
    result = [[] for _ in range(amount)]
    for i in range(len(input_list)):
        result[i % len(result)].append(input_list[i])
    return result

if __name__ == '__main__':

    stream_list = ['rtsp://some/stream1', 'rtsp://some/stream2', 'rtsp://some/stream3']

    num_cores = get_cores()
    divided_streams = divide_list(stream_list, num_cores)
    for streams in divided_streams:
        stream_pool = StreamPool(streams)
        # args must be a tuple, hence the trailing comma
        thread = threading.Thread(target=start, args=(stream_pool,))
        thread.start()

When I thought of this, I didn't take into account that most of the operations will be blocking operations like:

# Getting a frame blocks
_, frame = stream_object['stream'].read()

# Writing to the file system blocks
cv.imwrite(os.path.join('result', stream_object['feed'], '{}.jpg'.format(time.time())), frame)

The problem with spending so much time blocked is that most of the processing power is wasted. I thought of using futures with a ThreadPoolExecutor, but I can't seem to reach my goal of using as many processing cores as possible. Maybe I'm not starting enough threads.
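The effect can be seen in a small sketch that uses time.sleep as a stand-in for the blocking read() and imwrite() calls. The stand-in function blocking_call and its 0.2-second delay are assumptions for illustration, not measurements from the real streams. Wall-clock time passes while almost no CPU time is consumed:

```python
import time


def blocking_call(seconds=0.2):
    # Hypothetical stand-in for stream.read() or cv.imwrite():
    # the calling thread waits and does no computation meanwhile.
    time.sleep(seconds)


wall_start = time.perf_counter()   # wall-clock time
cpu_start = time.process_time()    # CPU time used by this process

blocking_call()

wall_elapsed = time.perf_counter() - wall_start
cpu_elapsed = time.process_time() - cpu_start

print('wall: {:.3f}s, cpu: {:.3f}s'.format(wall_elapsed, cpu_elapsed))
```

A worker that spends its time in calls like this keeps a thread busy waiting rather than computing, which is why one worker per core leaves the cores mostly idle.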

Is there a standard way of handling blocking operations, in order to make the best use of the cores' processing power? I'm fine having a language-agnostic answer.


Solution

  • I ended up using a ThreadPoolExecutor together with the add_done_callback(fn) function.

    from concurrent.futures import ThreadPoolExecutor
    
    class StreamingWorker(object):
    
        def __init__(self, stream_pool):
            self.stream_pool = stream_pool
            self.thread_pool = ThreadPoolExecutor(10)
            self.start_loop()
    
        def start_loop(self):
            def done(future):
                # called with the finished Future object
                print('[info] future done')
    
            def save_image(stream):
                # getting an image from the stream
                _, frame = stream['stream'].read()
    
                # saving image to file system (imwrite takes the frame as its second argument)
                cv.imwrite(os.path.join('result', stream['feed'], '{}.jpg'.format(time.time())), frame)
    
            while True:
                try:
                    # getting a stream from the read_strategy
                    stream_object = self.stream_pool.next()
    
                    # Scheduling the process to the thread pool
                    self.thread_pool.submit(save_image, stream_object).add_done_callback(done)
                except ValueError as e:
                    print('[error] {}'.format(e))
    

    I didn't actually want to do anything after the future finished, but if I called result(), the while True loop would stall until that task completed, which would defeat the whole purpose of using the thread pool.

    Side note: I had to add a threading.RLock() around the call to self.stream_pool.next() because apparently OpenCV can't handle calls from multiple threads.
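Two details of this approach may be worth guarding against. A future whose result() or exception() is never called keeps any exception raised by the task to itself, and submit() inside a while True loop queues work without limit if the tasks finish more slowly than they are submitted. The following is a minimal sketch of one way to handle both, not the code actually used above. FakeCapture, run, read_frame, in_flight and the per-stream 'lock' key are hypothetical names introduced for illustration, and FakeCapture merely imitates a blocking cv.VideoCapture.read() so the sketch runs without OpenCV.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor


class FakeCapture:
    """Hypothetical stand-in for cv.VideoCapture; read() blocks briefly."""

    def __init__(self):
        self.frames_read = 0

    def read(self):
        time.sleep(0.01)  # simulate blocking network I/O
        self.frames_read += 1
        return True, self.frames_read


def run(streams, iterations, max_workers=4):
    """Submit `iterations` read tasks round-robin over `streams`."""
    # Allow at most 2 * max_workers tasks to be queued or running.
    in_flight = threading.BoundedSemaphore(max_workers * 2)
    errors = []

    def read_frame(stream):
        # One reader at a time per capture object.
        with stream['lock']:
            ok, frame = stream['stream'].read()
        if not ok:
            raise ValueError('no frame from {}'.format(stream['feed']))
        return frame  # the real code would call cv.imwrite(path, frame) here

    def done(future):
        in_flight.release()
        # The future is already finished, so exception() returns at once.
        error = future.exception()
        if error is not None:
            errors.append(error)

    # Leaving the with block waits for all submitted tasks to finish.
    with ThreadPoolExecutor(max_workers) as pool:
        for i in range(iterations):
            in_flight.acquire()  # blocks while too many tasks are pending
            stream = streams[i % len(streams)]
            pool.submit(read_frame, stream).add_done_callback(done)
    return errors


streams = [
    {'feed': name, 'stream': FakeCapture(), 'lock': threading.Lock()}
    for name in ('a', 'b', 'c')
]
errors = run(streams, iterations=30)
print('errors:', errors)
```

The submitting loop never waits on an individual result, so it keeps the pool fed, while the semaphore slows it down only when the workers fall behind. Giving each stream its own lock serializes reads per capture object without making unrelated streams wait for each other.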