I'm having an issue with multiprocessing.Pool.apply.
My objective is to have 5 processes, each filling an array with 100 elements (100 for this test), and then merging the arrays into a single one with length 500. The problem is, it ends up with only 400 elements, for a reason I can't understand.
I have tried changing the number of processes created by the pool, but that didn't change anything besides the execution time.
import torch.multiprocessing as mp
import itertools
pool = mp.Pool(processes=5)
split = int(500/5)
lst = pool.apply(RampedGraph, (split,[])) # each call to RampedGraph returns a list of 100 elements
lst = list(itertools.chain.from_iterable(lst)) # merge the lists into one
len(lst)
>>>400
The expected output of len(lst) should be 500.
Can anyone enlighten me on what I'm doing wrong?
EDIT: The RampedGraph method explained:
import math

def RampedGraph(popsize, graph_lst):
    cyclic_size = int(math.ceil(popsize/2))
    acyclic_size = popsize - cyclic_size
    append = graph_lst.append
    for _ in range(cyclic_size):
        t = c.Node().cyclic()  # c is defined elsewhere in my code
        nn = c.number_of_nodes()
        c = c.calculate(0, False)
        append((t, nn, c))
    for _ in range(acyclic_size):
        t = c.Node().acyclic()
        nn = c.number_of_nodes()
        c = c.calculate(0, False)
        append((t, nn, c))
    return graph_lst
import torch.multiprocessing as mp
# import multiprocessing as mp
import itertools

def RampedGraph(popsize, graph_lst):
    print(mp.current_process().name)
    return list(range(100))

num_workers = 5
pool = mp.Pool(processes=num_workers)
split = int(500/num_workers)
lst = pool.starmap(RampedGraph, [(split, [])] * num_workers)
lst = list(itertools.chain.from_iterable(lst))
print(len(lst))
# 500
pool.starmap(RampedGraph, [(split,[])]*5) sends 5 tasks to the pool. It causes RampedGraph(split, []) to be called 5 times concurrently. The 5 results returned by RampedGraph are collected into a list, lst.
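To make the difference concrete, here is a minimal, runnable sketch (with a dummy RampedGraph that just returns popsize integers, not your real function) contrasting pool.apply, which runs the function exactly once, with pool.starmap, which runs it once per argument tuple:

import multiprocessing as mp
import itertools

def RampedGraph(popsize, graph_lst):
    # Dummy stand-in: just return popsize elements.
    return list(range(popsize))

if __name__ == '__main__':
    split = 100
    with mp.Pool(processes=5) as pool:
        # apply runs the function once in a single worker and blocks,
        # so only one worker's worth of output comes back.
        one_call = pool.apply(RampedGraph, (split, []))
        print(len(one_call))   # 100

        # starmap submits one task per argument tuple: 5 calls, 5 lists.
        five_calls = pool.starmap(RampedGraph, [(split, [])] * 5)
        merged = list(itertools.chain.from_iterable(five_calls))
        print(len(merged))     # 500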
Note that calling RampedGraph 5 times concurrently does not guarantee that all 5 worker processes are used. For example, if RampedGraph were to finish very quickly, one worker might handle more than one task, and another might never be used at all. However, if RampedGraph takes a non-trivial amount of time, you can in general expect all 5 workers to be used.
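You can observe this with a small sketch (slow_task here is a made-up stand-in, not from the question) that reports which worker handled each task; with the sleep in place you should typically see all 5 worker names, and without it often fewer:

import multiprocessing as mp
import time

def slow_task(n):
    # The sleep makes each task slow enough that the pool has to
    # spread the tasks across its worker processes.
    time.sleep(0.5)
    return mp.current_process().name

if __name__ == '__main__':
    with mp.Pool(processes=5) as pool:
        names = pool.map(slow_task, range(5))
    # Typically prints several distinct worker names.
    print(sorted(set(names)))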
Note: I ran the above code with import multiprocessing as mp rather than import torch.multiprocessing as mp. But since torch.multiprocessing is supposed to be a drop-in replacement for multiprocessing, that shouldn't make a difference.
Using multiprocessing comes with both costs and benefits. The benefit, of course, is the ability to use multiple processors concurrently. The costs include the time required to launch additional processes and the cost of interprocess communication. multiprocessing uses Queues to transport arguments to the function run by the worker processes, and to transport the returned value back to the main process. To pass through the Queues, the objects are serialized into bytes via pickling. If the pickled objects are large, this can add a significant overhead cost. Note that none of these costs are incurred by an equivalent sequential version of the same code.
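As a rough, hypothetical illustration of that overhead (the list size here is an arbitrary choice), you can time building a large list directly, pickling it, and receiving it back from a single worker:

import multiprocessing as mp
import pickle
import time

def make_big_list(n):
    # The whole return value gets pickled and shipped back through a
    # Queue to the parent process.
    return list(range(n))

if __name__ == '__main__':
    n = 2_000_000
    start = time.perf_counter()
    data = make_big_list(n)
    print(f'built directly: {time.perf_counter() - start:.3f}s')

    # Rough proxy for the IPC cost: how long just the pickling takes.
    start = time.perf_counter()
    blob = pickle.dumps(data)
    print(f'pickled {len(blob)} bytes: {time.perf_counter() - start:.3f}s')

    start = time.perf_counter()
    with mp.Pool(processes=1) as pool:
        data = pool.apply(make_big_list, (n,))
    print(f'via a pool worker: {time.perf_counter() - start:.3f}s')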
Particularly when the function run by the worker processes finishes quickly, these overhead costs can dominate the total run time, making code which uses multiprocessing slower than a sequential version of the same code.
Thus, a key to speed when using multiprocessing is to minimize interprocess communication and to make sure each worker does a substantial amount of work, so that the overhead becomes a relatively small part of the total run time.
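One common pattern, sketched below with made-up names, is to do the reduction inside each worker and return only a small aggregate, so very little data has to cross the process boundary:

import multiprocessing as mp

def summarize_chunk(chunk):
    # Heavy work happens in the worker; only one int is pickled and
    # sent back, instead of the whole chunk of intermediate values.
    return sum(x * x for x in chunk)

if __name__ == '__main__':
    step = 250_000
    chunks = [range(i, i + step) for i in range(0, 1_000_000, step)]
    with mp.Pool(processes=4) as pool:
        partials = pool.map(summarize_chunk, chunks)
    print(sum(partials))  # one small number per worker crossed the boundary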