Search code examples
pythonpython-3.xmultiprocessingpool

Multiprocessing Pool.apply executing n-1 times


Im having an issues with multiprocessing.Pool.apply.
My objective is to have 5 processes, each filling an array with 100 elements (100 for this test), and then merging then arrays into a single one with length 500. Problem is, it ends up with only 400 elements for any reason i cant understand.

I have tried changing the amount of processes created by the pool but that didn't change anything at all besides the execution time.

import torch.multiprocessing as mp
import itertools

pool = mp.Pool(processes=5)
split = int(500/5)
lst =  pool.apply(RampedGraph, (split,[]))    #each foo returns a list of 100 elements
lst = list(itertools.chain.from_iterable(lst)) #merging the lists into one

len(lst)
>>>400

The expect output of len(lst) should be 500.
Can anyone enlighten me on what Im doing wrong?

EDIT Foo method explained:

def RampedGraph(popsize, graph_lst):
    cyclic_size = int(math.ceil(popsize/2))
    acyclic_size = popsize - full_size
    append = graph_lst.append
    for _ in range(cyclic_size):
        t = c.Node().cyclic()
        nn = c.number_of_nodes()
        c = c.calculate(0, False)
        append((t,nn,c))
    for _ in range(acyclic_size):
        t = c.Node().acyclic()
        nn = c.number_of_nodes()
        c = c.calculate(0, False)
        append((t,nn,c))
    return graph_lst

Solution

  • import torch.multiprocessing as mp
    # import multiprocessing as mp
    import itertools
    
    def RampedGraph(popsize, graph_lst):
        print(mp.current_process().name)
        return list(range(100))
    
    num_workers = 5
    pool = mp.Pool(processes=num_workers)
    split = int(500/num_workers)
    lst =  pool.starmap(RampedGraph, [(split,[])]*num_workers)
    lst = list(itertools.chain.from_iterable(lst)) 
    print(len(lst))
    # 500
    

    pool.starmap(RampedGraph, [(split,[])]*5) sends 5 tasks to the task pool. It causes RampedGraph(split, []) to be called 5 times concurrently. The 5 results returned by RampedGraph are collected into a list, lst.

    Note that calling RampedGraph 5 times concurrently does not guarantee that all 5 processors are used. For example, if RampedGraph were to finish very quickly, it is possible that one processor handles more than one task, and perhaps another processor never gets used at all. However, if RampedGraph takes a non-trivial amount of time, in general you can expect all 5 worker processes to be used.

    Note: I ran the above code with import multiprocessing as mp rather than import torch.multiprocessing as mp. But since torch.multiprocessing is supposed to be a drop-in replacement for multiprocessing, that shouldn't make a difference.


    Using multiprocessing comes with both costs and benefits. The benefit, of course, is the ability to use multiple processors concurrently. The costs include the time required to launch additional processes, and the cost of interprocess communication. multiprocessing uses Queues to transport arguments to the function run by the worker processes, and to transport the returned value back to the main process. To transport the returned values through the Queues, the objects are serialized into bytes via pickling. If the pickled objects being sent through the Queues are large, this can add a significant overhead cost when using multiprocessing. Notice that all these costs are not incurred by an equivalent sequential version of the same code.

    Particularly when the function run by the worker processes finishes quickly, overhead costs can dominate the total run time of the program, making code which uses multiprocessing slower than a sequential version of the same code.

    Thus, a key to speed when using multiprocessing is to try to minimize interprocess communication and to make sure the worker processes do a lot of work so the overhead costs become a relatively small part of the total run time.