Search code examples
pythontensorflowtf.kerastfrecord

Tensorflow: Modern way to load large data


I want to train a convolutional neural network (using tf.keras from Tensorflow version 1.13) using numpy arrays as input data. The training data (which I currently store in a single >30GB '.npz' file) does not fit in RAM all at once. What is the best way to save and load large data-sets into a neural network for training? Since I didn't manage to find a good answer to this (surely ubiquitous?) problem, I'm hoping to hear one here. Thank you very much in advance for any help!

Sources

Similar questions seem to have been asked many times (e.g. training-classifier-from-tfrecords-in-tensorflow, tensorflow-synchronize-readings-from-tfrecord, how-to-load-data-parallelly-in-tensorflow) but are several years old and usually contain no conclusive answer.

My current understanding is that using TFRecord files is a good way to approach this problem. The most promising tutorial I found so far explaining how to use TFRecord files with keras is medium.com. Other helpful sources were machinelearninguru.com and medium.com_source2 and sources therin.

The official tensorflow documentation and tutorials (on tf.data.Dataset, Importing Data, tf_records etc.) did not help me. In particular, several of the examples given there didn't work for me even without modifications.

My Attempt at using TFRecord files

I'm assuming TFRecords are a good way to solve my problem but I'm having a hard time using them. Here is an example I made based on the tutorial medium.com. I stripped down the code as much as I could.

# python 3.6, tensorflow 1.13.
# Adapted from https://medium.com/@moritzkrger/speeding-up-keras-with-tfrecord-datasets-5464f9836c36
import tensorflow as tf
import numpy as np
from tensorflow.python import keras as keras


# Helper functions (see also https://www.tensorflow.org/tutorials/load_data/tf_records)
def _int64_feature(value):
    return tf.train.Feature(int64_list=tf.train.Int64List(value=[value]))


def _bytes_feature(value):
    return tf.train.Feature(bytes_list=tf.train.BytesList(value=[value]))


def writeTFRecords():
    number_of_samples = 100  # create some random data to play with
    images, labels = (np.random.sample((number_of_samples, 256, 256, 1)), np.random.randint(0, 30, number_of_samples))

    writer = tf.python_io.TFRecordWriter("bla.tfrecord")

    for index in range(images.shape[0]):
        image = images[index]
        label = labels[index]

        feature = {'image':  _bytes_feature(tf.compat.as_bytes(image.tostring())),
                   'label':  _int64_feature(int(label))}

        example = tf.train.Example(features=tf.train.Features(feature=feature))
        writer.write(example.SerializeToString())
    writer.close()


def loadTFRecord(data_path):
    with tf.Session() as sess:
        feature = {'train/image': tf.FixedLenFeature([], tf.string),
                   'train/label': tf.FixedLenFeature([], tf.int64)}
        # Create a list of filenames and pass it to a queue
        filename_queue = tf.train.string_input_producer([data_path], num_epochs=1)
        # Define a reader and read the next record
        reader = tf.TFRecordReader()
        _, serialized_example = reader.read(filename_queue)
        # Decode the record read by the reader
        features = tf.parse_single_example(serialized_example, features=feature)
        # Convert the image data from string back to the numbers
        image = tf.decode_raw(features['train/image'], tf.float32)

        # Cast label data into int32
        label = tf.cast(features['train/label'], tf.int32)
        # Reshape image data into the original shape
        image = tf.reshape(image, [256, 256, 1])

        return image, label  # I'm not 100% sure that's how this works...


# ######### generate a TFRecords file in the working directory containing random data. #################################
writeTFRecords()
# ######## Load the TFRecords file and use it to train a simple example neural network. ################################
image, label = loadTFRecord("bla.tfrecord")

model_input = keras.layers.Input(tensor=image)
model_output = keras.layers.Flatten(input_shape=(-1, 256, 256, 1))(model_input)
model_output = keras.layers.Dense(16, activation='relu')(model_output)

train_model = keras.models.Model(inputs=model_input, outputs=model_output)
train_model.compile(optimizer=keras.optimizers.RMSprop(lr=0.0001),  
                    loss='mean_squared_error',
                    target_tensors=[label])

print("\n \n start training \n \n") # Execution gets stuck on fitting
train_model.fit(epochs=1, steps_per_epoch=10)  # no output or error messages.

The code creates a TFRecord file and starts fitting, then just gets stuck with no output or error messages. I don't know what the problem is or how I could try to fix it.


