I am having trouble with queues failing to work as intended. The goal of this design is that the muncher can take as little or as much time as it needs, but it will always be supplied with work (the input queue always has items, the output queue always has room). I want to maximize the muncher's data crunching while the yoinker and yeeter take care of shuffling data to and away from it.
The gist of the test code is to yoink some data (out of thin air, currently), munch on it, then yeet it into the ether. Yoinking fills an input queue, munching consumes the input queue and dumps into an output queue, and yeeting pulls from the output queue.
Ideally it should all work; I was banking on the implicit waiting from queues during stall events (e.g., the yoinker fills the input queue and its threads block while the queue is full; the yeeter's output queue is empty and its threads block until the output queue is filled back up). That doesn't happen reliably, which makes the code unreliable to use.
Without wait timers, the single-threaded yoinker-caller fails to exit all muncher/yeeter threads. With wait timers on the munching/yeeting segments, the single-threaded yoinker-caller usually gets through it.
With wait timers on the munching/yeeting segments, the parallel yoinker-caller fails to even get to the exiting stage. With wait timers on the yoinking segment as well as the munching/yeeting segments, it succeeds.
I can't control the speed at which the yoink/munch/yeet segments will finish (sometimes it might be instantaneous, maybe multiple nearly instantaneous yoinks in a row or something). Is there a way to make this all reliable with a parallel yoinker-caller, without any wait timers?
import numpy as np
import multiprocessing as mp
from time import sleep

#SOMETHING -> INPUT QUEUE
def yoinker(incrementer):
    #--- make some data ---
    data_in = np.ones((2000,2000)); #make some big data
    data_in = data_in/np.sum(data_in)*incrementer; #tag it
    #--- ditch the data ---
    qb_in.put(data_in); #jam the data in
    # sleep(np.random.random()); #nap time! wait between 0 and 1 seconds
#END DEF
#INPUT QUEUE -> SOMETHING -> OUTPUT QUEUE
def muncher(qb_in, qb_out):
    while True: #these are eternal functions that continually work on a queue that has no predefined end (to them)
        #--- get the data out ---
        data_in = qb_in.get(); #get the data out
        if( data_in is None ):
            break #this is how this gets to end
        #END IF
        #--- do something with the data ---
        data_out = data_in.copy(); #so much
        #--- ditch the data ---
        qb_out.put(data_out); #jam the data in
        # sleep(np.random.random()); #nap time! wait between 0 and 1 seconds
    #END WHILE
#END DEF
#OUTPUT QUEUE -> SOMETHING
def yeeter(qb_out):
    while True: #these are eternal functions that continually work on a queue that has no predefined end (to them)
        #--- get the data out ---
        data_out = qb_out.get(); #get the data out
        if( data_out is None ):
            break #this is how this gets to end
        #END IF
        #--- save the data ---
        # print('got data_out, sum is: '+str(np.round(np.sum(np.sum(data_out))))); #do some reporting
        data_out = np.round(np.sum(np.sum(data_out)));
        # sleep(np.random.random()); #nap time! wait between 0 and 1 seconds
    #END WHILE
#END DEF
def parallel_starmap_helper(fn, args): #for multiprocessing starmap with kwargs, MUST be outside of the function that calls it or it just hangs
    return fn(*args)
#END DEF

def initer(_qb_in): #each process calls this function when it starts (b/c it's defined as an "initializer")
    global qb_in; #it lets each process know "qb_in" is a global variable (outside the scope of the function, i.e., it appears w/o being defined)
    qb_in = _qb_in; #link em up here
#END DEF
if __name__=='__main__':
    #--- settings ---
    threads_in = 2; #number of threads for the input process (reads files, fills input queue with resulting data)
    threads_calc = 4; #number of threads for the calc process (reads input queue's resulting data, converts, fills output queue)
    threads_out = 2; #number of threads for the output process (reads output queue's converted data, saves files)
    queueSize_in = 5; #how many input files to hold (if emptied, stalls conversion)
    queueSize_out = 5; #how many output files to hold (if filled, stalls conversion)
    #--- define queues that hold input and output data ---
    qb_in = mp.Queue(maxsize=queueSize_in); #Queue to hold input things
    qb_out = mp.Queue(maxsize=queueSize_out); #Queue to hold output things
    #--- build data generator parallel lists (not using queues) ---
    parallel_list = []; #prep
    for j in range(0, 25):
        parallel_list.append([yoinker, [j]]); #pack up all needed function calls
    #END FOR j
    #--- build data cruncher lists (using queues) ---
    munchers = [mp.Process(target=muncher, args=(qb_in, qb_out)) for i in range(threads_calc)] #this function gets the data from the input queue, processes it, and then puts it in the output queue
    yeeters = [mp.Process(target=yeeter, args=(qb_out,)) for i in range(threads_out)] #this function gets the processed data and does the final steps
    #--- start up all processes that are NOT blocking ---
    for munch in munchers:
        munch.daemon = True; #say it lives for others
        munch.start(); #start each muncher up
    #END FOR munch
    for yeet in yeeters:
        yeet.daemon = True; #say it lives for others
        yeet.start(); #start each yeeter up
    #END FOR yeet
    for j in range(0, 25):
        yoinker(j); #call yoinker serially in the main process
        print('placed j'+str(j))
    #END FOR j
    # #--- call blocking data generator ---
    # with mp.Pool(processes=threads_in, initializer=initer, initargs=(qb_in,)) as executor:
    #     executor.starmap(parallel_starmap_helper, parallel_list); #the function you want gets wrapped by parallel_starmap_helper, which helps starmap distribute everything right
    # #END WITH
    for j in range(0, threads_calc):
        qb_in.put(None); #tell all muncher threads to quit (they get out of the qb_in queue)
        print('qb_in - Put a None')
    #END FOR j
    for j in range(0, threads_out):
        qb_out.put(None); #tell all yeeter threads to quit (they get out of the qb_out queue)
        print('qb_out - Put a None')
    #END FOR j
    #--- This portion lets you know if the code has hung ---
    #It does this by checking the queues. The processes should end, since exactly enough `None`s have been put in to end all of the workers started, but without timers they do not always end.
    #This is here to give some feedback, since calling `.join()` on the processes would just sit there silently.
    FLG_theyDone = False;
    while( FLG_theyDone == False ):
        FLG_theyDone = True;
        print('\nStarting loop')
        if( qb_in.full() == True ):
            print('qb_in full');
        elif( qb_in.empty() == True ):
            print('qb_in empty');
        #END IF
        if( qb_out.full() == True ):
            print('qb_out full');
        elif( qb_out.empty() == True ):
            print('qb_out empty');
        #END IF
        for munch in munchers:
            print('munch - '+str(munch.exitcode))
            if( munch.exitcode is None ):
                FLG_theyDone = False;
                # try:
                #     qb_in.put(None, block=False); #tell all muncher threads to quit
                # except:
                #     print('qb_in full, can\'t jam more Nones');
                # #END TRYING
            #END IF
        #END FOR munch
        # print('yeeters - '+str(yeeters))
        for yeet in yeeters:
            print('yeet - '+str(yeet.exitcode))
            if( yeet.exitcode is None ):
                FLG_theyDone = False;
                # try:
                #     qb_out.put(None, block=False); #tell all yeeter threads to quit
                # except:
                #     print('qb_out full, can\'t jam more Nones');
                # #END TRYING
            #END IF
        #END FOR yeet
        sleep(np.random.random()+2); #nap time! wait between 2 and 3 seconds
    #END WHILE
    #--- set up a place for them to end ---
    for munch in munchers:
        munch.join(); #end each muncher
    #END FOR munch
    for yeet in yeeters:
        yeet.join(); #end each yeeter
    #END FOR yeet
#END IF
The problem is that you post None for all the munchers and yeeters at the same time. A yeeter can pull a None out of qb_out and exit before the munchers have pushed all of their remaining results; with no yeeters left to drain it, the bounded qb_out fills up and the munchers block forever in qb_out.put(), never reaching their own None. Make sure the munchers have finished munching (join those processes), then post None for the yeeters and join them.
for j in range(25):
    yoinker(j)
    print(f'placed j{j}')
for j in range(threads_calc):
    qb_in.put(None)
    print('qb_in - Put a None')
for munch in munchers: # Move the join here.
    munch.join()
for j in range(threads_out):
    qb_out.put(None)
    print('qb_out - Put a None')
for yeet in yeeters:
    yeet.join()
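As a general pattern, this shuts the pipeline down stage by stage: finish and join one stage completely before posting sentinels to the next. A minimal sketch of that idea as a helper (the function name is mine, not from the post; it reuses the queues and process lists defined in the question's __main__ block):
def shutdown_stage(procs, queue):
    #post one sentinel per worker, then wait for the whole stage to terminate
    for _ in range(len(procs)):
        queue.put(None)
    for p in procs:
        p.join() #a process only terminates after flushing what it queued, so
                 #everything this stage produced is downstream before we return

shutdown_stage(munchers, qb_in)  #stage 1 fully finished first...
shutdown_stage(yeeters, qb_out)  #...then stage 2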
While this solves the issue, I'm not sure why it hangs in the failing case. All of the threads do receive None and the functions exit, but the joins still hang. They stopped hanging when I switched to simpler queue data without numpy, or even to smaller numpy arrays like np.ones((2,2)), though that still processed incorrectly because the output threads terminated before the processing threads were done.
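That residual behavior matches a pitfall documented in the multiprocessing docs ("Joining processes that use queues"): a process that has put items on a queue will not terminate until its background feeder thread has flushed all of the buffered, pickled data into the underlying pipe. So in the failing ordering the yeeters exit early, nothing drains qb_out, and the muncher processes hang at exit even though muncher() itself has returned. Tiny payloads like np.ones((2,2)) fit inside the OS pipe buffer, so they flush without a reader and the hang disappears. A minimal sketch of the effect (my own repro, not code from the post):
import multiprocessing as mp
import numpy as np

def worker(q):
    q.put(np.ones((2000, 2000))) #~32 MB pickled, far larger than the pipe buffer
    #worker() returns here, but the *process* cannot terminate until the feeder
    #thread flushes the item, i.e., until someone actually consumes it

if __name__ == '__main__':
    q = mp.Queue()
    p = mp.Process(target=worker, args=(q,))
    p.start()
    data = q.get() #drain the queue BEFORE joining; swapping this line with the
    p.join()       #join below reproduces the silent hang described above
    print(data.shape)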