I want to use a queue to hold results, because I want a consumer (serial, not parallel) to process the workers' results as the workers produce them.
For now, I want to know why the following program hangs:
import multiprocessing as mp
import time
import numpy as np

def worker(arg):
    time.sleep(0.2)
    q, arr = arg
    q.put(arr[0])

p = mp.Pool(4)
x = np.array([4, 4])
q = mp.Queue()
for i in range(4):
    x[0] = i
    #worker((q, x))
    p.apply_async(worker, args=((q, x),))
print("done_apply")
time.sleep(0.2)
for i in range(4):
    print(q.get())
Queue objects cannot be shared. I first came to the same conclusion as the OP by finding this answer.
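You can see the underlying failure directly: to send arguments to a pool worker, Pool must pickle them, and a plain mp.Queue refuses to be pickled outside of process spawning. A minimal sketch (my own demo, not code from the question):

```python
import multiprocessing as mp
import pickle

if __name__ == "__main__":
    q = mp.Queue()
    try:
        # this is essentially what Pool does to ship `q` to a worker
        pickle.dumps(q)
    except RuntimeError as e:
        # "Queue objects should only be shared between processes through inheritance"
        print(e)
```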
Unfortunately, there were other problems in this code (which doesn't make it an exact duplicate of the linked answer):
worker(arg) should be worker(*arg) for the args unpacking to work. Without that, my process locked up too (I admit I don't know why; it should have thrown an exception, but I guess that multiprocessing and exceptions don't work well together).

Passing the same x to all the workers results in the same number from every worker (with apply it works, but not with apply_async).
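On the exception point: apply_async never raises in the caller. The worker's exception is stored on the AsyncResult object it returns, and is only re-raised when you call .get() on it; if you discard the result object, errors vanish silently. A small illustration, using the builtin int as a stand-in worker so it pickles everywhere:

```python
import multiprocessing as mp

if __name__ == "__main__":
    with mp.Pool(1) as p:
        res = p.apply_async(int, ("not a number",))  # worker raises ValueError
        # no error here: the exception is parked inside `res`
        try:
            res.get(timeout=10)  # the ValueError is re-raised in the parent
        except ValueError as e:
            print("caught:", e)
```

This is why keeping the AsyncResult objects and calling .get() on each one is a good habit even when you don't need the return values.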
Another thing: for the code to be portable, wrap the main code in an if __name__ == "__main__": block; this is required on Windows because of differences in process spawning.
Fully fixed code that outputs 0,3,2,1 for me:
import multiprocessing as mp
import time
import numpy as np

def worker(*arg):  # there are 2 arguments to "worker"
#def worker(q, arr):  # is probably even better
    time.sleep(0.2)
    q, arr = arg
    q.put(arr[0])

if __name__ == "__main__":
    p = mp.Pool(4)
    m = mp.Manager()  # use a manager, Queue objects cannot be shared
    q = m.Queue()
    for i in range(4):
        x = np.array([4, 4])  # create the array each time (or make a copy)
        x[0] = i
        p.apply_async(worker, args=(q, x))
    print("done_apply")
    time.sleep(0.2)
    for i in range(4):
        print(q.get())
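As a side note on the original goal (a serial consumer processing results as the workers produce them): Pool.imap_unordered gives you exactly that pattern without any queue, since results are yielded to the parent loop as soon as each worker finishes. A sketch with a stand-in worker function (slow_square is my own name, not from the question):

```python
import multiprocessing as mp
import time

def slow_square(n):
    time.sleep(0.1)
    return n * n

if __name__ == "__main__":
    with mp.Pool(4) as p:
        # the loop body is the serial consumer: it runs in the parent,
        # receiving each result as soon as some worker completes it
        for result in p.imap_unordered(slow_square, range(4)):
            print(result)  # order is not guaranteed
```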