Solution

  • While this is no real answer to the original question (i.e. "what is the optimal way to train on large datasets"), I managed to get tfrecords and datasets to work. Of particular help was this tutorial on YouTube. I include a minimal example with working code for anyone struggling with the same problem.

    # Developed using python 3.6, tensorflow 1.14.0.
    # This code writes data (pairs (label, image) where label is int64 and image is np.ndarray) into .tfrecord files and
    # uses them for training a simple neural network. It is meant as a minimal working example of how to use tfrecords. This
    # solution is likely not optimal. If you know how to improve it, please comment on
    # https://stackoverflow.com/q/57717004/9988487. Refer to links therein for further information.
    import tensorflow as tf
    import numpy as np
    from tensorflow.python import keras as keras
    
    
    # Helper functions (see also https://www.tensorflow.org/tutorials/load_data/tf_records)
    def _int64_feature(value):
        return tf.train.Feature(int64_list=tf.train.Int64List(value=[value]))
    
    
    def _bytes_feature(value):
        return tf.train.Feature(bytes_list=tf.train.BytesList(value=[value]))
    
    
    def write_tfrecords_file(out_path: str, images: np.ndarray, labels: np.ndarray) -> None:
        """Write all image-label pairs into a single .tfrecord file.
        :param out_path: File path of the .tfrecord file to generate or overwrite.
        :param images: array with first dimension being the image index. Every images[i].tostring() is
            serialized and written into the file as 'image': wrap_bytes(img_bytes)
        :param labels: 1d array of integers. labels[i] is the label of images[i]. Written as 'label': wrap_int64(label)"""
        assert len(images) == len(labels)
        with tf.io.TFRecordWriter(out_path) as writer:  # could use writer_options parameter to enable compression
            for i in range(len(labels)):
                img_bytes = images[i].tostring()  # Convert the image to raw bytes.
                label = labels[i]
                data = {'image': _bytes_feature(img_bytes), 'label': _int64_feature(label)}
                feature = tf.train.Features(feature=data)  # Wrap the data as TensorFlow Features.
                example = tf.train.Example(features=feature)  # Wrap again as a TensorFlow Example.
                serialized = example.SerializeToString()  # Serialize the data.
                writer.write(serialized)  # Write the serialized data to the TFRecords file.
    
    
    def parse_example(serialized, shape=(256, 256, 1)):
        features = {'image': tf.io.FixedLenFeature([], tf.string), 'label': tf.io.FixedLenFeature([], tf.int64)}
        # Parse the serialized data so we get a dict with our data.
        parsed_example = tf.io.parse_single_example(serialized=serialized, features=features)
        label = parsed_example['label']
        image_raw = parsed_example['image']  # Get the image as raw bytes.
        image = tf.decode_raw(image_raw, tf.float32)  # Decode the raw bytes so it becomes a tensor with type.
        image = tf.reshape(image, shape=shape)
        return image, label  # this function will be called once (to add it to tf graph; then parse images individually)
    
    
    # create some arbitrary data to play with: 1000 images sized 256x256 with one colour channel. Use your custom np-arrays
    IMAGE_WIDTH, NUM_OF_IMAGES, NUM_OF_CLASSES, COLOUR_CHANNELS = 256, 10_000, 10, 1
    # using float32 to save memory. Must match type in parse_example(), tf.decode_raw(image_raw, tf.float32)
    features_train = np.random.sample((NUM_OF_IMAGES, IMAGE_WIDTH, IMAGE_WIDTH, COLOUR_CHANNELS)).astype(np.float32)
    labels_train = np.random.randint(low=0, high=NUM_OF_CLASSES, size=NUM_OF_IMAGES)  # one random label for each image
    features_eval = features_train[:200]  # use the first 200 images as evaluation data for simplicity.
    labels_eval = labels_train[:200]
    write_tfrecords_file("train.tfrecord", features_train, labels_train)  # normal: split the data files of several GB each
    write_tfrecords_file("eval.tfrecord", features_eval, labels_eval)  # this may take a while. Consider a progressbar
    # The files are complete. Now define a model and use datasets to feed the data from the .tfrecord files into the model.
    model = keras.Sequential([keras.layers.Flatten(input_shape=(256, 256, 1)),
                              keras.layers.Dense(128, activation='relu'),
                              keras.layers.Dense(10, activation='softmax')])
    model.compile(optimizer='adam',
                  loss='sparse_categorical_crossentropy',
                  metrics=['accuracy'])
    # Check docs for parameters (compression, buffer size, thread count. Also www.tensorflow.org/guide/performance/datasets
    
    train_dataset = tf.data.TFRecordDataset("train.tfrecord")  # specify a list (or dataset) of file names for large data
    train_dataset = train_dataset.map(parse_example)  # parse tfrecords. Parameter num_parallel_calls may help performance.
    train_dataset = train_dataset.shuffle(buffer_size=1024).batch(64)
    
    validation_dataset = tf.data.TFRecordDataset("eval.tfrecord")
    validation_dataset = validation_dataset.map(parse_example).batch(64)
    
    model.fit(train_dataset, epochs=3)
    # evaluate the results
    results = model.evaluate(validation_dataset)
    print('\n\nvalidation loss, validation acc:', results)
    
    

    Note that it's tricky to use some_keras_model.fit(..., validation_data=some_dataset) with dataset objects. It may result in TypeError: 'DatasetV1Adapter' object does not support indexing. This seems to be a bug (see github.com/tensorflow/tensorflow/issues/28995) and is supposedly fixed as of tf-nightly version '1.15.0-dev20190808'; The official tutorial uses this too, although it doesn't work in most versions. An easy but dirty-ish fix is to use verbose=0 (which only suppresses program output) and plot the validation results using tensorboard. Also see Keras model.fit() with tf.dataset API + validation_data.