I have some code that requires parallelization, for which I used Python's multiprocessing module, in particular the Pool class. The relevant part of the code where the parallelization takes place looks something like this:
import multiprocessing as mp
import numpy as np
from numba import jit

@jit(nopython=True)
def numba_product(a, b):
    a_len = len(a)
    b_len = len(b)
    n = len(a[0, :])
    c_res = np.empty((a_len * b_len, n), dtype=np.complex128)
    c_count = 0
    for i in range(a_len):
        for j in range(b_len):
            c_res[c_count, :] = np.multiply(a[i, :], b[j, :])
            c_count += 1
    return c_res
def do_some_computations(shared_object, index):
    d = shared_object.get_dictionary_1()
    # Gets a NumPy array from a shared_object attribute, i.e. the method
    # returns the "self.the_array" attribute that belongs to shared_object
    # (see the dummy version of the class definition below).
    some_numpy_array_1 = shared_object.get_numpy_array_1(index)
    # Gets a mask for the specified array.
    mask_array_1 = shared_object.get_mask_array_1()
    # Note that this defines a new local array, but it shouldn't modify
    # some_numpy_array_1 (I believe).
    filtered_array_1 = some_numpy_array_1[mask_array_1]
    # Gets the keys corresponding to that index, to create a new array.
    s_keys = shared_object.get_keys_for_index(index)
    v = np.array([d[x] for x in s_keys])
    final_result = numba_product(filtered_array_1, v)
    return final_result
def pool_worker_function(index, args):
    shared_object = args[0]
    result = do_some_computations(shared_object, index)
    return result
def parallel_exec(shared_object, N):
    number_processors = mp.cpu_count()
    number_used_processors = number_processors - 1
    # Try the pool map method with a READ-ONLY object, "shared_object".
    # This object contains two big dictionaries from which values are
    # retrieved, and various NumPy arrays of considerable dimension.
    from itertools import repeat
    pool = mp.Pool(processes=number_used_processors)
    a = list(np.linspace(0, N, N))
    args = (shared_object,)
    number_tasks = number_used_processors
    n_chunck = int((len(a) - 1) / number_tasks)
    result = pool.starmap(pool_worker_function, zip(a, repeat(args)), chunksize=n_chunck)
    pool.close()
    pool.join()
    return result
THE PROBLEM:
The problem I am having is that when I run it under a Unix OS, on a 32-core system, I only observe a few cores working on the parallelization. As far as I understand, Unix provides os.fork() with copy-on-write semantics, which means that IF my shared_object is not modified during the calls, the parallelization should take place without extra memory consumption, and all cores should execute their tasks separately. Here is a snapshot of what I see when the program reaches the parallelization part:
This is puzzling me, and I have made sure that the total number of cores reported by mp.cpu_count() is 32. Another thing I observe is that throughout the parallelization the amount of free memory decreases continuously, from ~84 GiB available to ~59 GiB. This hints that a copy of the "shared_object" instance is probably being created for each process, thereby duplicating all the dictionaries and NumPy arrays the class contains. I would like to circumvent this issue and use all cores for the parallelization, but honestly I have no idea what is going on here.
The code is expected to run on the 32-core Unix machine, but my own laptop runs Windows, and here is a snapshot of what I see on Windows when I launch it (although from what I have read, Windows does not support os.fork(), so no surprise about the high memory consumption, I guess?).
As you can see, calls to the OS (in red) occupy a very high percentage of the CPU usage. This also seems to be the case in the snapshot shown above for the Linux run.
Finally, I want to stress that the class "shared_object" has the following form:
class shared_object:
    def __init__(self):
        pass
    def store_dictionaries_and_arrays(self, dict_1, dict_2, array_1, array_2, ...):
        self.dict_1 = dict_1
        self.dict_2 = dict_2
        self.array_1 = array_1
        # same for all other arguments passed
    def get_dictionary_1(self):
        return self.dict_1
    def get_numpy_array_1(self):
        return self.array_1
but with many more attributes, and therefore many more "get" methods. This is a very big data container, so I would expect no copies of it to be made during the parallelization, since the attributes should only be accessed, not modified. What am I missing here? Any help is greatly appreciated; this has been hitting me for a long time now. Many thanks!
Based on your comments, I think you just want to do something like this:
def pool_worker_function(index):
    return do_some_computations(_shared_hack, index)

def parallel_exec(shared_object, N):
    global _shared_hack
    _shared_hack = shared_object
    # it'll use ncores processes by default
    with mp.Pool() as pool:
        return pool.map(pool_worker_function, range(N))
i.e. save the shared_object somewhere global and let the child processes pick it up as they need it.
You were doing lots of weirdness setting things up, which I've stripped out, including computing a chunk size that wasn't really needed. I've also switched to using range, as you were using list(np.linspace(0, N, N)) to set up the indexes, which seems broken. E.g., N=4 would give you [0, 1.333, 2.667, 4.0], which doesn't look like something I'd want to index an array with.
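As a minimal self-contained sketch of this pattern (the dict and the stub computation are just illustrative stand-ins for your real shared_object and do_some_computations):

```python
import multiprocessing as mp

def do_some_computations(shared_object, index):
    # stand-in for the real function: read-only access to the shared data
    return shared_object["offset"] + index

def pool_worker_function(index):
    # the worker picks up the global that was set before the fork
    return do_some_computations(_shared_hack, index)

def parallel_exec(shared_object, N):
    global _shared_hack
    _shared_hack = shared_object
    with mp.Pool() as pool:
        return pool.map(pool_worker_function, range(N))

result = parallel_exec({"offset": 10}, 4)
print(result)  # [10, 11, 12, 13]
```

Note this relies on fork-style process start (the Unix default), where children inherit the parent's globals; on a spawn start method the global would not be there.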
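If the code also needs to run on your Windows laptop (where there is no fork() and each worker starts fresh), a common portable alternative is a Pool initializer, which sends the object to every worker once at startup instead of with each task. A minimal sketch, again with an illustrative dict standing in for the real shared_object:

```python
import multiprocessing as mp

_shared = None  # filled in per worker by the initializer

def _init_worker(shared):
    # runs once in each worker process when the pool starts
    global _shared
    _shared = shared

def _worker(index):
    # toy computation standing in for do_some_computations
    return _shared["scale"] * index

def parallel_exec(shared, n):
    with mp.Pool(initializer=_init_worker, initargs=(shared,)) as pool:
        return pool.map(_worker, range(n))

if __name__ == "__main__":
    print(parallel_exec({"scale": 2}, 5))  # [0, 2, 4, 6, 8]
```

Under fork this costs nothing extra (the initializer argument is inherited copy-on-write); under spawn the object is pickled once per worker, not once per task, which is usually the acceptable price on Windows.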