I noticed that constantly closing and reopening shared memory under heavy load hurts performance (example 1, get/set_shm). Can I keep a SharedMemory block open across different processes and hold on to a numpy array backed by that memory (example 2, get/set_shm)? Or can such a design lead to unstable behavior in my code?

Example 1:
import time
import random
import numpy as np
from multiprocessing import Process, Lock, shared_memory

class MyClass(object):
    def __init__(self):
        pass

    def create_shm(self):
        self.lock = Lock()
        zero_arr = np.zeros(shape=(400,400), dtype=np.uint8)
        existing_shm = shared_memory.SharedMemory(create=True, size=zero_arr.nbytes)
        shm_arr = np.ndarray(zero_arr.shape, dtype=np.uint8, buffer=existing_shm.buf)
        shm_arr[:] = zero_arr[:]
        existing_shm.close()
        # save memory name
        self.shm_name = existing_shm.name

    def get_shm(self):
        existing_shm = shared_memory.SharedMemory(name=self.shm_name, create=False)
        ar_shm = np.ndarray((400,400), dtype=np.uint8, buffer=existing_shm.buf)
        self.lock.acquire()
        ar = ar_shm.copy()
        self.lock.release()
        existing_shm.close()
        return ar

    def set_shm(self, ar):
        existing_shm = shared_memory.SharedMemory(name=self.shm_name, create=False)
        ar_shm = np.ndarray((400,400), dtype=np.uint8, buffer=existing_shm.buf)
        self.lock.acquire()
        ar_shm[:] = ar[:]
        self.lock.release()
        existing_shm.close()
        return 1

    def func1(self):
        for i in range(30):
            new_arr = np.random.randint(0, high=255, size=(400,400), dtype=np.uint8)
            self.set_shm(new_arr)
            print(f'{i} | func1 | set_shm | {new_arr[:4,0]}')
            time.sleep(random.random())

    def func2(self):
        for i in range(30):
            new_arr = self.get_shm()
            print(f'{i} | func2 | get_shm | {new_arr[:4,0]}')
            new_arr[:] = 2
            self.set_shm(new_arr)
            print(f'{i} | func2 | set_shm | {new_arr[:4,0]}')
            time.sleep(random.random())

if __name__ == '__main__':
    foo = MyClass()
    foo.create_shm()
    p1 = Process(target=foo.func1)
    p2 = Process(target=foo.func2)
    p1.start()
    p2.start()
    p1.join()
    p2.join()
    ### need unlink, but skipping it for this example
Example 2:
import time
import random
import numpy as np
from multiprocessing import Process, Lock, shared_memory

class MyClass(object):
    def __init__(self):
        pass

    def create_shm(self):
        self.lock = Lock()
        zero_arr = np.zeros(shape=(400,400), dtype=np.uint8)
        existing_shm = shared_memory.SharedMemory(create=True, size=zero_arr.nbytes)
        shm_arr = np.ndarray(zero_arr.shape, dtype=np.uint8, buffer=existing_shm.buf)
        shm_arr[:] = zero_arr[:]
        existing_shm.close()
        # save memory name
        self.shm_name = existing_shm.name
        self.shm_link = None
        self.arr_link = None

    def get_shm(self):
        if self.shm_link is None:
            existing_shm = shared_memory.SharedMemory(name=self.shm_name, create=False)
            ar_shm = np.ndarray((400,400), dtype=np.uint8, buffer=existing_shm.buf)
            self.shm_link = existing_shm
            self.arr_link = ar_shm
        self.lock.acquire()
        ar = self.arr_link.copy()
        self.lock.release()
        return ar

    def set_shm(self, ar):
        if self.shm_link is None:
            existing_shm = shared_memory.SharedMemory(name=self.shm_name, create=False)
            ar_shm = np.ndarray((400,400), dtype=np.uint8, buffer=existing_shm.buf)
            self.shm_link = existing_shm
            self.arr_link = ar_shm
        self.lock.acquire()
        self.arr_link[:] = ar[:]
        self.lock.release()
        return 1

    def func1(self):
        for i in range(30):
            new_arr = np.random.randint(0, high=255, size=(400,400), dtype=np.uint8)
            self.set_shm(new_arr)
            print(f'{i} | func1 | set_shm | {new_arr[:4,0]}')
            time.sleep(random.random())

    def func2(self):
        for i in range(30):
            new_arr = self.get_shm()
            print(f'{i} | func2 | get_shm | {new_arr[:4,0]}')
            new_arr[:] = 2
            self.set_shm(new_arr)
            print(f'{i} | func2 | set_shm | {new_arr[:4,0]}')
            time.sleep(random.random())

if __name__ == '__main__':
    foo = MyClass()
    foo.create_shm()
    p1 = Process(target=foo.func1)
    p2 = Process(target=foo.func2)
    p1.start()
    p2.start()
    p1.join()
    p2.join()
    ### need unlink, but skipping it for this example
I just want to be sure that example 2 is OK.
I would actually suggest that closing the file every time is not only hurting performance, it is actually incorrect. On Windows, if no process holds an open handle to the SHM file, it will be automatically deleted without unlink ever being called. This can unintentionally create a race condition where you are relying purely on the timing of each function overlapping to keep the file from being deleted too early. If you want your code to work cross-platform, you should make sure at least one process (probably the main process) keeps a handle open until the data should be unlinked. Beyond that, I see no reason at all to require the child processes to close the file before they end. A design pattern I have followed is to use a context manager to handle opening and closing the SHM. I have even rolled it into a class that provides a shared numpy array: https://stackoverflow.com/a/73279376/3220135
import numpy as np
from multiprocessing import shared_memory, Process

class Shared_Arr:  # helper class to make shared_memory arrays easier
    def __init__(self, shape, dtype, shm=None):
        self.shape = shape
        self.dtype = dtype
        if shm is None:
            n_bytes = int(np.dtype(dtype).itemsize * np.prod(shape))
            self.shm = shared_memory.SharedMemory(create=True, size=n_bytes)
            self.owner = True
        else:
            self.shm = shm
            self.owner = False
        self.arr = np.ndarray(self.shape, self.dtype, buffer=self.shm.buf)

    def __reduce__(self):  # make it picklable so it can be sent to a child process correctly
        return (self.__class__, (self.shape, self.dtype, self.shm))

    def __enter__(self):  # context manager is mostly for cleanup, so __enter__ is uninteresting
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.shm.close()  # closes the memory-mapped file
        if self.owner:
            self.shm.unlink()  # tell the OS to delete the file

def populate_arr(shared, value):
    with shared:  # without the context manager you could just manually call shared.shm.close() when you're done with it
        shared.arr[:] = value

if __name__ == "__main__":
    with Shared_Arr([10], int) as shared:
        shared.arr[:] = 0  # np.ndarray may operate like np.empty? initialize to zero
        print(shared.arr)  # before modification
        p = Process(target=populate_arr, args=(shared, 5))
        p.start()
        p.join()
        print(shared.arr)  # after being modified in a separate process
Note: this example does not address the locking you're doing to protect array access, but that could easily be added by passing an additional lock as an argument to the target function.
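As a rough illustration, here is a minimal sketch of how a lock could be wired in, assuming the Shared_Arr class above is defined in the same module. The function populate_arr_locked and the lock argument are names I'm introducing for this sketch, not part of the linked answer:

from multiprocessing import Process, Lock

def populate_arr_locked(shared, value, lock):
    # same idea as populate_arr, but writes are serialized with the lock
    with shared:
        with lock:  # only one process touches the array at a time
            shared.arr[:] = value

if __name__ == "__main__":
    lock = Lock()
    with Shared_Arr([10], int) as shared:
        shared.arr[:] = 0
        procs = [Process(target=populate_arr_locked, args=(shared, v, lock)) for v in (5, 7)]
        for p in procs:
            p.start()
        for p in procs:
            p.join()
        print(shared.arr)  # last writer wins; the lock only keeps writes from interleaving

Readers would take the same lock around their copy (as get_shm does in the question) so they never see a half-written array.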