Search code examples
arraysnumpymultiprocessingshared-memory

Passing shared memory variables in python multiprocessing


I have a bunch of files that I want to read in parallel using Python's multiprocessing and collect all the data in a single NumPy array. For this purpose, I want to define a shared memory NumPy array and pass its slices to different processes to read in parallel. A toy illustration of what I am trying to do is given in the following code where I am trying to modify a numpy array using multiprocessing.

Example 1:


import numpy as np
import multiprocessing

def do_stuff(i, arr):
    arr[:]=i
    return

def print_error(err):
    print(err)

if __name__ == '__main__':
    idx = [0,1,2,3]
    # Need to fill this array in parallel
    arr = np.zeros(4)
    p = multiprocessing.Pool(4)
    # Passing slices to arr to modify using multiprocessing
    for i in idx:
        p.apply(do_stuff, args=(i,arr[i:i+1]))
    p.close()
    p.join()
    print(arr)

In this code, I want the arr to be filled with 0, 1, 2, 3. This however prints arr to be all zeros. After reading the answers here, I used multiprocessing.Array to define the shared memory variable and modified my code as follows

Example 2:

import numpy as np
import multiprocessing

def do_stuff(i, arr):
    arr[:]=i
    return

def print_error(err):
    print(err)

if __name__ == '__main__':
    idx = [0,1,2,3]
    p = multiprocessing.Pool(4)
    # Shared memory Array
    shared = multiprocessing.Array('d', 4)
    arr = np.ctypeslib.as_array(shared.get_obj())

    for i in idx:
        p.apply(do_stuff, args=(i,arr[i:i+1]))
    p.close()
    p.join()
    print(arr)

This also prints all zeros for arr. However, when I define the array outside main and use pool.map, the code works. For e.g., the following code works

Example 3:

import numpy as np
import multiprocessing

shared = multiprocessing.Array('d', 4)
arr = np.ctypeslib.as_array(shared.get_obj())

def do_stuff(i):
    arr[i]=i
    return

def print_error(err):
    print(err)

if __name__ == '__main__':
    idx = [0,1,2,3]
    p = multiprocessing.Pool(4)
    shared = multiprocessing.Array('d', 4)
    p.map(do_stuff, idx)
    p.close()
    p.join()
    print(arr)
             

This prints [0,1,2,3].

I am very confused by all this. My questions are:

  1. When I define arr = np.zeros(4), which processor owns this variable? When I then send the slice of this array to different processors what is being sent if this variable is not defined on those processors.

  2. Why doesn't example 2 work while example 3 does?

I am working on Linux and Python/3.7/4


Solution

  • When I define arr = np.zeros(4), which processor owns this variable?

    Only the main process should have access to this. If you use "fork" for the start method, everything will be accessible to the child process, but as soon as something tries to modify it, it will be copied to it's own private memory space before being modified (copy on write). This reduces overhead if you have large read-only arrays, but doesn't help you much for writing data back to those arrays.

    what is being sent if this variable is not defined on those processors.

    A new array is created within the child process when the arguments are re-constructed after being sent from the main process via a pipe and pickle. The data is serialized to text and re-constructed, so no information other than the value of the data in the slice remains. it's a totally new object.

    Why doesn't example 2 work while example 3 does?

    example 3 works because at the time of "fork" (the moment you call Pool), arr has already been created, and will be shared. It's also important that you used an Array to create it, so when you attempt to modify the data, the data is shared (the exact mechanics of this are complicated).

    example 2 does not work in a similar way that example 1 does not work: you pass a slice of an array as an argument, which gets converted into a totally new object, so arr inside your do_stuff function is just a copy of arr[i:i+1] from the main process. It is still important to create anything which will be shared between processes before calling Pool (if you're relying on "fork" to share the data), but that's not why this example doesn't work.

    You should know: example 3 only works because you're on linux, and the default start method is fork. This is not the preferred start method due to the possibility of deadlocks with copying lock objects in a locked state. This will not work on Windows at all, and won't work on MacOS by default on 3.8 and above.

    The best solution (most portable) to all this is to pass the Array itself as the argument, and re-construct the numpy array inside the child process. This has the complication that "shared objects" can only be passed as arguments at the creation of the child process. This isn't as big a deal if you use Process, but with Pool, you basically have to pass any shared objects as arguments to an initialization function, and get the re-constructed array as a global variable of the child's scope. In this example for instance you will get an error trying to pass buf as an argument with p.map or p.apply, but not when passing buf as initargs=(buf,) to Pool()

    import numpy as np
    from multiprocessing import Pool, Array
    
    def init_child(buf):
        global arr #use global context (for each process) to pass arr to do_stuff
        arr = np.frombuffer(buf.get_obj(), dtype='d')
    
    def do_stuff(i):
        global arr
        arr[i]=i
    
    if __name__ == '__main__':
        idx = [0,1,2,3]
        
        buf = Array('d', 4)
        arr = np.frombuffer(buf.get_obj(), dtype='d')
        arr[:] = 0
        
        #"with" context is easier than writing "close" and "join" all the time
        with Pool(4, initializer=init_child, initargs=(buf,)) as p:
            for i in idx:
                p.apply(do_stuff, args=(i,)) #you could pass more args to get slice indices too
        print(arr)
    

    with 3.8 and above there's a new module which is better than Array or any of the other sharedctypes classes called: shared_memory. This is a bit more complicated to use, and has some additional OS dependent nastiness, but it's theoretically lower overhead and faster. If you want to go down the rabbit hole I've written a few answers on the topic of shared_memory, and have recently been answering lots of questions on concurrency in general if you want to take a gander at my answers from the last month or two.