Granular locking of shared memory buffer

I need to create a shared memory buffer that one process pushes to and another process samples from. To minimize the amount of time the buffer is locked, I attempted to create a separate lock for each index in the shared memory buffer.

When I run this, I get the error: OSError: [Errno 23] Too many open files in system. (Stack trace below is abbreviated)

How can I achieve granular control over the shared memory buffer of length >100,000 so that some segments are being read while others are being written? Is there some other construct I can use?

In [1]: import multiprocessing as mp 
   ...: from multiprocessing.shared_memory import SharedMemory 
   ...: class Memory:  # byte-addressable wrapper around one SharedMemory segment
   ...:     def __init__(self, length):  # `length` is both the segment size in bytes and the number of locks
   ...:         self.shm = SharedMemory(create=True, size=length)  # allocates a new segment of `length` bytes
   ...:         self.locks = [mp.Lock() for _ in range(length)]  # one lock per byte; presumably what triggers the OSError shown below for 100_000
   ...:     def __getitem__(self, item):  # read path; note that none of the locks is acquired here
   ...:         return int.from_bytes(self.shm.buf[item], 'big')  # decodes the selected bytes as a big-endian integer
   ...:     def __setitem__(self, key, value):  # write path; note that none of the locks is acquired here
   ...:         assert isinstance(value, int)  # only plain integers are accepted
   ...:         self.shm.buf[key] = value.to_bytes(1, 'big')  # encodes `value` as a single byte before storing it

In [2]: m = Memory(100_000)                                                     
OSError                                   Traceback (most recent call last)
<ipython-input-2-2c6433483d72> in <module>
----> 1 m = Memory(100_000)


OSError: [Errno 23] Too many open files in system

I'm running this on Kubernetes with the docker image

`ulimit` returns `unlimited`. Regardless, setting `ulimit` is apparently an open issue on Kubernetes.

Some more details:

  • In the real implementation, I set up strides so that each key indexes 28240 bytes, which make up six different objects, which are a mixture of ints and numpy arrays.
  • The input is fed by a queue that I am trying to keep empty. Things are working just fine on this side even if I set a lock on the entire buffer.

Push worker:

def _push_worker(self) -> None:
    buffer_len = 100
    while True:
        sample = self.replay_in_queue.get()
        if len(self.buffer_in) >= self.initial_memory // buffer_len:
            index = self.sample_count % self.memory_maxlen
            self.memory[index: index + buffer_len] = self.buffer_in
            self.sample_count += buffer_len
            self.buffer_in = []
  • If I set a lock on the entire buffer, the output queue oscillates between full and empty, even if I make it much larger than it should be. I am trying to keep the output queue full.

Sample worker:

