Search code examples
pythonmultiprocessingqueue

Python multiprocessing queues hanging without wait timers


I am having trouble with queues failing to work as intended. The goal of this design is so that the muncher can take as little or as long as it needs, but it will always be able filled with work (input queue always has stuff, output queue always has room). I want to maximize muncher's data crunching while the yoinker and yeeter take care of shuffling data to/away from it.

The gist of the test code is to yoink some data (out of thin air currently), munch on it, then yeet it into the ether. Yoinking fills an input queue, munching uses the input queue and dumps into an output queue, and yeeting pulls from the output queue.

Ideally it should all work, I was banking on the implicit waiting from queues during stall events (e.g., the yoinker fills the input queue, its threads stop while the queue is full; yeeter's output queue is empty, its threads stop while the output queue is filled back up).

That doesn't seem to happen reliably, which is problematic for using code reliably.

Without wait timers, the single-threaded yoinker-caller fails to exit all muncher/yeeter threads. With wait timers on the munching/yeeting segments, the single-threaded yoinker-caller usually gets through it.

With wait timers on the munching/yeeting segments, the parallel yoinker-caller fails to even get to the exiting stage. With wait timers on the yoinking segment as well as the munching/yeeting segments, it succeeds.

I can't control the speed at which the yoink/munch/yeet segments will finish (sometimes it might be instantaneous, maybe multiple nearly instantaneous yoinks in a row or something), is there a way to make this all be reliable with a parallel yoinker-caller without any needed wait timers?

import numpy as np
import multiprocessing as mp
from time import sleep


#SOMETHING -> INPUT QUEUE
def yoinker(incrementer):
    #--- make some data ---
    data_in = np.ones((2000,2000)); #make some big data
    data_in = data_in/np.sum(data_in)*incrementer; #tag it
    
    #--- ditch the data ---
    qb_in.put(data_in); #jam the data in
    
    # sleep(np.random.random()); #nap time! wait between 0 and 1 seconds
#END DEF

#INPUT QUEUE -> SOMETHING -> OUTPUT QUEUE
def muncher(qb_in, qb_out):
    while True: #these are eternal functions that continually work on a queue that has no predefined end (to them)
        #--- get the data out ---
        data_in = qb_in.get(); #get the data out
        if( data_in is None ):
            break #this is how this gets to end
        #END IF
        
        #--- do something with the data ---
        data_out = data_in.copy(); #so much
        
        #--- ditch the data ---
        qb_out.put(data_out); #jam the data in
        
        # sleep(np.random.random()); #nap time! wait between 0 and 1 seconds
        
    #END WHILE
#END DEF

#OUTPUT QUEUE -> SOMETHING
def yeeter(qb_out):
    while True: #these are eternal functions that continually work on a queue that has no predefined end (to them)
        #--- get the data out ---
        data_out = qb_out.get(); #get the data out
        if( data_out is None ):
            break #this is how this gets to end
        #END IF
        
        #--- save the data ---
        # print('got data_out, sum is: '+str(np.round(np.sum(np.sum(data_out))))); #do some reporting
        data_out = np.round(np.sum(np.sum(data_out)));
        
        # sleep(np.random.random()); #nap time! wait between 0 and 1 seconds
    #END WHILE
#END DEF

def parallel_starmap_helper(fn, args): #for multiprocess starmap with kwargs, MUST be outside of the function that calls it or it just hangs
    return fn(*args)
#END DEF

def initer(_qb_in): #basically each process will call this function when it starts (b/c it's defined as an "initializer")
    global qb_in; #it lets each process know "qb_in" is a global variables (outside of the scope of the code, e.g., they'll appear w/o being defined)
    qb_in = _qb_in; #link em up here
#END DEF

if __name__=='__main__':
    #--- settings ---
    threads_in = 2; #number of threads for the input process (reads files, fills input queue with resulting data)
    threads_calc = 4; #number of threads for the calc process (reads input queue's resulting data, converts, fills output queue)
    threads_out = 2; #number of threads for the output process (reads output queue's converted data, saves files)
    queueSize_in = 5; # how many input files to hold (if emptied, stalls conversion)
    queueSize_out = 5; #how many output files to hold (if filled, stalls conversion)
    
    #--- define queues that hold input and output datas ---
    qb_in = mp.Queue(maxsize=queueSize_in); # Queue to hold input things
    qb_out = mp.Queue(maxsize=queueSize_out); # Queue to hold output things
    
    #--- build data generator parallel lists (not using queues) ---
    parallel_list = []; #Prep
    for j in range(0, 25):
        parallel_list.append([yoinker, [j]]); # pack up all needed function calls
    #END FOR j
    
    #--- build data cruncher lists (using queues) ---
    munchers = [mp.Process(target=muncher,args=(qb_in, qb_out)) for i in range(threads_calc)] # this function gets the data from the input queue, processes it, and then puts in the output queue
    yeeters = [mp.Process(target=yeeter,args=(qb_out, )) for i in range(threads_out)] # this function gets data processed and does the final steps
    
    #--- start up all processes that are NOT blocking ---
    for munch in munchers:
        munch.daemon = True; #say it lives for others
        munch.start(); #start each muncher up
    #END FOR munch
    for yeet in yeeters:
        yeet.daemon = True; #say it lives for others
        yeet.start(); #start each yeeter up
    #END FOR yeet
        
    for j in range(0, 25):
        yoinker(j); # pack up all needed function 
        print('placed j'+str(j))
    #END FOR j
    # #--- call blocking data generator ---
    # with mp.Pool(processes=threads_in, initializer=initer, initargs=(qb_in,)) as executor:
    #     executor.starmap(parallel_starmap_helper, parallel_list); #function you want is replaced with; parallel_starmap_kwarg_helper helps starmap distribute everything right
    # #END WITH
    for j in range(0, threads_calc):
        qb_in.put(None); #tell all muncher threads to quit (they get out of the qb_in queue)
        print('qb_in - Put a None')
    #END FOR j
    for j in range(0, threads_out):
        qb_out.put(None); #tell all yeeter threads to quit (they get out of the qb_out queue)
        print('qb_out - Put a None')
    #END FOR j

    #--- This portion lets you know if the code has hung ---
    #it does this via checking the queues. The queues should end as exactly enough `None`s have been put in to end all of the queues started, but without timers they do not always end.
    #This is here to give some feedback, since calling `.join()` on the processes will just sit there silently.
    FLG_theyDone = False;
    while( FLG_theyDone == False ):
        FLG_theyDone = True;
        print('\nStarting loop')
        if( qb_in.full() == True ):
          print('qb_in full');
        elif( qb_in.empty() == True ):
            print('qb_in empty');
        #END IF
        if( qb_out.full() == True ):
          print('qb_out full');
        elif( qb_out.empty() == True ):
            print('qb_out empty');
        #END IF
        
        for munch in munchers:
            print('munch - '+str(munch.exitcode))
            if( munch.exitcode is None ):
                FLG_theyDone = False;
                # try:
                #     qb_in.put(sentinel, block=False); #tell all muncher threads to quit
                # except:
                #     print('qb_in full, can\'t jam more Nones');
                # #END TRYING
            #END IF
        #END FOR munch
        # print('yeeters - '+str(yeeters))
        for yeet in yeeters:
            print('yeet - '+str(yeet.exitcode))
            if( yeet.exitcode is None ):
                FLG_theyDone = False;
                # try:
                #     qb_out.put(sentinel, block=False); #tell all muncher threads to quit
                # except:
                #     print('qb_outn full, can\'t jam more Nones');
                # #END TRYING
            #END IF
        #END FOR yeet
        
        sleep(np.random.random()+2); #nap time! wait between 0 and 1 seconds
    #END IF
    
    #--- set up a place for them to end ---
    for munch in munchers:
        munch.join(); #end each muncher
    #END FOR munch
    for yeet in yeeters:
        yeet.join(); #end each yeeter
    #END FOR yeet
#END IF

Solution

  • The problem is you posting None for all the munchers and yeeters at the same time. Make sure to the munchers have finished munching (join the threads), then post None for the yeeters and join them.

        for j in range(25):
            yoinker(j)
            print(f'placed j{j}')
    
        for j in range(threads_calc):
            qb_in.put(None)
            print('qb_in - Put a None')
    
        for munch in munchers:  # Move the join here.
            munch.join()
    
        for j in range(threads_out):
            qb_out.put(None)
            print('qb_out - Put a None')
    
        for yeet in yeeters:
            yeet.join()
    

    While this solves the issue, I'm not sure why it hangs in the failing case. All of the threads do receive None and the functions exit, but the joins still hang. They stopped hanging when I switched to more simple queue data without numpy or even smaller numpy arrays like np.ones((2,2)), but still processed incorrectly due to output threads terminating before processing threads were done.