Search code examples
pythondequeh5py

Double-ended queue in h5py


In my project I use python's deque as a limited buffer. The problem is a lack of RAM. That's why I'm looking for a solution and one option is to store the buffer on HDD.

I'm wondering if it is possible to enable limited deque in h5py's datasets.

Any advice is welcome.


Solution

  • According to the docs datasets do not even support appending

    The short response is that h5py is NumPy-like, not database-like. Unlike the HDF5 packet-table interface (and PyTables), there is no concept of appending rows.

    So I myself have implemented the desired behaviour:

    import h5py
    import numpy as np
    
    class H5Buffer():
        def __init__(self, array_shape, maxlen, dtype):
            self.maxlen = maxlen
            self.current_idx = 0
    
            self.file = h5py.File("buffer.hdf5", "w")
            self.buffer =  self.file.create_dataset('buffer', (0,)+array_shape, maxshape=(maxlen,)+array_shape, dtype=dtype)        
    
        def append(self, array):
            '''
            array is numpy array with the shape of array_shape
            '''
            add_size = array.shape[0]
            if self.buffer.shape[0]<self.maxlen:
                self._resize(self.buffer.shape[0], add_size)
    
            add_idx = add_size
            end_idx = self.current_idx + add_idx
    
            if end_idx >= self.maxlen:
                add_idx-= end_idx - self.maxlen
                end_idx = self.maxlen
    
            self.buffer[self.current_idx:end_idx] = array[:add_idx]
    
            self.current_idx = end_idx
            if self.current_idx == self.maxlen:
                self.current_idx = 0
            if add_idx != add_size:
                self.append(array[add_idx:])
    
        def _resize(self, current_size, add_size):
            new_size = current_size + add_size
            if new_size > self.maxlen:
                new_size = self.maxlen
            self.buffer.resize(new_size, axis=0)
    
        def sample(self, start_idx, end_idx):
            return self.buffer[start_idx:end_idx]
    
        def length(self):
            return len(self.actions)
    
        def close(self):
            if self.file:
                self.file.close()
            self.file = None
    
        def __del__(self):
            self.close()