def _sample_worker(self) -> None:
    """Repeatedly draw `batch_size` items from `memory`; never returns.

    NOTE(review): in the lines shown here `batch` is never used after it is
    drawn. Presumably the full implementation puts it on the output queue
    mentioned in the surrounding text; confirm against the complete source.
    """
    while True:
        # `random.choices` samples with replacement, so a batch may repeat items.
        batch = random.choices(self.memory, k=self.batch_size)
  • I have tried locking small blocks of the buffer, which works OK, but I'm wondering if there is a perfect solution to my problem.


  • My solution was to lock memory blocks. This needed fine-tuning of the number of blocks to get optimal performance.

    class Memory:
        """A shared-memory buffer of fixed-size `Transition` records with block locks.

        Every record occupies `stride` bytes, laid out in this order::

            actor_id | step_number | state | action | next_state | reward | done

        `actor_id`, `step_number` and `action` are unsigned big-endian integers
        of `int_size` bytes, `reward` is a signed one, `state` and `next_state`
        are `array_dtype` arrays of shape `array_shape`, and `done` is one byte.

        The records are split into `n_locks` contiguous blocks of equal size and
        each block has its own lock, so a reader and a writer only wait for each
        other when they touch the same block.

        NOTE(review): `_offset` is a per-instance cursor that is updated while
        holding only a per-block lock, so accesses to *different* blocks through
        the same instance from several threads are not isolated from each other.
        """
        # n_bytes:
        #   2 4x84x84 uint8 arrays
        #   4 32-bit (4-byte) numbers
        #   1 bool (1-byte)
        int_size = 4
        array_dtype = 'uint8'
        array_bytes = 4 * 84 * 84
        array_shape = (4, 84, 84)
        stride = 2 * array_bytes + 4 * int_size + 1
        _offset = 0

        def __init__(self, length: int, n_locks: int = 1_000):
            """Allocate room for `length` records guarded by `n_locks` block locks.

            :param length: Number of records the buffer can hold
            :param n_locks: Number of lock-protected blocks; must divide `length`
            :raises ValueError: if `length` or `n_locks` is not positive, or
                `length` is not a multiple of `n_locks`
            """
            # Validate before allocating so a bad argument cannot leave an
            # orphaned shared-memory segment behind.
            if length <= 0 or n_locks <= 0 or length % n_locks:
                raise ValueError("length must be a positive multiple of n_locks")
            self._length = length
            self._lock_length = length // n_locks
            # Remember the creator so that only it removes the segment.
            self._owner_pid = mp.current_process().pid
            self._shared_memory = SharedMemory(create=True, size=self.stride * length)
            self._locks = [mp.Lock() for _ in range(n_locks)]

        def __del__(self):
            """Close this process's mapping; the creating process also unlinks it."""
            shm = getattr(self, "_shared_memory", None)
            if shm is None:
                # __init__ failed before the segment was created.
                return
            shm.close()
            if getattr(self, "_owner_pid", None) == mp.current_process().pid:
                try:
                    shm.unlink()
                except FileNotFoundError:
                    # The segment has already been removed.
                    pass

        @property
        def _buf(self):
            """The memoryview over the whole shared segment."""
            return self._shared_memory.buf

        def __len__(self):
            """Return the number of records the buffer can hold."""
            return self._length

        def __getitem__(self, index: Union[slice, int]):
            """Return one `Transition` for an int, or a list of them for a slice.

            :raises IndexError: if `index` is out of bounds or is neither an
                int nor a slice
            """
            if isinstance(index, int):
                return self._get_item(index)
            if isinstance(index, slice):
                return self._get_slice(index)
            raise IndexError(f"unsupported index {index!r}")

        def _resolve_slice(self, slice_: slice):
            """Return `(start, stop, step)` with missing parts filled in."""
            start = slice_.start if slice_.start is not None else 0
            stop = slice_.stop if slice_.stop is not None else self._length
            step = slice_.step if slice_.step is not None else 1
            return start, stop, step

        def _get_slice(self, slice_: slice):
            """Read every record selected by `slice_`, locking one record at a time."""
            start, stop, step = self._resolve_slice(slice_)
            # Compare the resolved stop: `slice_.stop` may be None.
            if stop > self._length:
                raise IndexError(f"slice stop {stop} out of bounds")
            return [self._get_item(i % self._length) for i in range(start, stop, step)]

        # todo: use __get_slice__ and __set_slice__
        def _get_item(self, index):
            """Decode the record at `index` while holding its block lock.

            `next_state` is returned as None when the stored `done` byte is set.

            :raises IndexError: if `index` is outside `[0, length)`
            """
            if index < 0 or index >= self._length:
                raise IndexError(f"index {index} out of bounds")
            with self._locks[index // self._lock_length]:
                self._offset = index * self.stride
                actor_id = int.from_bytes(self._get(self.int_size), 'big')
                step_number = int.from_bytes(self._get(self.int_size), 'big')
                state = np.frombuffer(
                    self._get(self.array_bytes), dtype=self.array_dtype
                ).reshape(self.array_shape)
                action = int.from_bytes(self._get(self.int_size), 'big')
                next_state = np.frombuffer(
                    self._get(self.array_bytes), dtype=self.array_dtype
                ).reshape(self.array_shape)
                reward = int.from_bytes(self._get(self.int_size), 'big', signed=True)
                done = int.from_bytes(self._get(1), 'big')
                if done:
                    next_state = None
                return Transition(actor_id, step_number, state, action, next_state, reward, done)

        def _get(self, n_bytes: int) -> bytes:
            """Get item at `_offset` and move forward `n_bytes`

            :param n_bytes: Number of bytes to retrieve from memory
            :return: bytes copied from memory
            """
            item = self._buf[self._offset: self._offset + n_bytes]
            self._offset += n_bytes
            return item.tobytes()

        def __setitem__(self, index: Union[int, slice],
                        transition: "Union[List[Transition], Transition]"):
            """Store `transition` in shared memory

            :param index: Index (or slice) of the memory location(s) to store
            :param transition: a `Transition` for an int index, a list of
                `Transition` for a slice
            :raises TypeError: if `transition` does not match the kind of index
            :raises IndexError: if `index` is neither an int nor a slice
            """
            if isinstance(index, int):
                if not isinstance(transition, Transition):
                    raise TypeError("an int index requires a single Transition")
                self._set_item(index, transition)
            elif isinstance(index, slice):
                if not isinstance(transition, list):
                    raise TypeError("a slice index requires a list of Transition")
                self._set_slice(index, transition)
            else:
                raise IndexError(f"unsupported index {index!r}")

        def _set_slice(self, slice_: slice, transitions: "List[Transition]"):
            """Write `transitions` to the slots selected by `slice_`.

            Slot numbers wrap around modulo the buffer length; writing stops at
            whichever of the slice or the list is exhausted first.
            """
            start, stop, step = self._resolve_slice(slice_)
            for i, t in zip(range(start, stop, step), transitions):
                self._set_item(i % self._length, t)

        def _array_to_bytes(self, array) -> bytes:
            """Serialize `array` as `array_dtype` and check it fills one array slot."""
            raw = np.asarray(array, dtype=self.array_dtype).tobytes()
            if len(raw) != self.array_bytes:
                raise ValueError(
                    f"array has {len(raw)} bytes, expected {self.array_bytes}"
                )
            return raw

        def _set_item(self, index, transition):
            """Encode `transition` into the record at `index` under its block lock.

            When `transition.next_state` is None its slot is skipped, leaving
            whatever bytes were there before.

            :raises IndexError: if `index` is outside `[0, length)`
            :raises ValueError: if `state` or `next_state` has the wrong size
            """
            if index < 0 or index >= self._length:
                raise IndexError(f"index {index} out of bounds")
            # Encode everything before taking the lock: the lock is held only for
            # the copy, and an encoding error cannot leave a half-written record.
            # 'actor_id', 'step_number', 'state', 'action', 'next_state', 'reward', 'done'
            head = b"".join((
                transition.actor_id.to_bytes(self.int_size, 'big'),
                transition.step_number.to_bytes(self.int_size, 'big'),
                self._array_to_bytes(transition.state),
                transition.action.to_bytes(self.int_size, 'big'),
            ))
            next_state = None
            if transition.next_state is not None:
                next_state = self._array_to_bytes(transition.next_state)
            tail = (
                int(transition.reward).to_bytes(self.int_size, 'big', signed=True)
                + int(transition.done).to_bytes(1, 'big')
            )
            with self._locks[index // self._lock_length]:
                self._offset = index * self.stride
                self._set(head)
                if next_state is not None:
                    self._set(next_state)
                else:
                    self._offset += self.array_bytes
                self._set(tail)

        def _set(self, bytearray_: Union[bytearray, bytes]):
            """update `_buf` and move `_offset`

            :param bytearray_: a bytearray
            """
            len_ = len(bytearray_)
            self._buf[self._offset: self._offset + len_] = bytearray_
            self._offset = self._offset + len_

        def __iter__(self):
            """Yield every record in slot order, locking one record at a time."""
            for i in range(self._length):
                yield self[i]