Search code examples
pythonmultiprocessingqueue

return data to the main program from a queue that I read in separate processes


I am computing data using multiprocessing capabilities. The "worker" script puts its results in a queue, that I then easily read in the main program with:

result=[]
while not Q.empty():
    result.append(Q.get())

But if I have many many data in the queue, I thought I might as well use one or two cores of my cpu to start reading the queue while the worker processes keep working. I found a piece of code here: How to use multiprocessing queue in Python? that creates "reader" processes, reading the queue continuously until the worker tell them that the computation is over. That works fine, but the reader only reads the queue and returns nothing. How do I actually get the data from the queue in order to use it in my main program? The only solution I found is to create a list with a multiprocess.Manager(), and pass it as an argument to the reader. That works, but takes ages!! It completely kills the execution time of my program, so the first method (reading the queue directly in the main program) is much better. Do I have other solution? Is it actually possible to get in the main program the data from a queue that I read in separate processes?

Below, an example code, built from various pieces of code gathered here and there:

import multiprocessing as mp
from datetime import datetime


def worker(numbers, start, end, qu):
    """A worker function to calculate squares of numbers."""
    res=[]
    for i in range(start, end):
        res.append(numbers[i] * numbers[i])
    qu.put(res)

def reader(q, outputlist):
    """Read from the queue; this spawns as a separate Process"""
    #returnedlist=[]
    while True:
        msg = q.get()  # Read from the queue and do nothing
        if msg == "DONE":
            break
        [outputlist.append(x) for x in msg] # comment out if using 1st method (reading queue in the main program)
    return

def start_reader_procs(q, num_of_reader_procs, L):
    """Start the reader processes and return all in a list to the caller
    source: 
    https://stackoverflow.com/questions/11515944/how-to-use-multiprocessing-queue-in-python"""
    
    all_reader_procs = list()
    for ii in range(0, num_of_reader_procs):
        ### reader_p() reads from qq as a separate process...
        ###    you can spawn as many reader_p() as you like
        ###    however, there is usually a point of diminishing returns
        reader_p = mp.Process(target=reader, args=((q),L,))
        reader_p.daemon = True
        reader_p.start()  # Launch reader_p() as another proc
        
        all_reader_procs.append(reader_p)

    return all_reader_procs 
    
def main(core_count):
    numbers = range(50000)  # A larger range for a more evident effect of multiprocessing
    segment = len(numbers) // core_count
    processes = []
    #Q = mp.Queue()
    m = mp.Manager()
    Q = m.Queue()
    
    # comment out if using 1st method (reading queue in the main program)
    #----------------------------------
    num_of_reader_procs=2
    L = m.list()
    all_reader_procs = start_reader_procs(Q, num_of_reader_procs, L)
    #-----------------------------------------
    
    for i in range(core_count):
        start = i * segment
        if i == core_count - 1:
            end = len(numbers)  # Ensure the last segment goes up to the end
        else:
            end = start + segment
        # Creating a process for each segment
        p = mp.Process(target=worker, args=(numbers, start, end, Q))
        processes.append(p)
        p.start()

    for p in processes:
        p.join()

    print("All worker processes terminated")

    # comment out if using 1st method (reading queue in the main program)
    #----------------------------------
    ### Tell all readers to stop...
    for ii in range(0, num_of_reader_procs):
        Q.put("DONE")
    for idx, a_reader_proc in enumerate(all_reader_procs):
        print("    Waiting for reader_p.join() index %s" % idx)
        a_reader_proc.join()  # Wait for a_reader_proc() to finish
        print("        reader_p() idx:%s is done" % idx)    
    result=list(L)
    #----------------------------------
    
#   result=[]
#   while not Q.empty():
#       result.append(Q.get())
#   result = [x for L in result for x in L] # flatten the list of lists
    
    return result

if __name__ == '__main__':
    
    for core_count in [1, 2, 4]:
        
        starttime = datetime.now()
        print(f"Using {core_count} core(s):")
        result = main(core_count)
        print(f"First 10 squares: {list(result)[:10]}")  # Display the first 10 results as a sample
        endtime = datetime.now()
        print ("Total computation time : {:.1f} sec".format((endtime-starttime).total_seconds()))
        print()

Solution

  • The issue here is how best to make the queue available to the subprocesses.

    One technique is to use a multiprocessing Pool and an initialiser that will place the queue in the subprocess's global space.

    Here's an example which you should be able to adapt to your needs:

    from multiprocessing import Pool, Queue
    
    def worker(a):
        global q
        lst = [i * i for i in range(*a)]
        q.put(lst)
    
    def init(_q):
        global q
        q = _q
    
    def main():
        q = Queue()
        args = [(i, i + 10) for i in range(0, 100, 10)]
        with Pool(initializer=init, initargs=(q,)) as pool:
            pool.map(worker, args)
            pool.close()
            pool.join()
        while not q.empty():
            print(q.get())
    
    if __name__ == "__main__":
        main()
    

    Output:

    [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
    [100, 121, 144, 169, 196, 225, 256, 289, 324, 361]
    [400, 441, 484, 529, 576, 625, 676, 729, 784, 841]
    [900, 961, 1024, 1089, 1156, 1225, 1296, 1369, 1444, 1521]
    [1600, 1681, 1764, 1849, 1936, 2025, 2116, 2209, 2304, 2401]
    [2500, 2601, 2704, 2809, 2916, 3025, 3136, 3249, 3364, 3481]
    [3600, 3721, 3844, 3969, 4096, 4225, 4356, 4489, 4624, 4761]
    [4900, 5041, 5184, 5329, 5476, 5625, 5776, 5929, 6084, 6241]
    [6400, 6561, 6724, 6889, 7056, 7225, 7396, 7569, 7744, 7921]
    [8100, 8281, 8464, 8649, 8836, 9025, 9216, 9409, 9604, 9801]