
Why is the performance of concurrent.futures.ProcessPoolExecutor so low?


I'm trying to use concurrent.futures.ProcessPoolExecutor in Python 3 to process a large matrix in parallel. The general structure of the code is:

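from concurrent.futures import ProcessPoolExecutor, as_completed

class X(object):
    def __init__(self, matrix):
        self.matrix = matrix  # large scipy csr_matrix

    def f(self, i, row_i):
        ...  # <cpu-bound process>

    def fetch_multiple(self, ids):
        with ProcessPoolExecutor() as executor:
            # each submit pickles self (including self.matrix) and the row
            futures = [executor.submit(self.f, i, self.matrix.getrow(i)) for i in ids]
            return [f.result() for f in as_completed(futures)]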

self.matrix is a large scipy csr_matrix. f is my concurrent function: it takes a row of self.matrix and applies a CPU-bound process to it. Finally, fetch_multiple is a function that runs multiple instances of f in parallel and returns the results.

The problem is that after running the script, all CPU cores are less than 50% busy (see the following screenshot):

[screenshot: per-core CPU usage]

Why aren't all cores busy?

I think the problem is the size of self.matrix and the cost of passing row vectors between processes. How can I solve this problem?


Solution

  • Yes. The overhead should not be that big, but it is likely the cause of your CPUs appearing idle (although they should be busy passing the data around anyway).

    Try the recipe at the link below to pass a "pointer" to the object to the subprocesses using shared memory.

    http://briansimulator.org/sharing-numpy-arrays-between-processes/

    Quoting from there:

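    import numpy
    from multiprocessing import sharedctypes

    # S is an existing numpy float64 array
    size = S.size
    shape = S.shape
    S.shape = size  # flatten in place so RawArray receives a 1-D sequence
    S_ctypes = sharedctypes.RawArray('d', S)  # copy the data into shared memory
    S = numpy.frombuffer(S_ctypes, dtype=numpy.float64, count=size)  # view on the shared buffer
    S.shape = shape  # restore the original shape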
    

    Now we can send S_ctypes and shape to a child process in multiprocessing, and convert it back to a numpy array in the child process as follows:

    from numpy import ctypeslib
    S = ctypeslib.as_array(S_ctypes)  # numpy view on the shared buffer
    S.shape = shape
    

    Reference counting could be tricky, but I suppose numpy.ctypeslib takes care of that, so just coordinate passing the actual row numbers to the subprocesses in such a way that they don't work on the same data.
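
To make the "pass only the row number" idea concrete, here is a minimal sketch that uses only the standard library. It is not the code from the linked recipe. It assumes Python 3.7+ (for the initializer argument of ProcessPoolExecutor) and that the matrix is available as its raw CSR arrays. The names _init_worker, row_sum_of_squares and the sum-of-squares workload are hypothetical stand-ins for the real f. With numpy you would probably wrap the shared buffers with numpy.frombuffer as in the quoted recipe, and the CSR column indices could be shared in the same way as data and indptr.

```python
from concurrent.futures import ProcessPoolExecutor
from multiprocessing import sharedctypes

# Per-process globals, set once in each worker by _init_worker (hypothetical names).
_data = None
_indptr = None


def _init_worker(data, indptr):
    """Runs once in every worker process: keep references to the shared buffers."""
    global _data, _indptr
    _data, _indptr = data, indptr


def row_sum_of_squares(i):
    """Stand-in for the CPU-bound f: only the row number i is sent to the worker."""
    start, end = _indptr[i], _indptr[i + 1]
    return i, sum(v * v for v in _data[start:end])


def fetch_multiple(data, indptr, ids, chunksize=64):
    """Copy the CSR arrays into shared memory once, then dispatch row numbers only."""
    shared_data = sharedctypes.RawArray("d", data)
    shared_indptr = sharedctypes.RawArray("l", indptr)
    with ProcessPoolExecutor(
        initializer=_init_worker, initargs=(shared_data, shared_indptr)
    ) as executor:
        # chunksize batches several row numbers per task to cut per-task overhead.
        return dict(executor.map(row_sum_of_squares, ids, chunksize=chunksize))


if __name__ == "__main__":
    # CSR form of [[1, 0, 2], [0, 0, 3], [4, 5, 6]] (column indices omitted here).
    data = [1.0, 2.0, 3.0, 4.0, 5.0, 6.0]
    indptr = [0, 2, 3, 6]
    print(fetch_multiple(data, indptr, [0, 1, 2]))  # {0: 5.0, 1: 9.0, 2: 77.0}
```

The shared buffers are handed over through initargs rather than through submit or map, because sharedctypes objects can only be passed to a child when the process is created. Each task then pickles a single integer instead of a matrix row or the whole object.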