Search code examples
Tags: tensorflow, tf.keras, tensorflow2.x

Input pipeline w/ keras.utils.Sequence object or tf.data.Dataset?


I am currently using a tf.keras.utils.Sequence object to generate image batches for a CNN. I am using TensorFlow 2.2 and the Model.fit method to train the model. When I set use_multiprocessing=True in tf.keras.Model.fit(...), the following warning is printed in every epoch:

WARNING:tensorflow:multiprocessing can interact badly with TensorFlow,
 causing nondeterministic deadlocks. For high performance data pipelines tf.data is recommended

The model is optimizing just fine, as expected from the docs and the fact that I am using a Sequence-based generator. But if use_multiprocessing is going to be deprecated in favor of tf.data objects, I would like to be using the most up-to-date input pipeline. I currently use the following tf.keras.utils.Sequence-based generator, inspired by this article on good practices for partitioning large datasets: https://stanford.edu/~shervine/blog/keras-how-to-generate-data-on-the-fly

class DataGenerator(keras.utils.Sequence):
    """Generate batches of augmented samples for Keras from per-ID ``.npy`` files.

    Sample ``ID`` is read from ``<data_dir>/<ID>_stars.npy`` and its label from
    ``labels[ID]``.  Each batch gets a trailing channel axis of size 1 and is
    passed once through a ``keras.preprocessing.image.ImageDataGenerator``
    built from ``augmentation_kwargs``.

    Args:
        list_IDs: sample IDs; used to build file names and as keys into ``labels``.
        labels: mapping from ID to an integer class label.
        data_dir: directory (str or path-like) holding the ``*_stars.npy`` files.
        batch_size: samples per batch; a trailing partial batch is dropped.
        dim: shape of one stored sample, e.g. ``(128, 128)``.
        n_channels: stored only; the code below always adds a channel axis of
            size 1 regardless of this value.
        n_classes: stored only; labels are returned as integers, not one-hot.
        shuffle: reshuffle the sample order at construction and after each epoch.
        **augmentation_kwargs: forwarded to ``ImageDataGenerator``.
    """

    def __init__(self, list_IDs, labels, data_dir, batch_size=32, dim=(128, 128),
                 n_channels=1, n_classes=2, shuffle=True, **augmentation_kwargs):
        self.dim = dim
        self.batch_size = batch_size
        self.labels = labels
        self.list_IDs = list_IDs
        self.data_dir = data_dir
        self.n_channels = n_channels
        self.n_classes = n_classes
        self.shuffle = shuffle
        # Builds (and optionally shuffles) self.indexes for the first epoch.
        self.on_epoch_end()
        self.augmentor = keras.preprocessing.image.ImageDataGenerator(**augmentation_kwargs)

    def __len__(self):
        """Return the number of full batches per epoch."""
        return len(self.list_IDs) // self.batch_size

    def __getitem__(self, index):
        """Return batch ``index`` as an ``(X, y)`` tuple.

        Raises:
            IndexError: if ``index`` is outside ``[0, len(self))``.  Without this
                check an out-of-range index produced a short ID list, leaving
                rows of the ``np.empty`` batch buffer uninitialised.
        """
        if not 0 <= index < len(self):
            raise IndexError(
                'batch index {} out of range for {} batches'.format(index, len(self)))
        start = index * self.batch_size
        batch_indexes = self.indexes[start:start + self.batch_size]
        list_IDs_temp = [self.list_IDs[k] for k in batch_indexes]
        return self.__data_generation(list_IDs_temp)

    def on_epoch_end(self):
        """Rebuild the sample order, shuffling it when ``self.shuffle`` is set."""
        self.indexes = np.arange(len(self.list_IDs))
        if self.shuffle:
            np.random.shuffle(self.indexes)

    def __data_generation(self, list_IDs_temp):
        """Load, label and augment exactly ``batch_size`` samples.

        Returns:
            ``(X, y)`` where ``X`` has shape ``(batch_size, *dim, 1)`` before
            augmentation and ``y`` holds integer labels.
        """
        import os  # local import: the snippet's file-level imports are not shown

        X = np.empty((self.batch_size, *self.dim))
        y = np.empty(self.batch_size, dtype=int)

        for i, ID in enumerate(list_IDs_temp):
            X[i, ] = np.load(os.path.join(self.data_dir, '{}_stars.npy'.format(ID)))
            y[i] = self.labels[ID]

        # Add the channel axis ImageDataGenerator expects, then take the single
        # augmented batch (shuffle=False keeps X and y aligned with the IDs).
        X = X.reshape(self.batch_size, *self.dim, 1)
        X, y = self.augmentor.flow(X, y=y, shuffle=False, batch_size=self.batch_size)[0]
        return X, y

All data from all classes is in the data_dir directory, stored as individual .npy files. The IDs come from a list of strings. The class labels are taken from a dictionary whose keys are the IDs, as in the article.

I really like the intuition of the Sequence generator set-up. I can also easily generate random batches to check that it is behaving as I would expect. But how can I reproduce this set-up with tf.data? How do I reproduce the multiprocessing batch generation of a Sequence generator with the interleave and prefetch methods of tf.data.Dataset? And/or can I simply ingest this Sequence-based generator with the tf.data.Dataset.from_generator() method?

Many thanks in advance.


