Search code examples
python arrays numpy python-multiprocessing

Python multiprocessing when sharing a numpy array


I want to change some of the values in a large numpy array by leveraging multiprocessing.

That is to say, I want to get [[100, 100, 100], [100, 100, 100]] in the end.

However, the following code does not work; it fails with "RuntimeError: SynchronizedArray objects should only be shared between processes through inheritance".

What should I do? Thanks.

import numpy as np
import multiprocessing

from multiprocessing import RawArray, Array


def change_array(array, i, j):
    """Set cell (i, j) of the shared buffer to 100, then print the flat buffer.

    ``array`` must expose ``get_obj()`` returning a buffer of six float64
    values; it is viewed as a 2x3 grid, so the write lands in the shared
    memory itself rather than in a copy.
    """
    shared_buf = array.get_obj()
    grid = np.frombuffer(shared_buf, dtype=np.float64).reshape(2, 3)
    grid[i, j] = 100
    print(np.frombuffer(shared_buf))

# Script from the question: it fills a shared array and asks pool workers to
# overwrite each cell. It is kept as posted because it reproduces the reported
# RuntimeError.
if __name__ == '__main__':
    X_shape = (2, 3)
    data = np.array([[1.1, 2.2, 3.3], [4.4, 5.5, 6.6]])
    # Shared array of doubles with one slot per cell of X_shape.
    X = Array('d', X_shape[0] * X_shape[1])
    # Wrap X as a numpy array so we can easily manipulate its data.
    X_np = np.frombuffer(X.get_obj()).reshape(X_shape)
    # Copy data to our shared array.
    np.copyto(X_np, data)

    pool = multiprocessing.Pool(processes=3)

    result = []
    for i in range(2):
        for j in range(3):
            # Passing X as a task argument is what leads to the reported
            # "SynchronizedArray objects should only be shared between
            # processes through inheritance" RuntimeError.
            result.append(pool.apply_async(change_array, (X, i, j,)))

    # Wait for every task to finish before shutting the pool down.
    result = [r.get() for r in result]
    pool.close()
    pool.join()

    print(np.frombuffer(X.get_obj()).reshape(2, 3))


Solution

  • You need to make two changes:

    1. Use a multiprocessing.Array instance with locking (actually, the default) rather than a "plain" Array.
    2. Do not pass the array instance as an argument to your worker function. Instead you should initialize each process in your pool with the array as a global value.
    import numpy as np
    import multiprocessing
    
    from multiprocessing import RawArray, Array
    
    
    def initpool(arr):
        """Pool initializer: publish the shared array as the module global ``array``.

        Intended to be passed as ``initializer`` to ``multiprocessing.Pool``
        (with the shared array in ``initargs``) so that ``change_array`` can
        reach the array without receiving it as a task argument.
        """
        global array
        array = arr
    
    def change_array(i, j):
        """Set cell (i, j) of the global shared array to 100 and print the flat buffer.

        Reads the module-level ``array`` (installed by the pool initializer)
        and views its six float64 values as a 2x3 grid backed by the same memory.
        """
        shared_buf = array.get_obj()
        flat_view = np.frombuffer(shared_buf, dtype=np.float64)
        grid = flat_view.reshape(2, 3)
        grid[i, j] = 100
        print(np.frombuffer(shared_buf))
    
    # Entry point: fill a shared 2x3 array, let pool workers overwrite every
    # cell with 100, then print the result from the parent process.
    if __name__ == '__main__':
        X_shape = (2, 3)
        data = np.array([[1.1, 2.2, 3.3], [4.4, 5.5, 6.6]])
        X = multiprocessing.Array('d', X_shape[0] * X_shape[1], lock=True)
        # Wrap X as a numpy array so we can easily manipulate its data.
        X_np = np.frombuffer(X.get_obj()).reshape(X_shape)
        # Copy data to our shared array.
        np.copyto(X_np, data)

        # The shared array reaches the workers through the initializer rather
        # than as a task argument. The context manager guarantees the pool is
        # torn down even if a task raises in get().
        with multiprocessing.Pool(processes=3, initializer=initpool, initargs=(X,)) as pool:
            pending = [
                pool.apply_async(change_array, (i, j))
                for i in range(X_shape[0])
                for j in range(X_shape[1])
            ]
            # get() blocks until the task is done and re-raises worker errors.
            for task in pending:
                task.get()
            # Graceful shutdown, as before; leaving the with-block is then a no-op cleanup.
            pool.close()
            pool.join()

        print(np.frombuffer(X.get_obj()).reshape(X_shape))
    

    Prints:

    [100.    2.2   3.3   4.4   5.5   6.6]
    [100.  100.    3.3   4.4   5.5   6.6]
    [100.  100.  100.    4.4   5.5   6.6]
    [100.  100.  100.  100.    5.5   6.6]
    [100.  100.  100.  100.  100.    6.6]
    [100. 100. 100. 100. 100. 100.]
    [[100. 100. 100.]
     [100. 100. 100.]]
    

    Update

    Since in this case the values being changed in the data array do not depend on the existing values in that array, there is no need for function change_array to have access to the array and it can instead, as suggested by Frank Yellin, just return a tuple of the indices to be changed with the new value. But I did want to show you how you would pass the array for those situations where the function did need to access/modify the array. The following code, in this instance, however, is all that you need (I have made a few simplifications):

    import numpy as np
    import multiprocessing
    
    
    def change_array(i, j):
        """Describe the update for cell (i, j) as an ``(i, j, new_value)`` tuple.

        The worker does not touch the array; the caller applies the update.
        """
        new_value = 100
        return (i, j, new_value)
    
    # Entry point: workers only compute (i, j, value) updates; the parent
    # process applies them to its own ordinary numpy array.
    if __name__ == '__main__':
        data = np.array([[1.1, 2.2, 3.3], [4.4, 5.5, 6.6]])
        with multiprocessing.Pool(processes=3) as pool:
            # Submit one task per cell, in row-major order.
            cells = [(i, j) for i in range(2) for j in range(3)]
            result = [pool.apply_async(change_array, cell) for cell in cells]
            # Collect each update and write it into the local array.
            for r in result:
                i, j, value = r.get()
                data[i, j] = value
            print(data)
    

    Or:

    import numpy as np
    import multiprocessing
    import itertools
    
    
    def change_array(t):
        """Turn an ``(i, j)`` index pair into an ``(i, j, new_value)`` update tuple.

        Takes a single argument so it can be used directly with ``Pool.map``.
        """
        row, col = t
        new_value = 100
        return (row, col, new_value)
    
    # Entry point: map the worker over every (i, j) index pair and apply the
    # returned updates to the parent's own numpy array.
    if __name__ == '__main__':
        data = np.array([[1.1, 2.2, 3.3], [4.4, 5.5, 6.6]])
        with multiprocessing.Pool(processes=3) as pool:
            cells = itertools.product(range(2), range(3))
            # map() blocks until every update has been computed.
            updates = pool.map(change_array, cells)
            for i, j, value in updates:
                data[i, j] = value
            print(data)