I have a queue with 100,000 elements, which I simply want to drain one by one using multiple processes, and then print the time it took at the end:
import multiprocessing
from datetime import datetime

def worker(queue):
    while not queue.empty():
        try:
            item = queue.get_nowait()
            # Process the item
            print(f"Process {multiprocessing.current_process().name} processing item: {item}")
        except:
            break

def main():
    # Create a queue with 100,000 elements using Manager
    manager = multiprocessing.Manager()
    queue = manager.Queue()
    for i in range(100000):
        queue.put(i)

    # Starttime
    start = datetime.now()

    # Create 8 worker processes
    processes = []
    for _ in range(NR_WORKERS):
        p = multiprocessing.Process(target=worker, args=(queue,))
        processes.append(p)
        p.start()

    # Wait for all processes to finish
    for p in processes:
        p.join()

    # Endtime
    print(queue.empty(), "all processes finished in:", datetime.now() - start)

if __name__ == "__main__":
    global NR_WORKERS
    NR_WORKERS = 8
    main()
However, when I execute the code with 8 processes, it is almost as fast as with 1 process (15 vs. 18 seconds), and without the print statement both variants are equally fast (around 4 seconds). How can that be, even though I'm using more processes? Is there a way to improve this?
Your code as posted is not the best candidate for multiprocessing, which works best when your worker function (worker in this case) is more CPU-intensive than it currently is, so that the overhead of using multiprocessing is more than offset by the parallelism you achieve.
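For illustration only, here is a sketch of a worker whose per-item work is heavy enough that parallelism can pay off; the cpu_bound_work function and its loop count are made up for this example and are not part of your code:

def cpu_bound_work(item):
    # Hypothetical stand-in for real per-item computation
    total = 0
    for i in range(100_000):
        total += (item + i) * (item + i)
    return total

def worker(queue):
    # Same loop structure as in the question, but now the CPU time spent
    # per item dominates the cost of getting the item from the queue
    while not queue.empty():
        try:
            item = queue.get_nowait()
        except Exception:
            break
        cpu_bound_work(item)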
But assuming your actual worker function is more CPU-intensive than what you posted, you can achieve a great performance boost by using a multiprocessing.Queue instance instead of a managed queue instance. If you do so, however, you need to be aware that the call to queue.empty() is no longer reliable. Therefore, your main process should put an additional NR_WORKERS sentinel objects on the queue (one for each child process) that cannot be mistaken for actual data and that signify there is no more data in the queue to be gotten. When a child process gets one of these sentinel objects, it knows to terminate. In this case None is a suitable sentinel value.
If you are going to time your run, then start the clock at the beginning of main so that you also measure the non-trivial time it takes to put 100_000 integers onto a managed queue. If you do so, the total running time becomes (on my desktop) 19.75 seconds. Using a multiprocessing.Queue instance instead, the running time is reduced to 2.1 seconds:
import multiprocessing
from datetime import datetime

def worker(queue):
    while True:
        try:
            item = queue.get_nowait()
            if item is None:  # Sentinel
                break
            # Process the item
            #print(f"Process {multiprocessing.current_process().name} processing item: {item}")
        except:
            break

def main():
    # Starttime
    start = datetime.now()

    # Create a queue with 100,000 elements:
    queue = multiprocessing.Queue()
    for i in range(100000):
        queue.put(i)
    # Add sentinel values:
    for _ in range(NR_WORKERS):
        queue.put(None)

    # Create NR_WORKERS worker processes
    processes = []
    for _ in range(NR_WORKERS):
        p = multiprocessing.Process(target=worker, args=(queue,))
        processes.append(p)
        p.start()

    # Wait for all processes to finish
    for p in processes:
        p.join()

    # Endtime
    print("All processes finished in:", datetime.now() - start)

if __name__ == "__main__":
    global NR_WORKERS
    NR_WORKERS = 8
    main()
If you need your worker function to return a result, I would advise you to just use a multiprocessing pool instead (not a bad idea even in your current example). But if you continue to use explicit multiprocessing.Process child processes, then you will need a second, output queue for the results. Be aware that you have no control over the order in which the child processes put their results on the result queue, so if you need the results back in submission order, you will need to pass the index of each item placed on the input queue and have the child process return that index along with the result. You must also get the results from the result queue before joining the child processes. If you know that each worker invocation will always successfully return a result, then you can just keep count of the number of results the main process retrieves. Otherwise, as in the example below, the child process should always put a sentinel value on the result queue whenever it terminates, even if it terminates on an error:
import multiprocessing
from datetime import datetime

def worker(input_queue, output_queue):
    while True:
        try:
            item = input_queue.get_nowait()
            if item is None:  # Sentinel
                break
            # Process the item
            index, n = item  # unpack
            output_queue.put((index, n ** 2))  # return tuple of input index and result
        except:
            break
    output_queue.put(None)  # This process has put its final item

def main():
    # Starttime
    start = datetime.now()

    input_queue = multiprocessing.Queue()
    output_queue = multiprocessing.Queue()

    arguments = [
        1, 3, 5, 7, 2, 4, 6, 100, 1_000, 1_000_000
    ]
    for index, n in enumerate(arguments):
        input_queue.put((index, n))
    # Add sentinel values:
    for _ in range(NR_WORKERS):
        input_queue.put(None)

    # Create NR_WORKERS worker processes
    processes = []
    for _ in range(NR_WORKERS):
        p = multiprocessing.Process(target=worker, args=(input_queue, output_queue))
        processes.append(p)
        p.start()

    # Allocate list for results:
    results = [None] * len(arguments)
    sentinels_seen = 0
    while sentinels_seen < NR_WORKERS:
        result = output_queue.get()
        if result is None:  # sentinel?
            sentinels_seen += 1
        else:
            index, return_value = result  # Unpack tuple
            results[index] = return_value

    # Now we can wait for all processes to finish
    for p in processes:
        p.join()

    # Endtime
    print("All processes finished in:", datetime.now() - start)
    print(results)

if __name__ == "__main__":
    global NR_WORKERS
    NR_WORKERS = 8
    main()
Prints:
All processes finished in: 0:00:00.228998
[1, 9, 25, 49, 4, 16, 36, 10000, 1000000, 1000000000000]
Using a multiprocessing pool:
import multiprocessing
from datetime import datetime

def worker(n):
    return n ** 2

def main():
    # Starttime
    start = datetime.now()

    arguments = [
        1, 3, 5, 7, 2, 4, 6, 100, 1_000, 1_000_000
    ]
    with multiprocessing.Pool(NR_WORKERS) as pool:
        results = pool.map(worker, arguments)

    # Endtime
    print("All processes finished in:", datetime.now() - start)
    print(results)

if __name__ == "__main__":
    global NR_WORKERS
    NR_WORKERS = 8
    main()
Prints:
All processes finished in: 0:00:00.226000
[1, 9, 25, 49, 4, 16, 36, 10000, 1000000, 1000000000000]
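To apply the pool approach to the 100,000-item scenario from the question, one option is to let pool.map send the items to the workers in batches via its chunksize argument, so the inter-process communication cost is amortized over many items per task. This is just a sketch: the chunksize of 1_000 is an arbitrary guess rather than a tuned value, and the squaring is only a placeholder for whatever per-item processing you actually do:

import multiprocessing
from datetime import datetime

def worker(n):
    return n * n  # placeholder for the real per-item processing

def main():
    start = datetime.now()
    with multiprocessing.Pool(NR_WORKERS) as pool:
        # chunksize batches the items per task, reducing the per-item
        # inter-process communication overhead
        results = pool.map(worker, range(100_000), chunksize=1_000)
    print("All processes finished in:", datetime.now() - start)

if __name__ == "__main__":
    NR_WORKERS = 8
    main()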