python tensorflow tensorflow2.0 tensorflow-datasets tfrecord

Time series in TensorFlow with TFRecords


I am currently writing a TensorFlow model that predicts the next value of a label from a temporal sequence of items.

I created a series of TFRecord files, one per trial, in which every image is stored together with its label. Now I would like to build a dataset pipeline that extracts windowed subsets of each trial.

This is my function to parse one example of the tfrecords file:

import glob
import os
from functools import partial

import tensorflow as tf

IMG_SIZE_INPUT = (60, 80, 1)

WINDOW_SIZE = 10
BATCH_SIZE = 32

def parse_single_example(example):
    feature_description = {
        'timestamp': tf.io.FixedLenFeature([], tf.int64),
        'image_raw': tf.io.FixedLenFeature([], tf.string),
        'label': tf.io.FixedLenFeature([], tf.int64)
    }

    features = tf.io.parse_single_example(example, feature_description)

    b_image = features['image_raw'] # get byte string
    image = tf.io.parse_tensor(b_image, out_type = tf.uint8)
    image = tf.reshape(image, IMG_SIZE_INPUT)
    image = tf.cast(image, tf.float32)

    timestamp = features['timestamp']
    label = features['label']

    return timestamp, image, label

These are my functions for the generation of the pipeline:

def _parse_and_augment_image(example, seed, do_augment):
    timestamp, image, label = parse_single_example(example)

    if do_augment:
        # Pad the image so that a random crop can be applied afterwards
        image = tf.image.resize_with_crop_or_pad(image, IMG_SIZE_INPUT[0] + 24, IMG_SIZE_INPUT[1] + 24)

        # Make a new seed.
        new_seed = tf.random.experimental.stateless_split(seed, num = 1)[0, :]

        # Random crop back to the original size.
        image = tf.image.stateless_random_crop(
            image, size = IMG_SIZE_INPUT, seed = new_seed)

        # Random flip L/R
        image = tf.image.stateless_random_flip_left_right(image, new_seed)

        # Random flip U/D
        image = tf.image.stateless_random_flip_up_down(image, new_seed)

        # Random brightness.
        image = tf.image.stateless_random_brightness(
            image, max_delta = 0.075 * 255, seed = new_seed)

        image = tf.clip_by_value(image, 0, 255)

    return timestamp, image, label

def prepare_for_training(tf_record_single_file_path, batch_size, window_size = WINDOW_SIZE, shift_size = 1):
    dataset = tf.data.TFRecordDataset(tf_record_single_file_path)

    # Windowing
    windowed_dataset = dataset.window(window_size, shift=shift_size, drop_remainder=True)
    dataset = windowed_dataset.flat_map(lambda window: window.batch(window_size))

    # Parsing
    counter = tf.data.experimental.Counter()
    train_dataset = tf.data.Dataset.zip((dataset, (counter, counter)))
    dataset = train_dataset.map(partial(_parse_and_augment_image, do_augment = False), num_parallel_calls=10)

    # Batching
    dataset = dataset.batch(batch_size)

    return dataset


def read_dataset(filename_dir, batch_size):
    all_tf_records = glob.glob(filename_dir + os.sep + "*.tfrecords")

    # Test on one file
    dataset = prepare_for_training(all_tf_records[0], 2)

With this pipeline, TensorFlow raises the following error:

features = tf.io.parse_single_example(example, feature_description)
ValueError: Input serialized must be a scalar

I have tried several things, but none of them work. This is the closest I have come to a functional pipeline. Could someone point out where I made an error?


Solution

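  • The problem is the shape of the serialized input. After window(...) followed by flat_map(lambda window: window.batch(window_size)), each dataset element is a 1-D tensor holding window_size serialized records, whereas tf.io.parse_single_example expects a single scalar string. Indexing one record gives it a scalar, so the option below should work. Note that it parses only the first record of each window; to use the whole window, every record in it has to be parsed.

    serialized_example = example[0] # scalar string: the first serialized record of the window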
    features = tf.io.parse_single_example(serialized_example, feature_description)
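One way to keep every record of each window, rather than only the first, is to parse the records before windowing and then batch each component of the window. The following is a minimal sketch under stated assumptions, not a definitive implementation: make_serialized_example and make_windows are hypothetical helpers introduced here, and an in-memory dataset of dummy records stands in for tf.data.TFRecordDataset so that the example runs without any files. The feature layout and parse_single_example follow the question.

```python
import numpy as np
import tensorflow as tf

IMG_SIZE_INPUT = (60, 80, 1)
WINDOW_SIZE = 10

FEATURE_DESCRIPTION = {
    'timestamp': tf.io.FixedLenFeature([], tf.int64),
    'image_raw': tf.io.FixedLenFeature([], tf.string),
    'label': tf.io.FixedLenFeature([], tf.int64),
}


def make_serialized_example(i):
    # Hypothetical helper: builds one dummy record in the question's format.
    image = np.full(IMG_SIZE_INPUT, i, dtype=np.uint8)
    image_bytes = tf.io.serialize_tensor(image).numpy()
    feature = {
        'timestamp': tf.train.Feature(int64_list=tf.train.Int64List(value=[i])),
        'image_raw': tf.train.Feature(bytes_list=tf.train.BytesList(value=[image_bytes])),
        'label': tf.train.Feature(int64_list=tf.train.Int64List(value=[i % 2])),
    }
    example = tf.train.Example(features=tf.train.Features(feature=feature))
    return example.SerializeToString()


def parse_single_example(example):
    # Same logic as in the question: one scalar string in, one record out.
    features = tf.io.parse_single_example(example, FEATURE_DESCRIPTION)
    image = tf.io.parse_tensor(features['image_raw'], out_type=tf.uint8)
    image = tf.cast(tf.reshape(image, IMG_SIZE_INPUT), tf.float32)
    return features['timestamp'], image, features['label']


def make_windows(dataset, window_size=WINDOW_SIZE, shift_size=1):
    # Hypothetical helper: parse first, while each element is still a scalar string.
    parsed = dataset.map(parse_single_example)
    # window() on tuple elements yields a tuple of sub-datasets, one per component.
    windows = parsed.window(window_size, shift=shift_size, drop_remainder=True)
    # Batch each component so that every window becomes a tuple of stacked tensors.
    return windows.flat_map(
        lambda ts, img, lbl: tf.data.Dataset.zip(
            (ts.batch(window_size), img.batch(window_size), lbl.batch(window_size))))


# Stand-in for tf.data.TFRecordDataset(path): 12 dummy serialized records.
records = tf.data.Dataset.from_tensor_slices(
    [make_serialized_example(i) for i in range(12)])

windowed = make_windows(records).batch(2)
timestamps, images, labels = next(iter(windowed))
print(timestamps.shape, images.shape, labels.shape)
```

With real data, records would be replaced by tf.data.TFRecordDataset(tf_record_single_file_path). Each batch element then has the layout (batch, window, ...), for example (2, 10, 60, 80, 1) for the images here, and any augmentation can be mapped over the parsed records before the windowing step.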