Tags: python, multiprocessing, python-multiprocessing

Returning results is very slow in multiprocessing code. What should I do?


Code without multiprocessing:

from time import time

func1Results = []

def func1(valList):
    num = 0

    for val in valList:
        num += val

    func1Results.append(num)

if __name__ == '__main__':
    st = time()
    
    for valList in [range(40000000), range(40000000), range(40000000), range(40000000)]:
        func1(valList)

    ed = time()
    
    for r in func1Results:
        print(r)

    print(ed - st)

Output:
799999980000000
799999980000000
799999980000000
799999980000000
13.679119348526001

Multiprocessing code:

from multiprocessing import Process, Queue
from time import time

queue = Queue()
processList, func1Results = [], []

def func1(valList, queue):
    num = 0

    for val in valList:
        num += val

    queue.put(num)

if __name__ == '__main__':
    st = time()

    for valList in [range(40000000), range(40000000), range(40000000), range(40000000)]:
        xProcess = Process(target=func1, args=(valList, queue))
        xProcess.start()
        
        func1Results.append(queue.get())
        processList.append(xProcess)

    for xProcess in processList:
        xProcess.join()

    ed = time()

    for i in func1Results:
        print(i)

    print(ed - st)

Output:
799999980000000
799999980000000
799999980000000
799999980000000
13.916456937789917

When I use the queue's put and get methods, the processing time of the multiprocessing code increases significantly. I know that returning results from worker processes carries some overhead, but returning them is exactly what I need. What can I do to return the results more efficiently?


Solution

  • Here's a restructured approach to the original code where we allow all the sub-processes to terminate before we examine the queue. In the original code, queue.get() is called immediately after each process is started; because get() blocks until a result is available, each worker has to finish before the next one is started, so the processes effectively run one at a time. (A minimal fix that keeps the original Process/Queue structure is sketched after the output below.)

    from concurrent.futures import ProcessPoolExecutor
    from multiprocessing import Manager
    from functools import partial
    import time
    
    N = 40000000
    
    def calc(q, rng):
        num = 0
        for n in rng:
            num += n
        q.put(num)
    
    def main():
        with Manager() as manager:
            queue = manager.Queue()
            rlist = [range(N), range(N), range(N), range(N)]
            p = partial(calc, queue)
            with ProcessPoolExecutor() as executor:
                executor.map(p, rlist)
            while not queue.empty():
                print(queue.get())
    
    if __name__ == '__main__':
        start = time.perf_counter()
        main()
        end = time.perf_counter()
        print(f'Duration = {end-start:.2f}s')
    

    Output:

    799999980000000
    799999980000000
    799999980000000
    799999980000000
    Duration = 1.93s
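
    For comparison, here is a minimal sketch that keeps the question's Process/Queue structure and only changes the ordering: all four processes are started before any result is read from the queue, and the queue is drained before joining. The names and workload are taken from the question's code; the exact timing will depend on your machine.

    from multiprocessing import Process, Queue
    from time import time

    def func1(valList, queue):
        num = 0
        for val in valList:
            num += val
        queue.put(num)

    if __name__ == '__main__':
        st = time()
        queue = Queue()
        processList = []

        # Start every worker first so they actually run in parallel.
        for valList in [range(40000000)] * 4:
            xProcess = Process(target=func1, args=(valList, queue))
            xProcess.start()
            processList.append(xProcess)

        # Drain the queue before joining; each get() blocks until one worker is done.
        func1Results = [queue.get() for _ in processList]

        for xProcess in processList:
            xProcess.join()

        ed = time()

        for r in func1Results:
            print(r)

        print(ed - st)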
    

    Note:

    Of course, you don't need a queue to get the results from the sub-processes. You could just do this:

    from concurrent.futures import ProcessPoolExecutor
    import time
    
    N = 40000000
    
    def calc(rng):
        num = 0
        for n in rng:
            num += n
        return num
    
    def main():
        rlist = [range(N), range(N), range(N), range(N)]
        with ProcessPoolExecutor() as executor:
            print(*executor.map(calc, rlist), sep='\n')
        
    if __name__ == '__main__':
        start = time.perf_counter()
        main()
        end = time.perf_counter()
        print(f'Duration = {end-start:.2f}s')
    

    Output:

    799999980000000
    799999980000000
    799999980000000
    799999980000000
    Duration = 1.83s
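
    If you need each result as soon as it becomes available rather than in submission order, a variation along these lines with executor.submit() and concurrent.futures.as_completed should also work; the computation itself is unchanged, so expect a similar duration.

    from concurrent.futures import ProcessPoolExecutor, as_completed
    import time

    N = 40000000

    def calc(rng):
        num = 0
        for n in rng:
            num += n
        return num

    def main():
        rlist = [range(N)] * 4
        with ProcessPoolExecutor() as executor:
            # submit() returns one Future per task; as_completed() yields
            # each future as soon as its result is ready.
            futures = [executor.submit(calc, rng) for rng in rlist]
            for future in as_completed(futures):
                print(future.result())

    if __name__ == '__main__':
        start = time.perf_counter()
        main()
        end = time.perf_counter()
        print(f'Duration = {end-start:.2f}s')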