Tags: python, parallel-processing, queue, multiprocessing, pool

Pool, queue, hang


I want to use a queue to hold the results, because I want a consumer (serial, not parallel) to process the workers' results as the workers produce them.

For now, I want to know why the following program hangs.

import multiprocessing as mp
import time
import numpy as np
def worker(arg):
    time.sleep(0.2)
    q, arr = arg 
    q.put(arr[0])

p = mp.Pool(4)
x = np.array([4,4])
q = mp.Queue()

for i in range(4):
    x[0] = i 
    #worker((q,x))
    p.apply_async(worker, args=((q, x),)) 

print("done_apply")
time.sleep(0.2)
for i in range(4):
    print(q.get())

Solution

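  • A multiprocessing.Queue cannot be passed to Pool workers as an argument. Pickling it for apply_async raises RuntimeError ("Queue objects should only be shared between processes through inheritance"), so the task never runs, nothing is ever put on the queue, and q.get() blocks forever. I came to the same conclusion as the OP, first by finding this answer.

    Unfortunately, there were other problems in this code (which is why it isn't an exact duplicate of the linked answer):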

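    • The worker's signature has to match args. The fixed code below passes args=(q, x), i.e. two arguments, so the worker must be worker(*arg) (or worker(q, arr)). With a mismatch my process locked up too, and I first didn't know why. The reason: an exception raised in an apply_async task is not printed. It is stored in the AsyncResult that apply_async returns and is only re-raised when you call .get() on that result, so the main process just waits on q.get() forever.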
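    • Passing the same array x to every task can make all workers report the same number. apply_async only queues the task; its arguments are pickled later by the pool's background thread, by which time the loop may already have overwritten x[0]. apply works because it blocks until each task has finished.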
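Both silent failures (the unpicklable Queue and the argument mismatch) surface as soon as you call .get() on the AsyncResult that apply_async returns. A minimal sketch, assuming a hypothetical one-argument worker and an illustrative diagnose() helper: the first task fails in the parent while its plain mp.Queue argument is being pickled, the second fails inside the worker with a TypeError because it receives two arguments.

```python
import multiprocessing as mp


def worker(arg):  # takes exactly ONE argument: a (queue, value) tuple
    q, value = arg
    q.put(value)


def diagnose():
    """Submit two broken tasks; return the name of the exception each one raised."""
    plain_q = mp.Queue()  # a plain Queue can only be inherited, not pickled as an argument
    raised = []
    with mp.Pool(2) as pool:
        # 1) Pickling plain_q fails in the parent with RuntimeError; the task never starts.
        bad_queue = pool.apply_async(worker, args=((plain_q, 1),))
        # 2) args=("not a queue", 1) calls worker("not a queue", 1) -> TypeError in the worker.
        bad_args = pool.apply_async(worker, args=("not a queue", 1))
        for res in (bad_queue, bad_args):
            try:
                res.get(timeout=10)  # re-raises the stored exception in the parent
            except Exception as e:
                raised.append(type(e).__name__)
    return raised


if __name__ == "__main__":
    print(diagnose())  # ['RuntimeError', 'TypeError']
```

Passing error_callback= to apply_async is another way to get notified of these exceptions without blocking on .get().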

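    Another thing: for the code to be portable, wrap the main code in if __name__ == "__main__":. This is required on Windows (and wherever the "spawn" start method is used, e.g. by default on macOS since Python 3.8), because each child process re-imports the main module.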

    Fully fixed code (for me it outputs 0, 3, 2, 1; the order varies between runs because the workers can finish in any order):

    import multiprocessing as mp
    import time
    import numpy as np

    def worker(*arg):  # there are 2 arguments to "worker"
    #def worker(q, arr):  # is probably even better
        time.sleep(0.2)
        q, arr = arg
        q.put(arr[0])

    if __name__ == "__main__":
        p = mp.Pool(4)

        m = mp.Manager()  # use a manager, Queue objects cannot be shared
        q = m.Queue()     # proxy object: picklable, so it can be passed to workers

        results = []
        for i in range(4):
            x = np.array([4, 4])  # create array each time (or make a copy)
            x[0] = i
            results.append(p.apply_async(worker, args=(q, x)))

        print("done_apply")
        for i in range(4):
            print(q.get())  # blocks until some worker has put a value

        for r in results:
            r.get()  # re-raises any exception raised in a worker instead of hiding it
        p.close()
        p.join()
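If the goal is only for a serial consumer in the main process to handle results as soon as each worker produces them, Pool.imap_unordered may do that without any queue: the worker returns its value, and the main process iterates over the results in completion order. A sketch of that alternative (not the approach used above), with a hypothetical consume() helper:

```python
import multiprocessing as mp
import time

import numpy as np


def worker(arr):
    time.sleep(0.2)
    return arr[0]  # return the result instead of putting it on a queue


def consume():
    tasks = [np.array([i, 4]) for i in range(4)]  # a separate array per task
    handled = []
    with mp.Pool(4) as pool:
        # Yields each result as soon as any worker finishes (completion order).
        for value in pool.imap_unordered(worker, tasks):
            handled.append(int(value))  # the serial consumer runs here, in the parent
    return handled


if __name__ == "__main__":
    print(consume())
```

Unlike a result that nobody calls .get() on, an exception raised in a worker is re-raised by this loop, so it cannot turn into a silent hang.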