Staggered data loading with multiprocessing.Queue sometimes leads to items being consumed out of order


I'm writing a script which animates image data. I have a number of large image cubes (3D arrays). For each of these, I step through the frames in each cube, and once I get near the end of it, I load the next cube and continue. Due to the large size of each cube, there is a significant load time (~5s). I'd like the animation to transition between cubes seamlessly (while also conserving memory), so I'm staggering the load processes. I've made some progress towards a solution, but some problems persist.

The code below loads each data cube, splits it into frames and puts these into a multiprocessing.Queue. Once the number of frames in the queue falls below a certain threshold, the next load process is triggered which loads another cube and unpacks it into the queue.

import numpy as np
import multiprocessing as mp
import logging
logger = mp.log_to_stderr(logging.INFO)
import time

def data_loader(event, queue, **kw):
    '''loads data from 3D image cube'''
    event.wait()        #wait for trigger before loading

    logger.info( 'Loading data' )
    time.sleep(3)                       #pretend to take long to load the data
    n = 100
    data = np.ones((n,20,20))*np.arange(n)[:,None,None]          #imaginary 3D image cube (increasing numbers so that we can track the data ordering)

    logger.info( 'Adding data to queue' )
    for d in data:
        queue.put(d)
    logger.info( 'Done adding to queue!' )


def queue_monitor(queue, triggers, threshold=50, interval=5):
    '''
    Triggers the load events once the number of data in the queue falls below 
    threshold, then doesn't trigger again until the interval has passed.  
    Note: interval should be larger than data load time.
    '''
    while len(triggers):
        if queue.qsize() < threshold:
            logger.info( 'Triggering next load' )
            triggers.pop(0).set()
            time.sleep(interval)    


if __name__ == '__main__':
    logger.info( "Starting" )
    out_queue = mp.Queue()

    #Initialise the load processes
    nprocs, procs = 3, []
    triggers = [mp.Event() for _ in range(nprocs)]
    triggers[0].set()           #set the first process to trigger immediately
    for i, trigger in enumerate(triggers):
        p = mp.Process( name='data_loader %d'%i, target=data_loader, 
                        args=(trigger, out_queue) )
        procs.append( p )
    for p in procs:
        p.start()

    #Monitoring process
    qm = mp.Process( name='queue_monitor', target=queue_monitor, 
                     args=(out_queue, triggers) )
    qm.start()

    #consume data
    #get() blocks until a frame arrives, so no busy-wait on empty() is needed
    while True:
        d = out_queue.get()
        if d is None:       #identity check; `d == None` is elementwise for arrays
            break
        time.sleep(0.2)   #pretend to take some time to process/animate the data
        logger.info( 'data: %i' %d[0,0] )   #just to keep track of data ordering

This works brilliantly in some cases, but sometimes the order of the data gets jumbled after a new load process is triggered. I can't figure out why this should happen; mp.Queue is supposed to be FIFO, right? For example, running the code above as is does not preserve the correct order in the output queue, whereas lowering the threshold to, say, 30 fixes this.

So the question is: how do I correctly implement this staggered loading strategy with multiprocessing in Python?


Solution

  • This looks like a buffering problem. Internally, multiprocessing.Queue stores the items you enqueue in a buffer, and a background thread later flushes them to a Pipe. Only after that flush are the items actually sent to other processes. Because you're putting large objects onto the Queue, a lot of data sits in that buffer, so the output of the loading processes can overlap even though your logging shows one process finishing before the next starts. The docs have a warning about this scenario:

    When an object is put on a queue, the object is pickled and a background thread later flushes the pickled data to an underlying pipe. This has some consequences which are a little surprising, but should not cause any practical difficulties – if they really bother you then you can instead use a queue created with a manager.

    1. After putting an object on an empty queue there may be an infinitesimal delay before the queue’s empty() method returns False and get_nowait() can return without raising Queue.Empty.
    2. If multiple processes are enqueuing objects, it is possible for the objects to be received at the other end out-of-order. However, objects enqueued by the same process will always be in the expected order with respect to each other.

    I would recommend doing as the docs suggest and using a multiprocessing.Manager to create your queue:

    m = mp.Manager()
    out_queue = m.Queue()
    

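    This avoids the issue altogether: a manager queue lives in the manager's server process, and each put() returns only once the item is actually in that queue, so frames arrive in the order they were put.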

    Another option would be to use just one process to do all the data loading, and have it run in a loop, with the event.wait() call at the top of the loop.
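
The single-loader option can be sketched roughly as follows. Since the docs quoted above say that objects enqueued by the same process always arrive in order, a plain mp.Queue should be enough here. Everything beyond event.wait() at the top of the loop is an assumption made for illustration: load_cube, SENTINEL, the (cube index, frames left, frame) tuples and the `requested` counter in consume are hypothetical names that do not appear in the original script. The sketch also swaps the qsize() polling for a per-frame count so that each cube is requested exactly once.

```python
import multiprocessing as mp
import time

import numpy as np

SENTINEL = None  # hypothetical end-of-stream marker, not in the original script


def load_cube(index, n_frames=100):
    """Stand-in for the real (slow) load; values keep increasing across cubes."""
    values = np.arange(n_frames) + index * n_frames
    return np.ones((n_frames, 20, 20)) * values[:, None, None]


def data_loader(trigger, queue, n_cubes, n_frames=100):
    """Single loader process: loads one cube per request, in cube order."""
    for index in range(n_cubes):
        trigger.wait()       # block until the consumer requests this cube
        trigger.clear()      # re-arm before loading so the next request is kept
        cube = load_cube(index, n_frames)
        for i, frame in enumerate(cube):
            frames_left = len(cube) - i - 1   # frames of this cube still to come
            queue.put((index, frames_left, frame))
    queue.put(SENTINEL)      # signal that no more frames will follow


def consume(trigger, queue, threshold=50):
    """Yield frames in order, requesting the next cube once per cube."""
    requested = 0            # index of the newest cube requested so far
    trigger.set()            # request cube 0 straight away
    while True:
        item = queue.get()   # blocks until a frame (or the sentinel) arrives
        if item is SENTINEL:
            return
        index, frames_left, frame = item
        # Ask for the next cube when the newest one is nearly used up.
        if index == requested and frames_left < threshold:
            requested += 1
            trigger.set()
        yield frame


if __name__ == "__main__":
    trigger = mp.Event()
    queue = mp.Queue()
    loader = mp.Process(target=data_loader, args=(trigger, queue, 3, 100))
    loader.start()
    previous = -1
    for frame in consume(trigger, queue):
        value = int(frame[0, 0])
        assert value == previous + 1   # frames arrive strictly in order
        previous = value
        time.sleep(0.002)              # pretend to animate the frame
    loader.join()
    print("consumed", previous + 1, "frames in order")
```

In this sketch the loader clears the event before it starts loading, and the consumer only asks for cube k+1 after it has seen a frame from cube k, so a request should not get lost between the two. The trade-off is that loading and consuming are coupled through the extra fields in each queue item; if that is undesirable, the Manager queue above keeps the original structure.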