Tags: python, multiprocessing, process-pool

ProcessPoolExecutor on shared dataset and multiple arguments


I am facing an issue that I could not solve by searching the web.

I am using the minimal code below. The goal is to run a function 'f_sum' several million times using multiprocessing (with the ProcessPoolExecutor). I pass multiple arguments as a list of tuples 'args'. In addition, the function is supposed to use some data that is the same for all executions (in the example it's just one number). I do not want to add the data to the 'args' tuple for memory reasons.

The only option I found so far is defining the data outside of the "if __name__ == '__main__'" block. This will (for some reason that I do not understand) make the variable available to all processes. However, updating it is not possible. Also, I do not really want to define the data outside of that block, because in the actual code it will be based on a data import and might require additional manipulation.

Hope you can help and thanks in advance!

PS: I am using Python 3.7.9 on Win 10.

from concurrent.futures import ProcessPoolExecutor
import numpy as np

data = 0  # supposed to be a large data set, shared among all calculations
num_workers = 6  # number of CPU cores
num_iterations = 10  # supposed to be large number


def f_sum(args):
    (x,y) = args
    print('This is process', x, 'with exponent:', y)
    value = 0
    for i in range(10**y):
        value += i
    return value/10**y + data


def multiprocessing(func, args, workers):
    with ProcessPoolExecutor(workers) as executor:
        results = executor.map(func, args)
    return list(results)


if __name__ == '__main__':
    data = 0.5  # try to update data, should not be part of 'args' due to memory

    args = []
    for k in range(num_iterations):
        args.append((k, np.random.randint(1,8)))

    result = multiprocessing(f_sum, args, num_workers)

    if np.abs(result[0]-np.round(result[0])) > 0:
        print('data NOT updated')

Edit to original question:

>> Performance Example 1

from concurrent.futures import ProcessPoolExecutor
import numpy as np
import time

data_size = 10**8
num_workers = 4
num_sum = 10**7
num_iterations = 100
data = np.random.randint(0,100,size=data_size)
# data = np.linspace(0,data_size,data_size+1, dtype=np.uintc)

def f_sum(args):
    (x,y) = args
    print('This is process', x, 'random number:', y, 'last data', data[-1])
    value = 0
    for i in range(num_sum):
        value += i
    result = value - num_sum*(num_sum-1)/2 + data[-1]
    return result

def multiprocessing(func, args, workers):
    with ProcessPoolExecutor(workers) as executor:
        results = executor.map(func, args)
    return list(results)

if __name__ == '__main__':
    t0 = time.time()

    args = []
    for k in range(num_iterations):
        args.append((k, np.random.randint(1,10)))

    result = multiprocessing(f_sum, args, num_workers)

    print(f'expected result: {data[-1]}, actual result: {np.unique(result)}')
    t1 = time.time()
    print(f'total time: {t1-t0}')

>> Output

This is process 99 random number: 6 last data 9
expected result: 86, actual result: [ 3.  9. 29. 58.]
total time: 11.760863542556763

This gives a wrong result if randint is used. With linspace the result is correct.

>> Performance Example 2 - based on proposal in answer

from concurrent.futures import ProcessPoolExecutor
import numpy as np
from multiprocessing import Array
import time

data_size = 10**8
num_workers = 4
num_sum = 10**7
num_iterations = 100
input = np.random.randint(0, 100, size=data_size)
# input = np.linspace(0, data_size, data_size + 1, dtype=np.uintc)

def f_sum(args):
    (x,y) = args
    print('This is process', x, 'random number:', y, 'last data', data[-1])
    value = 0
    for i in range(num_sum):
        value += i
    result = value - num_sum*(num_sum-1)/2 + data[-1]
    return result

def init_pool(the_data):
    global data
    data = the_data

def multiprocessing(func, args, workers, input):
    data = Array('i', input, lock=False)
    with ProcessPoolExecutor(max_workers=workers, initializer=init_pool, initargs=(data,)) as executor:
        results = list(executor.map(func, args))
    return results

if __name__ == '__main__':
    t0 = time.time()
    args = []
    for k in range(num_iterations):
        args.append((k, np.random.randint(1,10)))

    result = multiprocessing(f_sum, args, num_workers, input)

    print(f'expected result: {input[-1]}, actual result:{np.unique(result)}')
    t1 = time.time()
    print(f'total time: {t1-t0}')

>> Output

This is process 99 random number: 7 last data 29
expected result: 29, actual result: [29.]
total time: 30.8266122341156

@Booboo

I added two examples to my original question; "Performance Example 2" is based on your code. A first interesting finding: my original code actually gives incorrect results if the data array is initialized with random integers. I noticed that each process initializes the data array itself. Since the array is based on random numbers, each process uses a different array for the calculation, different even from the one in the main process. So that use case does not work with my code, whereas your code is correct every time.

With linspace, however, it works, since this gives the same result each time. The same would be true when the data is read from a file (which is my actual use case). Example 1 is still about 3x faster than Example 2, and I think the time is mainly spent initializing the array in your method.

Regarding memory usage, I don't see a relevant difference in Task Manager. Both examples produce a similar increase in memory, even if the shape is different.

I still believe that your method is the correct approach, however, memory usage seems to be similar and speed is slower in the example above.
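
The behaviour described in this comment can be reproduced without starting any processes. The following is only a simulation, assuming the pool uses the 'spawn' start method (the default on Windows): each worker then re-imports the main module under the name `__mp_main__`, so module-level statements run again while the `if __name__ == '__main__':` block is skipped. `MODULE_SOURCE` and `simulate_import` are hypothetical names invented for this sketch, not part of the code above.

```python
import numpy as np

# Stand-in for the script's top-level code (hypothetical, shortened).
MODULE_SOURCE = """
import numpy as np
random_data = np.random.randint(0, 100, size=5)   # differs on every import
fixed_data = np.linspace(0, 4, 5)                 # identical on every import
data = 0
if __name__ == '__main__':
    data = 0.5                                    # only the parent runs this
"""


def simulate_import(module_name):
    """Execute the module-level code as if it were imported under module_name."""
    namespace = {"__name__": module_name}
    exec(MODULE_SOURCE, namespace)
    return namespace


parent = simulate_import("__main__")      # the process that started the pool
worker = simulate_import("__mp_main__")   # what a spawned worker would see

print("parent data:", parent["data"], "| worker data:", worker["data"])
print("linspace identical:", np.array_equal(parent["fixed_data"], worker["fixed_data"]))
print("randint identical: ", np.array_equal(parent["random_data"], worker["random_data"]))
```

Under this assumption the worker never sees the update to `data` made inside the guarded block, and it builds its own random array, which would explain both observations.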


Solution

  • The most efficient use of memory would be to use shared memory so that all processes work on the same instance of data. This would be absolutely necessary if the processes updated data. In the example below, since access to data is read-only and I am using a simple array of integers, I am using multiprocessing.Array with no locking specified. The "trick" is to initialize your pool by specifying the initializer and initargs arguments so that each process in the pool has access to this shared memory. I have made a couple of other changes to the code, which I have commented.

    from concurrent.futures import ProcessPoolExecutor
    import numpy as np
    from multiprocessing import Array, cpu_count # new imports
    
    
    def init_pool(the_data):
        global data
        data = the_data
    
    def f_sum(args):
        (x,y) = args
        print('This is process', x, 'with exponent:', y)
        value = 0
        for i in range(10**y):
            value += i
        return value/10**y + len(data) # just use the length of data for now
    
    def multiprocessing(func, args, workers):
        data = Array('i', range(1000), lock=False) # read-only, integers 0, 1, 2, ... 999
        with ProcessPoolExecutor(max_workers=workers, initializer=init_pool, initargs=(data,)) as executor:
            results = list(executor.map(func, args)) # create the list of results here
        print(results) # so that it can be printed out for demo purposes
        return results
    
    
    if __name__ == '__main__':
        num_iterations = 10  # supposed to be large number
        #num_workers = 6  # number of CPU cores
        num_workers = cpu_count()  # number of CPU cores
    
        args = []
        for k in range(num_iterations):
            args.append((k, np.random.randint(1,8)))
    
        result = multiprocessing(f_sum, args, num_workers)
    
        if np.abs(result[0]-np.round(result[0])) > 0:
            print('data NOT updated')
    

    Prints:

    This is process 0 with exponent: 2
    This is process 1 with exponent: 1
    This is process 2 with exponent: 4
    This is process 3 with exponent: 3
    This is process 4 with exponent: 5
    This is process 5 with exponent: 1
    This is process 6 with exponent: 5
    This is process 7 with exponent: 2
    This is process 8 with exponent: 6
    This is process 9 with exponent: 6
    [1049.5, 1004.5, 5999.5, 1499.5, 50999.5, 1004.5, 50999.5, 1049.5, 500999.5, 500999.5]
    data NOT updated
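
If the workers had to update the shared data rather than only read it, the lock that was switched off above would be needed. A minimal sketch, assuming each task increments one slot of a small shared counter array; the names counts, record and run are hypothetical and not part of the code above:

```python
from concurrent.futures import ProcessPoolExecutor
from multiprocessing import Array


def init_pool(the_counts):
    # Runs once in each worker: keep a reference to the shared array.
    global counts
    counts = the_counts


def record(slot):
    # Hold the array's lock so the read-modify-write is not interleaved
    # with the same operation in another worker.
    with counts.get_lock():
        counts[slot] += 1
    return slot


def run(num_tasks, num_slots=4, workers=2):
    # Hypothetical driver; call it from under `if __name__ == '__main__':`.
    shared = Array('i', num_slots)  # lock=True is the default, values start at 0
    slots = [k % num_slots for k in range(num_tasks)]
    with ProcessPoolExecutor(max_workers=workers, initializer=init_pool,
                             initargs=(shared,)) as executor:
        list(executor.map(record, slots))  # wait for every task to finish
    return shared[:]  # copy the final counts into a plain list
```

The structure is the same as in the read-only case; only the Array is created with its default lock, and every update happens while that lock is held.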
    

    Updated Example 2

    You saw my comments to your question concerning Example 1.

    Your Example 2 is still not ideal: you have the statement input = np.random.randint(0, 100, size=data_size) as a global, so it is needlessly executed by every process as it is initialized for use in the process pool. Below is an updated solution that also shows one way to have your worker function work directly with a numpy array that is backed by a multiprocessing.Array instance, so that the numpy array exists in shared memory. You don't have to use this technique for what you are doing, since you are only using numpy to create random numbers (I am not sure why), but it is a useful technique to know. But you should rerun your code after moving the initialization code of input as I have, so that it is only executed once.

    I don't have the occasion to work with numpy day to day, but I have come to learn that it uses multithreading internally (through the underlying BLAS/LAPACK libraries) for some of its own functions. So it is often not the best match for use with multiprocessing, although that does not seem to be applicable here, since even in the case below we are just indexing an element of an array, which would not use extra threads.

    from concurrent.futures import ProcessPoolExecutor
    import numpy as np
    from multiprocessing import Array
    import time
    import ctypes
    
    data_size = 10**8
    num_workers = 4
    num_sum = 10**7
    num_iterations = 100
    # input = np.linspace(0, data_size, data_size + 1, dtype=np.uintc)
    
    
    def to_shared_array(arr, ctype):
        '''Copy arr into a new shared memory Array; ctype must match arr.dtype in item size.'''
        shared_array = Array(ctype, arr.size, lock=False)
        temp = np.frombuffer(shared_array, dtype=arr.dtype)
        temp[:] = arr.flatten(order='C')
        return shared_array
    
    def to_numpy_array(shared_array, shape):
        '''Create a numpy array backed by a shared memory Array.'''
        arr = np.ctypeslib.as_array(shared_array)
        return arr.reshape(shape)
    
    def f_sum(args):
        (x,y) = args
        print('This is process', x, 'random number:', y, 'last data', data[-1])
        value = 0
        for i in range(num_sum):
            value += i
        result = value - num_sum*(num_sum-1)/2 + data[-1]
        return result
    
    def init_pool(shared_array, shape):
        global data
        data = to_numpy_array(shared_array, shape)
    
    def multiprocessing(func, args, workers):
        # Create the data once, in the main process only
        input = np.random.randint(0, 100, size=data_size)
        shape = input.shape
        shared_array = to_shared_array(input, ctypes.c_long)
        with ProcessPoolExecutor(max_workers=workers, initializer=init_pool, initargs=(shared_array, shape)) as executor:
            results = list(executor.map(func, args))
        return input, results
    
    if __name__ == '__main__':
        t0 = time.time()
        args = []
        for k in range(num_iterations):
            args.append((k, np.random.randint(1,10)))
    
        input, result = multiprocessing(f_sum, args, num_workers)
    
        print(f'expected result: {input[-1]}, actual result:{np.unique(result)}')
        t1 = time.time()
        print(f'total time: {t1-t0}')
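
The claim that the numpy array is backed by the shared Array, rather than being a copy, can be checked in a single process. The following is a sketch of the same two helpers, with one assumption that differs from the code above: the ctypes type is derived from the array's dtype with np.ctypeslib.as_ctypes_type (NumPy 1.16 or later) instead of being passed in, so the item sizes cannot get out of step on platforms where ctypes.c_long and the default NumPy integer differ.

```python
import numpy as np
from multiprocessing import Array


def to_shared_array(arr):
    # Assumption: derive the ctypes type from the dtype (NumPy >= 1.16).
    ctype = np.ctypeslib.as_ctypes_type(arr.dtype)
    shared_array = Array(ctype, arr.size, lock=False)
    # View the shared buffer as a flat numpy array and copy the data in.
    flat_view = np.frombuffer(shared_array, dtype=arr.dtype)
    flat_view[:] = arr.ravel(order='C')
    return shared_array


def to_numpy_array(shared_array, shape):
    # No copy is made: the result shares its memory with shared_array.
    return np.ctypeslib.as_array(shared_array).reshape(shape)


source = np.arange(6, dtype=np.int32).reshape(2, 3)
shared = to_shared_array(source)
view = to_numpy_array(shared, source.shape)

view[0, 0] = 42  # write through the numpy view ...
print(shared[0])  # ... and read the change back from the shared Array
```

Because the write made through the view shows up in the Array itself, a worker that builds its view in the pool initializer reads the parent's data directly instead of holding a private copy.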