I have a program similar to the following:
import time
from multiprocessing import Pool


class a_system():
    def __init__(self,N):
        self.N = N
        self.L = [0 for _ in range(self.N)]

    def comp(self,n):
        self.L[n] = 1
        return self.L

    def reset(self):
        self.L = [0 for _ in range(self.N)]


def individual_sim(iter):
    global B, L, sys
    sys.reset()
    L[iter] = sys.comp(iter)
    B += sum(L[iter])
    time.sleep(1)
    return L, B


def simulate(N_mc):
    global B, L, sys
    L = [[] for _ in range(N_mc)]
    B = 0
    sys = a_system(N_mc)
    [*map(individual_sim, range(N_mc))]
    # with Pool() as P:
    #     P.map(individual_sim,range(N_mc))
    return L, B


if __name__=="__main__":
    start = time.time()
    L, B = simulate(N_mc=5)
    print(L)
    print(B)
    print("Time elapsed: ",time.time()-start)
Here I would like to parallelise the line [*map(individual_sim, range(N_mc))] with multiprocessing. However, replacing this line with

with Pool() as P:
    P.map(individual_sim,range(N_mc))

returns a list of empty lists.
If instead I use P.map_async, P.imap, or P.imap_unordered, I don't get an error, but the list and B are left empty.
How can I parallelise this code?
P.S. I have tried ThreadPool from multiprocessing.pool, but I would like to avoid that, because the class a_system, which is a bit more complicated than the one shown here, needs to have a different copy for each worker (with ThreadPool I get exit code 139, interrupted by signal 11: SIGSEGV).
P.S.2 I might try to use sharedctypes or Managers (?), but I'm not sure how they work, nor which one I should use (or a combination?).
P.S.3 I have also tried modifying individual_sim as
def individual_sim(iter,B,L,sys):
    sys.reset()
    L[iter] = sys.comp(iter)
    B += sum(L[iter])
    time.sleep(1)
    return L, B
and using the following in simulate:
from functools import partial
part_individual_sim = partial(individual_sim, B=B, L=L, sys=sys)
with Pool() as P:
    P.map(part_individual_sim,range(N_mc))
But I still get empty lists.
It's not really clear to me what your business logic is here, but you cannot modify globals in your parent from within your child processes. Separate processes don't share their address space.
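A minimal sketch of the address-space point (counter, bump and run are hypothetical names, not from the question): a worker increments a module-level global, yet the parent's copy is unchanged, while the values the worker returns do arrive back through pool.map.

```python
import multiprocessing as mp

# Module-level global. Each worker process works on its own copy of it.
counter = 0


def bump(i):
    """Increment this process's copy of `counter` and return a value."""
    global counter
    counter += 1   # changes only the worker's copy, never the parent's
    return i * i   # return values are pickled and sent back to the parent


def run(n=5):
    with mp.Pool(processes=2) as pool:
        results = pool.map(bump, range(n))  # result order matches range(n)
    # The parent's `counter` was never modified by the workers.
    return counter, results


if __name__ == "__main__":
    parent_counter, results = run()
    print(parent_counter)  # 0
    print(results)         # [0, 1, 4, 9, 16]
```

If each iteration's result can simply be returned like this, collecting the results in the parent (for example summing them to obtain B) avoids shared state altogether; shared objects are mainly worth their overhead when workers really need to see each other's updates.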
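You could, however, make L a managed list (Manager().list()) and B a managed value (Manager().Value()) so that your worker processes can modify them. Manager objects live in a separate server process, and you modify them through proxy objects. You would also need a managed lock (Manager().Lock()) while modifying these shared objects, to prevent data corruption: an update such as mvalue.value += x is a separate read and write on the proxy, so concurrent workers could otherwise overwrite each other's updates.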
Here is a stripped-down example which should get you started:
import time
from multiprocessing import Pool, Manager


def individual_sim(mlist, mvalue, mlock, idx):
    # in your real computation, make sure to not hold the lock longer than
    # really needed (e.g. calculations without holding lock)
    with mlock:
        mlist[idx] += 10
        mvalue.value += sum(mlist)


def simulate(n_workers, n):
    with Manager() as m:
        mlist = m.list([i for i in range(n)])
        print(mlist)
        mvalue = m.Value('i', 0)
        mlock = m.Lock()
        iterable = [(mlist, mvalue, mlock, i) for i in range(n)]
        with Pool(processes=n_workers) as pool:
            pool.starmap(individual_sim, iterable)
        # convert to non-shared objects before terminating manager
        mlist = list(mlist)
        mvalue = mvalue.value
    return mlist, mvalue


if __name__=="__main__":
    N_WORKERS = 4
    N = 20
    start = time.perf_counter()
    L, B = simulate(N_WORKERS, N)
    print(L)
    print(B)
    print("Time elapsed: ",time.perf_counter() - start)
Example Output:
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19]
[10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29]
5900
Time elapsed: 0.14064819699706277
Process finished with exit code 0
It would also be possible to use Pool's initializer parameter to pass the proxies upon worker initialization and register them as globals, instead of sending them as regular arguments with the starmap call.
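A sketch of that initializer variant, mirroring the stripped-down example above; init_worker and the g_* globals are hypothetical names chosen for illustration. The proxies travel once per worker via initargs, and each task only carries idx.

```python
import multiprocessing as mp

# Hypothetical per-worker globals, filled in once by the Pool initializer.
g_list = None
g_value = None
g_lock = None


def init_worker(mlist, mvalue, mlock):
    """Runs once in each worker process and stores the proxies as globals."""
    global g_list, g_value, g_lock
    g_list, g_value, g_lock = mlist, mvalue, mlock


def individual_sim(idx):
    # Only idx is sent with each task; the proxies come from the globals.
    with g_lock:
        g_list[idx] += 10
        g_value.value += sum(g_list[:])  # g_list[:] fetches a plain copy


def simulate(n_workers, n):
    with mp.Manager() as m:
        mlist = m.list(range(n))
        mvalue = m.Value('i', 0)
        mlock = m.Lock()
        with mp.Pool(processes=n_workers, initializer=init_worker,
                     initargs=(mlist, mvalue, mlock)) as pool:
            pool.map(individual_sim, range(n))
        # Convert to non-shared objects before the manager shuts down.
        return list(mlist), mvalue.value


if __name__ == "__main__":
    L, B = simulate(4, 20)
    print(L)  # [10, 11, ..., 29]
    print(B)  # 5900
```

Because the initializer runs once inside every worker process, the same hook could also build a separate a_system instance per worker, which is what the P.S. about needing one copy per worker asks for.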
I've written up a bit more about Manager usage (relevant: nested proxies) here.
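Since L in the question is a list of lists, the nested-proxy point matters there. A minimal sketch (the function demo is a hypothetical name) of the behaviour described in the multiprocessing manager documentation: mutating an ordinary inner list fetched from a managed list only changes a local copy unless the item is reassigned, whereas nested proxies (Python 3.6+) propagate in-place changes.

```python
from multiprocessing import Manager


def demo():
    with Manager() as m:
        # A managed list holding ordinary (non-proxy) inner lists.
        plain = m.list([[] for _ in range(3)])
        inner = plain[0]      # a local copy, not a reference
        inner.append(1)       # mutates only the copy
        before = plain[0]     # the manager still holds []
        plain[0] = inner      # reassigning pushes the change through
        after = plain[0]

        # A managed list holding nested list proxies.
        nested = m.list([m.list() for _ in range(3)])
        nested[0].append(1)   # nested[0] is a proxy, so this propagates
        nested_inner = nested[0][:]  # fetch a plain copy of the inner list

        return before, after, nested_inner


if __name__ == "__main__":
    print(demo())  # ([], [1], [1])
```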