
Python Multiprocessing with a queue as input for a camera


I have a camera that generates 20 numpy images per second and puts them into a queue.

My job is to read from the queue and save the files as JPEG. If I do this sequentially, some frames will be lost, because saving one JPEG takes more than 50 milliseconds, so sooner or later the queue will be full.

Using multithreading does not help, as the threads still share one CPU core (because of the GIL). I need to use multiprocessing to save the files on different CPU cores. Since the camera generates images continuously and the buffer (the elements in the queue) changes all the time, the examples I found on Google do not match what I need; they usually use a static array as input.

I tried, and also read on Stack Overflow, that Pool does not support a queue as input, since a queue is not iterable.
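For illustration, here is a minimal snippet of what I mean (the save function is just a placeholder):

    import multiprocessing as mp

    def save(img):
        pass  # placeholder for the JPEG-saving work

    if __name__ == "__main__":
        q = mp.Queue()
        with mp.Pool() as pool:
            # Pool.map tries to iterate over its input, but a
            # multiprocessing.Queue is not iterable, so this raises
            # TypeError: 'Queue' object is not iterable
            pool.map(save, q)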

I suspect it is possible to change the camera code so that it saves into an array of numpy images, but the issue remains: if I use Pool.map and the size of the array changes all the time, it may not work.

Any suggestions on how I can do this task?

Thank you very much in advance!!


Solution

  • You can use JoinableQueue from the multiprocessing package

    import multiprocessing as mp

    def worker(queue):
        while True:
            item = queue.get()   # blocks until an image is available
            # TODO: handle the item (e.g. encode and save it as JPEG)
            queue.task_done()    # tell the queue the item is processed


    queue = mp.JoinableQueue()
    # TODO: fill the queue with images to save
    

    Now just spawn worker processes with mp.Process, like:

    p = mp.Process(target=worker, args=(queue,))
    p.start()
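
    Putting it together, here is a minimal end-to-end sketch. It assumes Pillow is installed for the JPEG encoding and uses random frames in place of the real camera; the frame size, NUM_WORKERS = 4, and the file naming are assumptions for illustration. A None sentinel per worker tells the workers to exit.

    import multiprocessing as mp
    import numpy as np
    from PIL import Image  # assumption: Pillow is used for JPEG encoding

    NUM_WORKERS = 4  # assumption: tune to the number of available cores

    def worker(queue):
        while True:
            item = queue.get()
            if item is None:          # sentinel: no more frames
                queue.task_done()
                break
            index, frame = item
            # encode and save the numpy frame as a JPEG file
            Image.fromarray(frame).save(f"frame_{index:06d}.jpg")
            queue.task_done()

    if __name__ == "__main__":
        queue = mp.JoinableQueue(maxsize=100)  # bound the buffer

        workers = [mp.Process(target=worker, args=(queue,), daemon=True)
                   for _ in range(NUM_WORKERS)]
        for p in workers:
            p.start()

        # stand-in for the camera: 100 random 480x640 RGB frames
        for i in range(100):
            frame = np.random.randint(0, 255, (480, 640, 3), dtype=np.uint8)
            queue.put((i, frame))

        for _ in workers:
            queue.put(None)           # one sentinel per worker
        queue.join()                  # wait until every item is task_done()

    The maxsize bound keeps the camera from outrunning the workers indefinitely: if the queue fills up, put() blocks until a worker catches up (alternatively, you can drop frames instead of blocking).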