Tags: python-3.x, multiprocessing, shared-memory

Python shared memory with Pool


I am trying to use shared_memory with Pool from Python's multiprocessing module.

In the documentation on shared memory, the buf argument (the memoryview) is not clear to me, perhaps because I don't understand the concept of a memoryview. Is it a pointer?
I want to use this shared memory across different processes. Here is my example, based on the documentation:

import numpy as np
from multiprocessing import shared_memory

a = np.array([1, 1, 2, 3, 5, 8])
shm = shared_memory.SharedMemory(create=True, size=a.nbytes)

# Do I need to create existing_shm, or can I keep using shm?
existing_shm = shared_memory.SharedMemory(name=shm.name)
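
For what it's worth, poking at it in the interpreter, shm.buf looks like a memoryview, i.e. a writable window over the raw bytes of the block, rather than a pointer:

print(type(shm.buf))         # <class 'memoryview'>
shm.buf[0] = 255             # writes directly into the shared block
print(existing_shm.buf[0])   # 255 -- the second handle sees the same bytes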

Now comes my first problem. I define the function that will use the array in the shared memory:

def test_function(Input):
    c = np.ndarray(a.shape, dtype=np.int64, buffer=existing_shm.buf)
    c[1] = 100
    print(c)

This is incorrect, but I don't know how it should be written.

Then the main block. Does having a main guard play a role in making this work?

from multiprocessing import Pool
import os

if __name__ == '__main__':
    with Pool(os.cpu_count()) as p:
        p.map(test_function, range(12))

It doesn't work. Do I have to define c in every process, or can I define it in the main block and use it across all processes? I assume that c is a Python object and therefore can't be shared between processes because of the GIL?

Thank you very much!


Solution

  • This works. I don't have a clear understanding of all the facts yet, though.

    1- The shared memory object is created:
    shm = shared_memory.SharedMemory(create=True, size=10000000*4)

    2- An object (a numpy array in this case) is declared with the shared memory as its buffer:
    b = np.ndarray((10000000,), dtype=np.int32, buffer=shm.buf)

    3- The numpy array is populated by copying data into it:
    b[:] = np.random.randint(100, size=10000000, dtype=np.int32)

    Then, all the function to be executed on many CPUs needs is the name of the shared memory object; repeating step 2 inside the function maps the same shared memory block, which was populated earlier.

    It's essential that each process closes the shared memory object after accessing it, and that you unlink it once at the very end.
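
    In other words, the lifecycle looks like this (a minimal sketch; the names creator and worker_view are illustrative, and the full example follows below):

    from multiprocessing import shared_memory

    creator = shared_memory.SharedMemory(create=True, size=16)   # parent: create
    worker_view = shared_memory.SharedMemory(name=creator.name)  # worker: attach by name
    worker_view.buf[0] = 7    # the worker writes...
    print(creator.buf[0])     # ...and the parent sees 7 through its own handle
    worker_view.close()       # each process closes its own view
    creator.close()
    creator.unlink()          # exactly one process unlinks, at the very end

    Here is the complete working example: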

    import numpy as np
    from multiprocessing import shared_memory, Pool
    import os
    
    
    def test_function(args):
        Input, shm_name, size = args
        # Attach to the existing block by name (this is step 2 from above)
        existing_shm = shared_memory.SharedMemory(name=shm_name)
        # Wrap the shared buffer in a numpy array; no data is copied
        d = np.ndarray(size, dtype=np.int32, buffer=existing_shm.buf)
        #print(Input, d[Input-1:Input+2])
        d[Input] = -20
        #print(Input, d[Input-1:Input+2])
        # Close this process's view of the block; do not unlink here
        existing_shm.close()
        print(Input, 'parent process:', os.getppid())
        print(Input, 'process id:', os.getpid())
    
    
    if __name__ == '__main__':
        
        # Create the block: 10,000,000 int32 values, 4 bytes each
        shm = shared_memory.SharedMemory(create=True, size=10000000*4)
        b = np.ndarray((10000000,), dtype=np.int32, buffer=shm.buf)
        b[:] = np.random.randint(100, size=10000000, dtype=np.int32)
    
        # Each worker receives its index plus the name and shape needed to re-attach
        inputs = [[i, shm.name, b.shape] for i in range(1, 14)]
    
        with Pool(os.cpu_count()) as p:
            p.map(test_function, inputs)
     
        print(b[:20])
        
        # Clean up in the parent once all workers are done
        shm.close()
        shm.unlink()  # Free and release the shared memory block at the very end
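
    After the pool finishes, b[1:14] should all read -20, which confirms the workers wrote into the parent's block rather than into private copies. As a possible refinement (not part of the original answer; a sketch assuming Python 3.8+), multiprocessing.managers.SharedMemoryManager can handle the unlink automatically when its with-block exits:

    import os
    import numpy as np
    from multiprocessing import Pool
    from multiprocessing.managers import SharedMemoryManager

    # reuses test_function from the example above

    if __name__ == '__main__':
        with SharedMemoryManager() as smm:
            shm = smm.SharedMemory(size=10000000*4)  # the manager owns this block
            b = np.ndarray((10000000,), dtype=np.int32, buffer=shm.buf)
            b[:] = np.random.randint(100, size=10000000, dtype=np.int32)
            with Pool(os.cpu_count()) as p:
                p.map(test_function, [[i, shm.name, b.shape] for i in range(1, 14)])
            print(b[:20])
        # no shm.unlink() needed here: the manager released the block on exit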