Tags: python, caching, celery, shared-memory, video-processing

Celery task setup with a memory cache for video frames as a circular-buffer strategy in Python


I want to build a multi-task processing pipeline on Celery with several tasks processing the same video file. The tasks need to share the video data so that not every task has to decode the video and extract frames itself. The video data will be a list of extracted frames (not every frame of the video is needed).

Is there any solution to efficiently share those frames? Tasks can be processed on different nodes, but I don't want to share the data over the network via something like Memcached or Redis. A task should look up the video data in memory/cache; if it's not there, the task should issue another task to load the video and extract the frames into the cache.

(producer and multiple consumers for each video file)

So tasks on the same node/machine are able to share the cached data, while two tasks on different nodes gain no benefit from the cache.

I don't want to cache the entire extracted video; there has to be some circular-buffer caching. The cache per video has a fixed size, let's say 100 frames, so the gap between the fastest and the slowest task cannot exceed 100 frames and only a total of 100 frames are ever held in memory.

Two main questions arise:

  1. Task setup

    Task A: extracting frames from video (producer to memory/cache)

    Task B1: consuming frames and doing actual work (processing frames)

    . .

    Task Bn: consuming frames and doing actual work (processing frames)

    A, B1 - Bn run in parallel. But then these tasks have to run on the same node. If the B tasks are distributed across different nodes, something has to spawn another A task on each of those nodes (one per node to decode and extract frames; roughly what I have in mind is sketched at the end of this question). What approach do you recommend here? What would be the best choice?

  2. Python cache

    Are there any cache libraries/implementations/solutions that fit my use case of caching large data on a local machine with some circular-buffer implementation? Something like DiskCache, but with the ability to hold only 100 frames by ring-buffering them; the sketch below shows the eviction behavior I mean.
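
    To make that concrete, here is a minimal stdlib-only sketch of the semantics I'm after (the FrameRingCache name and the 100-frame limit are placeholders, and this is per-process only, which is exactly why I'm looking for a shared variant):

        from collections import deque

        class FrameRingCache:
            """Fixed-size frame window: pushing frame 101 silently drops frame 1."""
            def __init__(self, maxframes=100):
                self.frames = deque(maxlen=maxframes)  # deque evicts the oldest on append
                self.first = 0  # absolute index of the oldest frame still cached

            def push(self, frame):
                if len(self.frames) == self.frames.maxlen:
                    self.first += 1  # the oldest frame is about to be evicted
                self.frames.append(frame)

            def get(self, index):
                """Return frame `index`, or None if it already left the window."""
                offset = index - self.first
                return self.frames[offset] if 0 <= offset < len(self.frames) else None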

What approaches and designs do you recommend for implementing my use case? I would like to stick with Celery for task distribution.
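
Regarding question 1: I know Celery can route tasks to dedicated queues, so each node could run a worker bound to its own queue, and a B task that finds a cold cache could spawn an A task on its own node. Roughly what I have in mind (the broker URL, module name tasks, and the hostname-as-queue-name convention are placeholders):

    import socket
    from celery import Celery

    app = Celery('frames', broker='amqp://localhost')  # placeholder broker

    @app.task
    def extract_frames(video_path):
        # Task A: decode `video_path` and push frames into the node-local cache
        pass

    @app.task
    def process_frames(video_path):
        # Task B: look up frames in the node-local cache; if the cache is cold,
        # spawn a producer on THIS node's queue and wait/retry
        extract_frames.apply_async(args=(video_path,),
                                   queue=socket.gethostname())  # one queue per node

    # each node starts its worker bound to a queue named after the host, e.g.:
    #   celery -A tasks worker -Q $(hostname)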


Solution

  • This may be my stubbornness showing, but I've always found projects like Celery that add a bunch of complexity on top of multiprocessing (which is already complex) to be more trouble than they're worth. In my opinion there's also no better alternative, from a speed and simplicity standpoint, than stdlib shared memory and mutexes.

    For your case, an easy solution would be to use a FIFO queue for each consumer process and have the producer put each frame into every queue. That would naturally use a lot of memory if you made n copies of each frame for n consumers, but you can quite easily come up with a mechanism that puts the frames themselves into a multiprocessing.sharedctypes.Array and passes only the indexes through the queues instead. As long as the queues are restricted to a length shorter than the buffer, you are prevented from overwriting a frame in the buffer until it has been consumed by all the consumers. Without any synchronization this would be flying by the seat of your pants, but a little bit of mutex magic can definitely make it a very robust solution.

    for example:

    import numpy as np
    from time import sleep
    from multiprocessing import Process, freeze_support, Queue
    from multiprocessing.sharedctypes import Array
    from ctypes import c_uint8
    from functools import reduce
    
    BUFSHAPE = (10,10,10) #10 10x10 images in buffer
    
    class Worker(Process):
        def __init__(self, q_size, buffer, name=''):
            super().__init__()
            self.queue = Queue(q_size)
            self.buffer = buffer
            self.name = name
    
        def run(self): #do work here
            #I hardcoded datatype here. you might need to communicate it to the child process
            buf_arr = np.frombuffer(self.buffer.get_obj(), dtype=c_uint8)
            buf_arr.shape = BUFSHAPE
            while True:
                item = self.queue.get()
                if item == 'done':
                    print('child process: {} completed all frames'.format(self.name))
                    return
                with self.buffer.get_lock(): #prevent writing while we're reading
                    #slice the frame from the array using the index that was sent
                    frame = buf_arr[item%BUFSHAPE[0]] #depending on your use, you may want to make a copy here
                #do some intense processing on `frame`
                sleep(np.random.rand())
                print('child process: {} completed frame: {}'.format(self.name, item))
    
    def main():
        #creating shared array
        buffer = Array(c_uint8, reduce(lambda a,b: a*b, BUFSHAPE))
        #make a numpy.array using that memory location to make it easy to stuff data into it
        buf_arr = np.frombuffer(buffer.get_obj(), dtype=c_uint8)
        buf_arr.shape = BUFSHAPE
        #create a list of workers
        workers = [Worker(BUFSHAPE[0]-2, #smaller queue than buffer to prevent overwriting frames not yet consumed
                          buffer, #pass in shared buffer array
                          str(i)) #numbered child processes
                          for i in range(5)] #5 workers
    
        for worker in workers: #start the workers
            worker.start()
        for i in range(100): #generate 100 random frames to send to workers
            #insert a frame into the buffer
            with buffer.get_lock(): #prevent reading while we're writing
                buf_arr[i%BUFSHAPE[0]] = np.random.randint(0,255, size=(10,10), dtype=c_uint8)
            #send the frame number to each worker for processing. If the input queue is full, this will block until there's space
            # this is what prevents `buf_arr[i%BUFSHAPE[0]] = np...` from overwriting a frame that hasn't been processed yet
            for worker in workers:
                worker.queue.put(i)
        #when we're done send the 'done' signal so the child processes exit gracefully (or you could make them daemons)
        for worker in workers:
            worker.queue.put('done')
            worker.join()
    
    
    if __name__ == "__main__":
        freeze_support()
        main()
    

    EDIT

    Some sort of off-by-one error requires the queue to be 2 frames smaller than the buffer rather than 1 frame smaller in order to prevent overwriting a frame before its time.

    EDIT2 - explanation of the first edit:

    The reason for len(q) = len(buf)-2 appears to be that q.get() is called before we acquire the frame from the buffer, and the frame itself is written before we try to push its index to the queue. If the difference in length were only 1, a worker could pull a frame index from the queue, the producer could then see that it can push to the queue again, and it could move on to the next frame before the worker has had a chance to read the frame itself. There are many ways you could approach this differently that might leave fewer processes waiting on each other, perhaps using mp.Event; a sketch of one such alternative follows.
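
    For instance, a minimal sketch of that idea using one multiprocessing.Semaphore per buffer slot rather than an Event (the names NSLOTS, acks etc. are arbitrary): each consumer releases a slot's semaphore after copying its frame, and the producer collects one release per consumer before rewriting that slot, so the queues no longer need the -2 fudge factor at all.

    import numpy as np
    from multiprocessing import Process, Queue, Semaphore, freeze_support
    from multiprocessing.sharedctypes import Array
    from ctypes import c_uint8
    
    NSLOTS, H, W = 10, 10, 10 #10 slots of 10x10 frames
    NWORKERS = 5
    
    def consumer(queue, buffer, acks, name):
        buf = np.frombuffer(buffer.get_obj(), dtype=np.uint8).reshape(NSLOTS, H, W)
        while True:
            i = queue.get()
            if i is None:
                print('consumer {} done'.format(name))
                return
            slot = i % NSLOTS
            #no Array lock needed here: the producer can't be writing this slot,
            #because it hasn't collected our acknowledgement for it yet
            frame = buf[slot].copy() #copy BEFORE acknowledging
            acks[slot].release() #one acknowledgement per consumer per slot
            #do the actual work on `frame` here
    
    def main():
        buffer = Array(c_uint8, NSLOTS * H * W)
        buf = np.frombuffer(buffer.get_obj(), dtype=np.uint8).reshape(NSLOTS, H, W)
        #every slot starts out fully acknowledged, i.e. free to write
        acks = [Semaphore(NWORKERS) for _ in range(NSLOTS)]
        queues = [Queue() for _ in range(NWORKERS)]
        workers = [Process(target=consumer, args=(q, buffer, acks, str(n)))
                   for n, q in enumerate(queues)]
        for w in workers:
            w.start()
        for i in range(100): #100 frames, as in the example above
            slot = i % NSLOTS
            for _ in range(NWORKERS): #block until every consumer released the slot
                acks[slot].acquire()
            buf[slot] = np.random.randint(0, 255, size=(H, W), dtype=np.uint8)
            for q in queues:
                q.put(i)
        for q in queues:
            q.put(None) #graceful shutdown
        for w in workers:
            w.join()
    
    if __name__ == '__main__':
        freeze_support()
        main()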