I'm using Python and OpenCV to get video from an RTSP stream. I'm grabbing single frames from the stream and saving them to the file system.

I wrote a `StreamingWorker` which handles getting and saving frames. Additionally, there is a `StreamPool` that holds all the streaming objects. I thought that, as the `StreamingWorker` would always be running, there should only be one per core, so that each worker uses as much of its core as possible. The `StreamPool` would then provide the `VideoCapture` objects to the available `StreamingWorker`.

The problem is that the script spends most of its running time blocked:

    import os
    import time
    import threading
    import multiprocessing

    import cv2 as cv


    class StreamingWorker(object):

        def __init__(self, stream_pool):
            self.stream_pool = stream_pool
            self.start_loop()

        def start_loop(self):
            while True:
                try:
                    # getting a stream from the read_strategy
                    stream_object = self.stream_pool.next()
                    # getting an image from the stream
                    ok, frame = stream_object['stream'].read()
                    if not ok:
                        continue  # no frame was returned, try the next stream
                    # saving image to file system (imwrite needs the frame as well as the path)
                    path = os.path.join('result', stream_object['feed'], '{}.jpg'.format(time.time()))
                    cv.imwrite(path, frame)
                except ValueError as e:
                    print('[error] {}'.format(e))


    class StreamPool(object):

        def __init__(self, streams):
            self.streams = [{'feed': stream, 'stream': cv.VideoCapture(stream)} for stream in streams]
            self.current_stream = 0
            self.lock = threading.RLock()

        def next(self):
            # the lock only protects the round-robin index
            with self.lock:
                if self.current_stream + 1 >= len(self.streams):
                    self.current_stream = 0
                else:
                    self.current_stream += 1
                return self.streams[self.current_stream]


    def get_cores():
        # This function returns the number of available cores
        return multiprocessing.cpu_count()


    def start(stream_pool):
        StreamingWorker(stream_pool)


    def divide_list(input_list, amount):
        # This function divides the whole list into a list of lists
        result = [[] for _ in range(amount)]
        for i in range(len(input_list)):
            result[i % len(result)].append(input_list[i])
        return result


    if __name__ == '__main__':
        stream_list = ['rtsp://some/stream1', 'rtsp://some/stream2', 'rtsp://some/stream3']
        num_cores = get_cores()
        divided_streams = divide_list(stream_list, num_cores)
        for streams in divided_streams:
            if not streams:
                continue  # more cores than streams: nothing to assign here
            stream_pool = StreamPool(streams)
            # args must be a tuple, hence the trailing comma
            thread = threading.Thread(target=start, args=(stream_pool,))
            thread.start()

When I designed this, I didn't take into account that most of the operations are blocking, for example:

    # Getting a frame blocks
    _, frame = stream_object['stream'].read()
    # Writing to the file system blocks
    cv.imwrite(os.path.join('result', stream_object['feed'], '{}.jpg'.format(time.time())), frame)

The problem with spending so much time blocked is that most of the processing power is wasted. I thought of using futures with a `ThreadPoolExecutor`, but I can't seem to reach my goal of using as many processing cores as possible. Maybe I'm not using enough threads.

Is there a standard way of handling blocking operations that makes the best use of the cores' processing power? I'm fine with a language-agnostic answer.
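
To make the question concrete: a thread that is waiting on a blocking call does not occupy a core, so one common approach is to size the pool by the number of concurrent blocking calls rather than by the core count. Below is a minimal sketch of that idea. `blocking_read` is a hypothetical stand-in for a blocking frame read, simulated with `time.sleep`, and `read_all` is an illustrative helper name. Whether `VideoCapture.read()` overlaps as well as `time.sleep` does depends on OpenCV's bindings releasing the GIL during the call, which is assumed here rather than verified.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def blocking_read(feed, delay=0.2):
    # Hypothetical stand-in for a blocking call such as a frame read;
    # time.sleep simulates waiting on the network.
    time.sleep(delay)
    return '{}: frame'.format(feed)


def read_all(feeds, max_workers):
    # One worker per concurrent blocking call, regardless of core count.
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # map() returns results in the same order as the inputs.
        return list(pool.map(blocking_read, feeds))
```

With four feeds and `max_workers=4`, the four simulated reads overlap, so the whole call takes roughly one delay instead of four.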

I ended up using a `ThreadPoolExecutor` and attaching a callback to each future with `add_done_callback(fn)`.

    from concurrent.futures import ThreadPoolExecutor


    class StreamingWorker(object):

        def __init__(self, stream_pool):
            self.stream_pool = stream_pool
            self.thread_pool = ThreadPoolExecutor(10)
            self.start_loop()

        def start_loop(self):

            def done(future):
                print('[info] future done')

            def save_image(stream):
                # getting an image from the stream
                _, frame = stream['stream'].read()
                # saving image to file system (imwrite needs the frame as well as the path)
                cv.imwrite(os.path.join('result', stream['feed'], '{}.jpg'.format(time.time())), frame)

            while True:
                try:
                    # getting a stream from the read_strategy
                    stream_object = self.stream_pool.next()
                    # Scheduling the process to the thread pool
                    self.thread_pool.submit(save_image, stream_object).add_done_callback(done)
                except ValueError as e:
                    print('[error] {}'.format(e))

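
One thing to watch with a loop like this: `submit()` returns immediately, so `while True` can queue tasks much faster than ten threads can finish them, and the executor's internal queue keeps growing. A minimal sketch of one way to bound it, assuming a hypothetical `BoundedSubmitter` helper (not part of `concurrent.futures`) built on `threading.BoundedSemaphore`:

```python
import threading
from concurrent.futures import ThreadPoolExecutor


class BoundedSubmitter(object):
    """Hypothetical helper: submit() blocks once max_pending tasks are in flight."""

    def __init__(self, max_workers, max_pending):
        self.pool = ThreadPoolExecutor(max_workers)
        self.slots = threading.BoundedSemaphore(max_pending)

    def submit(self, fn, *args):
        # Wait here until a slot is free.
        self.slots.acquire()
        try:
            future = self.pool.submit(fn, *args)
        except Exception:
            # The task was never queued, so give the slot back.
            self.slots.release()
            raise
        # Free the slot when the task finishes, whether it succeeded or failed.
        future.add_done_callback(lambda _: self.slots.release())
        return future

    def shutdown(self):
        self.pool.shutdown(wait=True)
```

In the worker above, this would replace the bare `self.thread_pool.submit(...)` call, so the loop naturally slows down to the rate at which frames are actually read and written.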
I didn't actually want to do anything after the future finished, but if I had called `result()` instead, the `while True` loop would block until each task completed, which would defeat the whole purpose of using the thread pool.
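
The callback can still be useful even when nothing needs to happen afterwards: an exception raised inside a submitted function is stored on the future and stays silent unless something asks for it. A minimal sketch of a callback that surfaces such errors without blocking the loop; `log_failure`, `failures`, `flaky` and `run` are hypothetical names used only for illustration:

```python
from concurrent.futures import ThreadPoolExecutor

failures = []  # hypothetical place to collect errors


def log_failure(future):
    # Runs once the future is done, so exception() returns immediately here.
    if future.cancelled():
        return
    error = future.exception()
    if error is not None:
        failures.append(error)
        print('[error] {!r}'.format(error))


def flaky(n):
    # Stand-in task that fails for odd inputs.
    if n % 2:
        raise ValueError('bad frame {}'.format(n))
    return n


def run(count):
    # Leaving the with-block waits for all submitted tasks to finish.
    with ThreadPoolExecutor(4) as pool:
        for n in range(count):
            pool.submit(flaky, n).add_done_callback(log_failure)
```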

Side note: I had to add a `threading.RLock()` when calling `self.stream_pool.next()` because apparently OpenCV can't handle calls from multiple threads.
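
Note that the lock in `next()` only guards the round-robin index. With more pool threads than streams, two threads can still be handed the same capture object and call `read()` on it at the same time. A minimal sketch of an alternative layout, in which each capture is owned by exactly one thread so no lock is needed around `read()`. Here `capture` is assumed to be any object whose `read()` returns `(ok, frame)`, and `save` is a hypothetical callable that writes the frame:

```python
import threading


def capture_loop(feed, capture, save, stop_event):
    # Only this thread ever touches `capture`.
    while not stop_event.is_set():
        ok, frame = capture.read()
        if not ok:
            break  # stream ended or failed; a real worker might reconnect here
        save(feed, frame)


def start_workers(captures, save, stop_event):
    # `captures` maps a feed name to an object with a read() method;
    # one thread is started per feed.
    threads = []
    for feed, capture in captures.items():
        thread = threading.Thread(
            target=capture_loop, args=(feed, capture, save, stop_event))
        thread.daemon = True
        thread.start()
        threads.append(thread)
    return threads
```

Setting `stop_event` makes each loop exit after its current `read()` returns. The trade-off is one thread per stream, which is usually acceptable because those threads spend most of their time blocked.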