
How to have immutable shared object in python multiprocessing map


How can I let all of my worker processes share the same object, which none of them mutate? For example, what is the cleanest way to write a function that computes the dot product of an argument vector with a second vector that is the same for all processes? Naively, I would write something like this:

import multiprocessing
import numpy as np

def main():
    static_vector = np.array([1,2,3,4,5])

    def f(v):
        return np.dot(v, static_vector)

    with multiprocessing.Pool() as p:
        results = p.map(f, [np.random.random((5,1)) for _ in range(10)])

    print(results)
    
if __name__ == "__main__":
    main()

But this fails with the error AttributeError: Can't pickle local object 'main.<locals>.f'. For the sake of argument, assume that computing the static vector takes some time, so it should not be repeated in each subprocess.
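The error comes from pickling: Pool.map sends the function to the worker processes with pickle, and pickle stores a function as a reference to its module and qualified name. A function defined inside main() has the qualified name main.<locals>.f, which cannot be looked up again on the other side, so pickling fails. A minimal check, where module_level, make_local and is_picklable are hypothetical names used only for illustration:

```python
import pickle


def module_level(v):
    # Defined at module top level: pickle stores it as a reference by name
    return v * 2


def make_local():
    def local(v):
        # Defined inside another function: its __qualname__ contains '<locals>'
        return v * 2

    return local


def is_picklable(obj):
    """Return True if pickle can serialize obj, False otherwise."""
    try:
        pickle.dumps(obj)
        return True
    except (AttributeError, pickle.PicklingError):
        # The exception type for local functions can differ between Python versions
        return False


print(is_picklable(module_level))  # True
print(is_picklable(make_local()))  # False
```

This is why the worker function has to live at module level, and why the shared vector then needs some other way to reach it.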


Solution

  • It sounds like you are looking for multiprocessing.shared_memory (added in Python 3.8), which lets several processes map the same block of memory.

    Try this:

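    import multiprocessing
    from multiprocessing import shared_memory
    import numpy as np

    def init_pool(shm_name, shape, dtype):
        # Runs once in each worker process: attach to the existing block by name
        global static_vector, _shm
        # Keep a reference to the SharedMemory object for the worker's lifetime;
        # otherwise it can be garbage-collected and closed while static_vector
        # still points into its buffer
        _shm = shared_memory.SharedMemory(name=shm_name)
        static_vector = np.ndarray(shape, dtype=dtype, buffer=_shm.buf)

    def f(v):
        # Workers only read static_vector, so no locking is needed
        return np.dot(v, static_vector)

    def main():
        static_vector = np.array([1, 2, 3, 4, 5], dtype=np.float64)
        static_vector_shape = static_vector.shape
        static_vector_dtype = static_vector.dtype

        # Create the shared block once in the parent and copy the data into it
        shm = shared_memory.SharedMemory(create=True, size=static_vector.nbytes)
        shm_array = np.ndarray(static_vector_shape, dtype=static_vector_dtype, buffer=shm.buf)
        np.copyto(shm_array, static_vector)

        # Only the block's name, shape and dtype are sent to the workers
        with multiprocessing.Pool(initializer=init_pool,
                                  initargs=(shm.name, static_vector_shape, static_vector_dtype)) as pool:
            vectors = [np.random.random(static_vector_shape) for _ in range(10)]
            results = pool.map(f, vectors)

        print(results)

        # Drop the NumPy view before closing; unlink() frees the block
        # and should be called once, by the process that created it
        del shm_array
        shm.close()
        shm.unlink()

    if __name__ == "__main__":
        main()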
    

    Here I create a shared memory block and copy static_vector into it once, in the parent process.
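The create/attach/close/unlink lifecycle this relies on can be tried in a single process. The sketch below skips NumPy and views the raw bytes through memoryview.cast("d") (C doubles) instead; the "consumer" half would normally run in a worker process. Each cast view is released before close(), since in CPython close() can fail with BufferError while views into the buffer are still alive. The values are arbitrary example data.

```python
from multiprocessing import shared_memory

values = [1.0, 2.0, 3.0, 4.0, 5.0]

# Producer side: create a block large enough for five 8-byte doubles
shm = shared_memory.SharedMemory(create=True, size=len(values) * 8)
try:
    writer = shm.buf.cast("d")  # view the raw bytes as C doubles
    for i, x in enumerate(values):
        writer[i] = x
    writer.release()  # drop the view before close()

    # Consumer side (normally another process): attach by name only
    other = shared_memory.SharedMemory(name=shm.name)
    reader = other.buf.cast("d")
    # The attached block may be rounded up to a page size, so trim the list
    seen = reader.tolist()[: len(values)]
    reader.release()
    other.close()  # detach this handle; the block itself still exists
finally:
    shm.close()
    shm.unlink()  # free the block; done once, by the creator

print(seen)  # [1.0, 2.0, 3.0, 4.0, 5.0]
```

In the answer's code the NumPy array plays the role of the cast view, and init_pool plays the role of the consumer side.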

    I also used the initializer and initargs parameters of Pool to pass the shared memory name and the array metadata (shape and dtype) to each process. In init_pool, each worker attaches to the shared memory block and reconstructs static_vector as a NumPy array backed by it, so the vector's data itself is never copied to the workers.
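If giving each worker its own copy of the vector is acceptable (for example when it is small), the same initializer/initargs mechanism can hand over the precomputed object directly: it is computed once in the parent and delivered to each worker once when that worker starts, rather than once per task. This is a dependency-free sketch, not shared memory; compute_static_vector, init_worker and dot_with_static are hypothetical names, and plain tuples stand in for NumPy arrays.

```python
import multiprocessing

_static_vector = None  # set once per worker by init_worker


def init_worker(vector):
    # Runs once in each worker process when the pool starts it
    global _static_vector
    _static_vector = vector


def dot_with_static(v):
    # Plain-Python dot product against the per-worker copy
    return sum(a * b for a, b in zip(v, _static_vector))


def compute_static_vector():
    # Hypothetical stand-in for the expensive computation; runs only in the parent
    return (1.0, 2.0, 3.0, 4.0, 5.0)


if __name__ == "__main__":
    static_vector = compute_static_vector()
    inputs = [(1.0, 0.0, 0.0, 0.0, 0.0), (1.0, 1.0, 1.0, 1.0, 1.0)]
    with multiprocessing.Pool(processes=2, initializer=init_worker,
                              initargs=(static_vector,)) as pool:
        results = pool.map(dot_with_static, inputs)
    print(results)  # [1.0, 15.0]
```

The trade-off is memory: every worker holds a full copy, whereas the shared_memory version keeps one copy that all workers map.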