I need to create a shared memory buffer that one process pushes to and another process samples from. To minimize the amount of time the shared memory is locked, I attempted to create a separate lock for each index in the buffer.
When I run this, I get OSError: [Errno 23] Too many open files in system (the stack trace below is abbreviated).
How can I achieve granular control over a shared memory buffer of length >100,000, so that some segments can be read while others are being written? Is there some other construct I can use?
In [1]: import multiprocessing as mp
...: from multiprocessing.shared_memory import SharedMemory
...:
...: class Memory:
...: def __init__(self, length):
...: self.shm = SharedMemory(create=True, size=length)
...: self.locks = [mp.Lock() for _ in range(length)]
...:
...: def __getitem__(self, item):
...: return int.from_bytes(self.shm.buf[item], 'big')
...:
...: def __setitem__(self, key, value):
...: assert isinstance(value, int)
...: self.shm.buf[key] = value.to_bytes(1, 'big')
...:
In [2]: m = Memory(100_000)
---------------------------------------------------------------------------
OSError Traceback (most recent call last)
<ipython-input-2-2c6433483d72> in <module>
----> 1 m = Memory(100_000)
...
OSError: [Errno 23] Too many open files in system
I'm running this on Kubernetes with the docker image gitlab-registry.nautilus.optiputer.net/ian/torch:latest. ulimit returns unlimited. Regardless, setting ulimit is apparently an open issue on Kubernetes.
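For what it's worth, ulimit limits are per process, while errno 23 (ENFILE, "Too many open files in system") refers to the kernel-wide file table. Each multiprocessing.Lock() is backed by an OS-level semaphore, and on this system each one appears to consume a file handle, so 100,000 of them can exhaust that table no matter what ulimit reports. A quick diagnostic sketch (assuming a Linux node; this is not part of the original setup):

import errno
import resource

# Per-process descriptor limit (what ulimit -n governs).
print("per-process RLIMIT_NOFILE:", resource.getrlimit(resource.RLIMIT_NOFILE))

# Kernel-wide file table limit, which is what ENFILE refers to.
with open("/proc/sys/fs/file-max") as f:
    print("system-wide fs.file-max:", f.read().strip())

print("errno 23 =", errno.errorcode[23])  # ENFILE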
Some more details: each sample is 28240 bytes, which make up six different objects, a mixture of ints and numpy arrays.
Push worker:
def _push_worker(self) -> None:
buffer_len = 100
while True:
sample = self.replay_in_queue.get()
self.buffer_in.append(sample)
if len(self.buffer_in) >= self.initial_memory // buffer_len:
index = self.sample_count % self.memory_maxlen
self.memory[index: index + buffer_len] = self.buffer_in
self.sample_count += buffer_len
self.buffer_in = []
Sample worker:
def _sample_worker(self) -> None:
while True:
batch = random.choices(self.memory, k=self.batch_size)
self.replay_out_queue.put(batch)
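For reference, the Transition record that these workers pass around is not shown above; judging from the field names and byte sizes used in the solution below, it looks roughly like this (a reconstruction, not the original definition):

from typing import NamedTuple, Optional
import numpy as np

class Transition(NamedTuple):
    actor_id: int
    step_number: int
    state: np.ndarray                  # 4x84x84 uint8 frame stack (28224 bytes)
    action: int
    next_state: Optional[np.ndarray]   # None on terminal transitions
    reward: int
    done: bool

A single 4x84x84 uint8 array is 28224 bytes, which accounts for most of the 28240 bytes per sample mentioned above; the solution below serializes each record into a fixed 56465-byte slot (two arrays, four 4-byte ints, and one byte).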
My solution was to lock blocks of memory rather than individual indices. Getting optimal performance required fine-tuning the number of blocks.
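The core idea in isolation, before the full class: map many buffer indices onto a small, fixed pool of locks (a minimal sketch; the names are illustrative):

import multiprocessing as mp

length = 100_000
n_locks = 1_000                      # far fewer OS-level locks than buffer slots
lock_length = length // n_locks      # each lock guards a contiguous block of 100 slots
locks = [mp.Lock() for _ in range(n_locks)]

def lock_for(index):
    # every index within the same block maps to the same lock
    return locks[index // lock_length]

Readers and writers that touch different blocks never contend, and only 1,000 locks are created instead of 100,000, which stays well clear of the file-handle limit.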
from typing import List, Union

import multiprocessing as mp
import numpy as np
from multiprocessing.shared_memory import SharedMemory

# Transition is the sample record; its fields are listed in __setitem__ below
# (a possible definition is sketched above).


class Memory:
"""
A shared memory utility class
"""
# n_bytes:
# 2 4x84x84 uint8 arrays
# 4 32-bit (4-byte) numbers
# 1 bool (1-byte)
int_size = 4
array_dtype = 'uint8'
array_bytes = 4 * 84 * 84
array_shape = (4, 84, 84)
stride = 2 * array_bytes + 4 * int_size + 1
_offset = 0
def __init__(self, length: int):
self._length = length
self._shared_memory = SharedMemory(create=True, size=self.stride * length)
_n_locks = 1_000
self._locks = [mp.Lock() for _ in range(_n_locks)]
self._lock_length = length // _n_locks
assert self._lock_length == length / _n_locks, "length must be divisible by _n_locks"
    def __del__(self):
        # close this process's view of the segment before unlinking it
        self._shared_memory.close()
        try:
            self._shared_memory.unlink()
        except FileNotFoundError:
            pass  # another process already unlinked the segment
@property
def _buf(self):
return self._shared_memory.buf
def __len__(self):
return self._length
def __getitem__(self, index: Union[slice, int]):
if isinstance(index, int):
return self._get_item(index)
elif isinstance(index, slice):
return self._get_slice(index)
else:
raise IndexError
def _get_slice(self, slice_: slice):
start = slice_.start if slice_.start is not None else 0
step = slice_.step if slice_.step is not None else 1
stop = slice_.stop if slice_.stop is not None else self._length
        if stop > self._length:
            raise IndexError
return [self._get_item(i % self._length) for i in range(start, stop, step)]
# todo: use __get_slice__ and __set_slice__
def _get_item(self, index):
        if index < 0 or index >= self._length:
raise IndexError(f"index {index} out of bounds")
with self._locks[index // self._lock_length]:
self._offset = index * self.stride
actor_id = int.from_bytes(self._get(self.int_size), 'big')
step_number = int.from_bytes(self._get(self.int_size), 'big')
state = np.frombuffer(self._get(self.array_bytes), dtype='uint8').reshape(self.array_shape)
action = int.from_bytes(self._get(self.int_size), 'big')
next_state = np.frombuffer(self._get(self.array_bytes), dtype='uint8').reshape(self.array_shape)
reward = int.from_bytes(self._get(self.int_size), 'big', signed=True)
done = int.from_bytes(self._get(1), 'big')
if done:
next_state = None
return Transition(actor_id, step_number, state, action, next_state, reward, done)
def _get(self, n_bytes: int) -> bytes:
"""
Get item at `_offset` and move forward `n_bytes`
:param n_bytes: Number of bytes to retrieve from memory
:return: bytes copied from memory
"""
item = self._buf[self._offset: self._offset + n_bytes]
self._offset += n_bytes
return item.tobytes()
def __setitem__(self, index: Union[int, slice], transition: Union[List[Transition], Transition]):
"""
Store `transition` in shared memory
:param index: Index of the memory location to store
:param transition: a `Transition`
"""
if isinstance(index, int):
assert isinstance(transition, Transition)
self._set_item(index, transition)
elif isinstance(index, slice):
            assert isinstance(transition, list)
self._set_slice(index, transition)
else:
raise IndexError
def _set_slice(self, slice_: slice, transitions: List[Transition]):
start = slice_.start if slice_.start is not None else 0
step = slice_.step if slice_.step is not None else 1
stop = slice_.stop if slice_.stop is not None else self._length
for i, t in zip(range(start, stop, step), transitions):
self._set_item(i % self._length, t)
def _set_item(self, index, transition):
        if index < 0 or index >= self._length:
raise IndexError(f"index {index} out of bounds")
with self._locks[index // self._lock_length]:
self._offset = index * self.stride
# 'actor_id', 'step_number', 'state', 'action', 'next_state', 'reward', 'done'
self._set(transition.actor_id.to_bytes(self.int_size, 'big'))
self._set(transition.step_number.to_bytes(self.int_size, 'big'))
self._set(transition.state.tobytes())
self._set(transition.action.to_bytes(self.int_size, 'big'))
if transition.next_state is not None:
self._set(transition.next_state.tobytes())
else:
self._offset += self.array_bytes
self._set(int(transition.reward).to_bytes(self.int_size, 'big', signed=True))
self._set(transition.done.to_bytes(1, 'big'))
def _set(self, bytearray_: Union[bytearray, bytes]):
"""
update `_buf` and move `_offset`
:param bytearray_: a bytearray
"""
len_ = len(bytearray_)
self._buf[self._offset: self._offset + len_] = bytearray_
self._offset = self._offset + len_
def __iter__(self):
for i in range(self._length):
yield self[i]
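A rough usage sketch, using the Memory class above and the Transition record sketched earlier (it assumes Linux's default fork start method, so the child simply inherits the Memory object; the numbers and the writer function are illustrative, not from the original training code):

import multiprocessing as mp
import numpy as np

def writer(memory):
    # Fill the first few slots; __setitem__ takes the relevant block lock internally.
    state = np.zeros(Memory.array_shape, dtype=Memory.array_dtype)
    for i in range(10):
        memory[i] = Transition(actor_id=0, step_number=i, state=state,
                               action=1, next_state=state, reward=1, done=False)

if __name__ == '__main__':
    m = Memory(100_000)                      # 1,000 locks, each guarding 100 slots
    p = mp.Process(target=writer, args=(m,))
    p.start()
    p.join()
    print(m[5].step_number)                  # reads the slot back under its block lock

Note that, as written, whichever process drops its Memory instance first will unlink the segment; in a longer-lived setup you may want only the creating process to call unlink().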