Tags: python, multiprocessing, locking, shared-memory

Is it necessary to close the SharedMemory every time?


I noticed that constantly closing and reopening the shared memory under heavy load hurts performance (see get_shm/set_shm in example 1).

Can I instead keep the SharedMemory open in each process and hold on to a numpy array backed by that memory (see get_shm/set_shm in example 2)? Or can such a decision lead to unstable operation of my code?

Example 1

import random
import time

import numpy as np
from multiprocessing import Lock, Process, shared_memory

class MyClass(object):

    def __init__(self):
        pass

    def create_shm(self):
        self.lock = Lock()
        
        zero_arr = np.zeros(shape=(400,400), dtype=np.uint8)
        existing_shm = shared_memory.SharedMemory(create=True, size=zero_arr.nbytes)
        shm_arr = np.ndarray(zero_arr.shape, dtype=np.uint8, buffer=existing_shm.buf)
        shm_arr[:] = zero_arr[:]
        existing_shm.close()

        #save memory name
        self.shm_name = existing_shm.name

    def get_shm(self):
        existing_shm = shared_memory.SharedMemory(name=self.shm_name, create=False)
        ar_shm = np.ndarray((400,400), dtype=np.uint8, buffer=existing_shm.buf)

        self.lock.acquire()
        ar = ar_shm.copy()
        self.lock.release()
        existing_shm.close()
        return ar

    def set_shm(self, ar):
        existing_shm = shared_memory.SharedMemory(name=self.shm_name, create=False)
        ar_shm = np.ndarray((400,400), dtype=np.uint8, buffer=existing_shm.buf)

        self.lock.acquire()
        ar_shm[:] = ar[:]
        self.lock.release()
        existing_shm.close()
        return 1

    def func1(self):
        for i in range(30):
            new_arr = np.random.randint(0, high=255, size=(400,400), dtype=np.uint8)
            self.set_shm(new_arr)
            print(f'{i} | func1 | set_shm | {new_arr[:4,0]}')
            time.sleep(random.random())
        
    def func2(self):
        for i in range(30):
            new_arr = self.get_shm()
            print(f'{i} | func2 | get_shm | {new_arr[:4,0]}')
            new_arr[:] = 2
            self.set_shm(new_arr)
            print(f'{i} | func2 | set_shm | {new_arr[:4,0]}')
            time.sleep(random.random())

if __name__ == '__main__':

    foo = MyClass()
    foo.create_shm()

    p1 = Process(target=foo.func1)
    p2 = Process(target=foo.func2)

    p1.start()
    p2.start()


    p1.join()
    p2.join()
    
    ### need unlink here, but skipping it for this example

Example 2

import random
import time

import numpy as np
from multiprocessing import Lock, Process, shared_memory

class MyClass(object):

    def __init__(self):
        pass

    def create_shm(self):
        self.lock = Lock()
        
        zero_arr = np.zeros(shape=(400,400), dtype=np.uint8)
        existing_shm = shared_memory.SharedMemory(create=True, size=zero_arr.nbytes)
        shm_arr = np.ndarray(zero_arr.shape, dtype=np.uint8, buffer=existing_shm.buf)
        shm_arr[:] = zero_arr[:]
        existing_shm.close()

        #save memory name
        self.shm_name = existing_shm.name
        self.shm_link = None
        self.arr_link = None

    def get_shm(self):
        if self.shm_link is None:
            existing_shm = shared_memory.SharedMemory(name=self.shm_name, create=False)
            ar_shm = np.ndarray((400,400), dtype=np.uint8, buffer=existing_shm.buf)
            self.shm_link = existing_shm
            self.arr_link = ar_shm
        self.lock.acquire()
        ar = self.arr_link.copy()
        self.lock.release()
        return ar

    def set_shm(self, ar):
        if self.shm_link is None:
            existing_shm = shared_memory.SharedMemory(name=self.shm_name, create=False)
            ar_shm = np.ndarray((400,400), dtype=np.uint8, buffer=existing_shm.buf)
            self.shm_link = existing_shm
            self.arr_link = ar_shm
        self.lock.acquire()
        self.arr_link[:] = ar[:]
        self.lock.release()
        return 1

    def func1(self):
        for i in range(30):
            new_arr = np.random.randint(0, high=255, size=(400,400), dtype=np.uint8)
            self.set_shm(new_arr)
            print(f'{i} | func1 | set_shm | {new_arr[:4,0]}')
            time.sleep(random.random())
        
    def func2(self):
        for i in range(30):
            new_arr = self.get_shm()
            print(f'{i} | func2 | get_shm | {new_arr[:4,0]}')
            new_arr[:] = 2
            self.set_shm(new_arr)
            print(f'{i} | func2 | set_shm | {new_arr[:4,0]}')
            time.sleep(random.random())

if __name__ == '__main__':

    foo = MyClass()
    foo.create_shm()

    p1 = Process(target=foo.func1)
    p2 = Process(target=foo.func2)

    p1.start()
    p2.start()


    p1.join()
    p2.join()
    
    ### need unlink here, but skipping it for this example

I just want to be sure that example 2 is OK.


Solution

  • I would actually suggest that closing the file all the time is not only hurting performance but is actually incorrect. On Windows, if no process has an open handle to the SHM file, it will be automatically deleted without unlink ever being called. This can unintentionally create a race condition where you are relying purely on the timing of each function overlapping for the file not to be deleted too early. If you want your code to work cross-platform, you should make sure at least one process (probably the main process) keeps a handle open until the data should be unlinked (a sketch of that change to your create_shm appears at the end of this answer). Beyond that, I see no reason at all to require the file to be closed in the child processes before they end. A design pattern I have followed is to make a contextmanager to handle the opening and closing of the SHM. I have even rolled it into a class to provide a shared numpy array: https://stackoverflow.com/a/73279376/3220135

    import numpy as np
    from multiprocessing import shared_memory, Process
    
    class Shared_Arr: #helper class to make shared_memory arrays easier
        def __init__(self, shape, dtype, shm=None):
            self.shape=shape
            self.dtype=dtype
    
            if shm is None:
                n_bytes = int(np.dtype(dtype).itemsize * np.prod(shape))
                self.shm = shared_memory.SharedMemory(create=True, size=n_bytes)
                self.owner = True
            else:
                self.shm = shm
                self.owner = False
    
            self.arr = np.ndarray(self.shape, self.dtype, buffer=self.shm.buf)
    
        def __reduce__(self): #make it picklable so it can be sent to a child process correctly
            return (self.__class__, (self.shape, self.dtype, self.shm))
    
        def __enter__(self): #context manager is mostly for cleanup so __enter__ is uninteresting
            return self
    
        def __exit__(self, exc_type, exc_value, traceback):
            self.shm.close() #closes the memory-mapped file
            if self.owner:
                self.shm.unlink() #tell the OS to delete the file
    
    def populate_arr(shared, value):
        with shared: #without the context manager you could just manually call shared.shm.close() when you're done with it
            shared.arr[:] = value
    
    if __name__ == "__main__":
        with Shared_Arr([10], int) as shared:
            shared.arr[:] = 0 #an ndarray over a raw buffer is uninitialized (like np.empty), so zero it explicitly
            print(shared.arr) #before modification
            p = Process(target=populate_arr, args=(shared, 5))
            p.start()
            p.join()
            print(shared.arr) #after being modified in a separate process
    

    Note: this example does not address the locking you're doing to protect array access, but that could easily be added by passing a Lock as an additional argument to the target function.
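
    For instance, here is a minimal sketch of such a lock-protected variant. It reuses the Shared_Arr class above; the populate_arr_locked function and the two-writer usage are my own illustration, not something from the linked answer:

    from multiprocessing import Process, Lock

    def populate_arr_locked(shared, value, lock):
        with shared: #attach in the child and close on exit
            with lock: #serialize access to the shared buffer
                shared.arr[:] = value

    if __name__ == "__main__":
        lock = Lock()
        with Shared_Arr([10], int) as shared:
            shared.arr[:] = 0
            procs = [Process(target=populate_arr_locked, args=(shared, v, lock)) for v in (5, 7)]
            for p in procs:
                p.start()
            for p in procs:
                p.join()
            print(shared.arr) #whichever writer ran last wins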
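
    Likewise, applying the cross-platform point above to the create_shm from the question: the minimal fix is for the creating process to hold on to its handle rather than closing it. This is a sketch showing only that method; storing the handle as self.shm is my suggested change:

    def create_shm(self):
        self.lock = Lock()

        zero_arr = np.zeros(shape=(400,400), dtype=np.uint8)
        #keep the handle on self instead of calling close(), so on Windows
        #the segment survives until we explicitly close/unlink it at shutdown
        self.shm = shared_memory.SharedMemory(create=True, size=zero_arr.nbytes)
        shm_arr = np.ndarray(zero_arr.shape, dtype=np.uint8, buffer=self.shm.buf)
        shm_arr[:] = zero_arr[:]

        #children still attach by name
        self.shm_name = self.shm.name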