Tags: python, python-multiprocessing

Python Multiprocessing Queue and Pool slower than normal loop


I am trying to implement multiprocessing in a Python program where I need to run some CPU-intensive code. In my test code, the multiprocessing Queue and the multiprocessing Pool are both slower than a normal loop with no multiprocessing. During the Pool section of my code, I can see that CPU usage is maxed out, yet it is still slower than the normal loop. Is there an issue with my code?

import time
from multiprocessing import Process
from multiprocessing import Queue
from multiprocessing import Pool
import random


def run_sims(iterations):
    sim_list = []
    for i in range(iterations):
        sim_list.append(random.uniform(0,1))
    print(iterations, "count", sum(sim_list)/len(sim_list))
    return (sum(sim_list)/len(sim_list))

def worker(queue):
    i=0
    while not queue.empty():
        task = queue.get()
        run_sims(task)
        i=i+1

if __name__ == '__main__':    
    queue = Queue()
    iterations_list = [30000000, 30000000, 30000000, 30000000, 30000000]
    it_len = len(iterations_list)
    
    ## Queue ##
    print("#STARTING QUEUE#")
    start_t = time.perf_counter()
    for i in range(it_len):
        iterations = iterations_list[i]
        queue.put(iterations) 

    process = Process(target=worker, args=(queue, ))
    process.start()
    process.join() 
    end_t = time.perf_counter()
    print("Queue time: ", end_t - start_t)
    
    ## Pool ##
    print("#STARTING POOL#")
    start_t = time.perf_counter()
    with Pool() as pool:
        results = pool.imap_unordered(run_sims, iterations_list)

        for res in results:
            res
    end_t = time.perf_counter()
    print("Pool time: ", end_t - start_t)

    ## No Multiprocessing - Normal Loop
    print("#STARTING NORMAL LOOP#")
    start_t = time.perf_counter()
    for i in iterations_list:
        run_sims(i)
    end_t = time.perf_counter()
    print("Normal time: ", end_t - start_t)

I've tried the above code, but the multiprocessing sections are slower than the normal loop:

Queue Time: 59 seconds

Pool Time: 83 seconds

Normal Loop Time: 55 seconds

My expectation is that Queue and Pool would be significantly faster than the normal loop.


Solution

  • Added worker processes to the queue code so that it performs about the same as the pool; the original version used a single worker process, so its queued tasks still ran one after another. On my machine, queue and pool were both significantly faster than the sequential loop. I have 4 physical cores and 8 logical CPUs. Since this is a CPU-bound task, performance will vary with the number of available CPUs and whatever other work is going on on the machine.

    This script keeps the number of workers below the CPU count. If these were network-bound tasks, a larger pool could perform faster; disk-bound tasks would likely not benefit from a larger pool. (A concurrent.futures variant of the same pattern is sketched after the script.)

    import time
    from multiprocessing import Process
    from multiprocessing import Queue
    from multiprocessing import Pool
    from multiprocessing import cpu_count
    from queue import Empty  # raised by Queue.get_nowait() when the queue is drained
    import random
    
    
    def run_sims(iterations):
        # CPU-bound work: average of `iterations` uniform random samples.
        sim_list = []
        for i in range(iterations):
            sim_list.append(random.uniform(0, 1))
        print(iterations, "count", sum(sim_list)/len(sim_list))
        return sum(sim_list)/len(sim_list)
    
    def worker(queue):
        # Drain tasks until the queue is empty.  Using get_nowait() and catching
        # Empty avoids the race where empty() reports work left but another
        # worker takes the last item before get() runs, blocking this worker.
        while True:
            try:
                task = queue.get_nowait()
            except Empty:
                break
            run_sims(task)
    
    if __name__ == '__main__':
    
        iteration_count = 5
    
        queue = Queue()
        iterations_list = [30000000] * iteration_count
        it_len = len(iterations_list)
        
        # guess a parallel execution size. CPU bound, and we want some
        # room for other processes.
        pool_size = max(min(cpu_count()-2, len(iterations_list)), 2)
        print("Pool size", pool_size)
    
        ## Queue ##
        print("#STARTING QUEUE#")
        start_t = time.perf_counter()
        for iterations in iterations_list:
            queue.put(iterations) 
    
        processes = []
        for i in range(pool_size):
            processes.append(Process(target=worker, args=(queue, )))
            processes[-1].start()
        for process in processes:
            process.join() 
        end_t = time.perf_counter()
        print("Queue time: ", end_t - start_t)
    
        ## Pool ##
        print("#STARTING POOL#")
        start_t = time.perf_counter()
        with Pool(pool_size) as pool:
            results = pool.imap_unordered(run_sims, iterations_list)
    
            for res in results:
                pass  # drain the iterator so all submitted tasks finish
        end_t = time.perf_counter()
        print("Pool time: ", end_t - start_t)
    
        ## No Multiprocessing - Normal Loop
        print("#STARTING NORMAL LOOP#")
        start_t = time.perf_counter()
        for i in iterations_list:
            run_sims(i)
        end_t = time.perf_counter()
        print("Normal time: ", end_t - start_t)