Solution

  • This may be too late to answer, but here is what I did, and it works fine for me. 1- My class was like this:

    class DataGen(Sequence):
        """Keras ``Sequence`` yielding batches of MFCC features from audio files.

        Args:
            df: table with ``filepath`` and ``label`` columns, one row per clip.
            sr: sampling rate used both to load the audio and to compute MFCCs.
            seconds: clip length; audio is truncated or zero-padded to
                ``sr * seconds`` samples.
            batch_size: clips per batch; a trailing partial batch is dropped.
            shuffle: reshuffle the sample order at construction and after each epoch.
        """

        def __init__(self, df, sr=8000, seconds=3, batch_size=16, shuffle=True):
            # Positional arrays, so shuffled integer indices address rows directly.
            self.files = np.array(df.filepath)
            self.label = np.array(df.label)
            self.batch_size = batch_size
            self.shuffle = shuffle
            self.sr = sr
            self.seconds = seconds
            self.dim = self.sr * self.seconds
            self.on_epoch_end()

        def __len__(self):
            """Return the number of full batches per epoch."""
            # The original signature lacked `self`, so len(gen) raised TypeError.
            return len(self.label) // self.batch_size

        def __getitem__(self, x):
            """Return batch ``x`` as ``(features_tensor, one_hot_labels)``.

            Raises:
                IndexError: if ``x`` is outside ``[0, len(self))``.
            """
            if not 0 <= x < len(self):
                raise IndexError('batch index {} out of range for {} batches'.format(x, len(self)))
            # Batch x covers positions [x*batch_size, (x+1)*batch_size) of the
            # shuffled order.  Starting at x instead made batches overlap, so
            # most samples were never visited in an epoch.
            start = x * self.batch_size
            indexs = self.indexs[start:start + self.batch_size]
            return self.__getBatch__(indexs)

        def __getBatch__(self, indexs):
            """Load the clips at ``indexs`` and return (MFCC tensor, one-hot labels)."""
            X, y = [], []
            for i in indexs:
                wav = self.__loadFile__(self.files[i])
                # Keyword arguments: librosa >= 0.10 makes y/sr keyword-only.
                X.append(librosa.feature.mfcc(y=wav, sr=self.sr).T)
                y.append(self.label[i])
            return tf.convert_to_tensor(X), to_categorical(y, num_classes=2)

        def __loadFile__(self, file):
            """Load ``file`` as mono audio at ``self.sr``, truncated/zero-padded to ``self.dim``."""
            # Use the configured rate rather than a hard-coded 8000.
            wav, _ = librosa.load(file, sr=self.sr, mono=True)
            if len(wav) > self.dim:
                return wav[:self.dim]
            return np.pad(wav, (0, self.dim - len(wav)), 'constant', constant_values=0)

        def on_epoch_end(self):
            """Rebuild the sample order, shuffling it when ``self.shuffle`` is set."""
            self.indexs = np.arange(len(self.label))
            if self.shuffle:
                np.random.shuffle(self.indexs)
    

    2- Then I changed it to a function as follows:

    def gen(sr=8000, seconds=3, batch_size=16, shuffle=True, frame=None):
        """Yield ``(mfcc_batch, one_hot_labels)`` tuples forever, reshuffling each epoch.

        Args:
            sr: sampling rate used both to load the audio and to compute MFCCs.
            seconds: clip length; audio is truncated or zero-padded to
                ``sr * seconds`` samples.
            batch_size: clips per batch; a trailing partial batch is dropped.
            shuffle: reshuffle the sample order at the start of every epoch.
            frame: table with ``filepath`` and ``label`` columns.  Defaults to
                the module-level ``df`` (re-read every epoch, as before), so
                ``tf.data.Dataset.from_generator(gen, ...)`` keeps working.

        Raises:
            ValueError: if the table has fewer rows than ``batch_size``.  The
                previous version looped forever without yielding in that case.
        """
        dim = sr * seconds

        def loadFile(file):
            """Load ``file`` as mono audio at ``sr``, truncated/zero-padded to ``dim``."""
            wav, _ = librosa.load(file, sr=sr, mono=True)
            if len(wav) > dim:
                return wav[:dim]
            return np.pad(wav, (0, dim - len(wav)), 'constant', constant_values=0)

        while True:
            table = df if frame is None else frame
            # Positional arrays: `df.filepath[i]` is label-based on a pandas
            # Series, which picks wrong rows (or raises) when the index is not 0..n-1.
            files = np.asarray(table.filepath)
            labels = np.asarray(table.label)

            n_batches = len(files) // batch_size
            if n_batches == 0:
                raise ValueError(
                    'need at least batch_size={} rows, got {}'.format(batch_size, len(files)))

            indexs = np.arange(len(files))
            if shuffle:
                np.random.shuffle(indexs)

            for x in range(n_batches):
                batch = indexs[x * batch_size:(x + 1) * batch_size]
                # Keyword arguments: librosa >= 0.10 makes y/sr keyword-only.
                X = [librosa.feature.mfcc(y=loadFile(files[i]), sr=sr).T for i in batch]
                y = [labels[i] for i in batch]
                yield tf.convert_to_tensor(X), to_categorical(y, num_classes=2)
    

    3- And it works fine:

    # `gen` has defaults for every parameter, so from_generator can call it as gen().
    dataset = tf.data.Dataset.from_generator(
        gen,
        output_types=(tf.float32, tf.int32),
    )