python, tensorflow, time-series, lstm, tensorflow-datasets

How to create a multivariate time-series dataset with tf.data?


I am trying to create an input pipeline for my LSTM model using the tf.data.Dataset.from_generator API. Following the guide, my current minimal example looks like this:

import tensorflow as tf

class generator:
    def __init__(self, n=5):
        self.n = n

    def __call__(self):
        for i in range(self.n):
            yield (i, 10*i)

dataset = tf.data.Dataset.from_generator(generator(), 
    output_signature=(tf.TensorSpec(shape=(), dtype=tf.uint16), tf.TensorSpec(shape=(), dtype=tf.int32)))

window_size = 3
windows = dataset.window(window_size, shift=1)

def sub_to_batch(sub):
    return sub.batch(window_size, drop_remainder=True)

final_dset = windows.flat_map(sub_to_batch)

print(list(final_dset.as_numpy_iterator()))

Error message

TypeError: tf__sub_to_batch() takes 1 positional argument but 2 were given

The problem only occurs when the generator yields more than one feature; with a single feature it works, e.g. after changing the following lines:

yield (i)

dataset = tf.data.Dataset.from_generator(generator(), 
    output_signature=(tf.TensorSpec(shape=(), dtype=tf.uint16)))

In the single-feature version, the output looks like this, with shape=(3, 3, 1):

[ [ [0], [1], [2] ],
  [ [1], [2], [3] ],
  [ [2], [3], [4] ]  ]

Essentially, I am trying to zip the individual features together so that I get shape=(3, 3, 2):

[ [ [0,  0], [1, 10], [2, 20] ],
  [ [1, 10], [2, 20], [3, 30] ],
  [ [2, 20], [3, 30], [4, 40] ]  ]

How can this be done?


Solution

  • The TypeError occurs because calling window() on a dataset of tuples produces a tuple of window datasets, so the function passed to flat_map receives one argument per feature. With that in mind, you could try something like the following; however, I am not sure how efficient it is:

    import tensorflow as tf
    
    class generator:
        def __init__(self, n=7):
            self.n = n
    
        def __call__(self):
            for i in range(self.n):
                yield (i, 10*i)
    
    dataset = tf.data.Dataset.from_generator(generator(), 
        output_signature=(tf.TensorSpec(shape=(), dtype=tf.int32), tf.TensorSpec(shape=(), dtype=tf.int32)))
    
    window_size = 5
    windows = dataset.window(window_size, shift=1)
    
    def stack(x, y):
      # Combine the two (5,) feature tensors into a single (5, 2) tensor.
      x = tf.expand_dims(x, axis=1)
      y = tf.expand_dims(y, axis=1)
      result = tf.concat((x, y), axis=1)
      # Slide a length-3 sub-window over the 5-element window,
      # collecting the slices into a (3, 3, 2) tensor.
      ta = tf.TensorArray(tf.int32, size=0, dynamic_size=True)
      for w in tf.range(3):
        ta = ta.write(w, result[w: w + 3])
      return ta.stack()
    
    def sub_to_batch(sub1, sub2):
        # window() yields one window dataset per feature, so batch each
        # feature's windows and zip them back together before stacking.
        sub1 = sub1.batch(5, drop_remainder=True)
        sub2 = sub2.batch(5, drop_remainder=True)
    
        return tf.data.Dataset.zip((sub1, sub2)).map(stack)
    
    final_dset = windows.flat_map(sub_to_batch)
    for s in final_dset.take(1):
      print(s)
    
    tf.Tensor(
    [[[ 0  0]
      [ 1 10]
      [ 2 20]]
    
     [[ 1 10]
      [ 2 20]
      [ 3 30]]
    
     [[ 2 20]
      [ 3 30]
      [ 4 40]]], shape=(3, 3, 2), dtype=int32)
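
    As a quick sanity check, you can iterate final_dset in full: with n=7 and window_size=5 there are three full outer windows (starting at 0, 1, and 2), so the pipeline should yield exactly three (3, 3, 2) tensors:

    # Partial windows are dropped by drop_remainder=True, leaving 3 tensors.
    shapes = [tuple(s.shape) for s in final_dset]
    print(len(shapes), shapes[0])  # 3 (3, 3, 2)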
    

    You could also hard-code the slice indices if you want; the results will be the same:

    def stack(x, y):
      x = tf.expand_dims(x, axis=1)
      y = tf.expand_dims(y, axis=1)
      result = tf.concat((x, y), axis=1)
      # The same three overlapping sub-windows, spelled out explicitly.
      return tf.stack([result[0: 3], result[1: 4], result[2: 5]])
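
    As an alternative sketch (assuming both features are first cast to a common dtype such as int32, unlike the uint16/int32 mix in the question), you could stack the features into a single tensor before windowing. Then flat_map again receives just one window dataset, exactly as in the single-feature case:

    # Merge the features first, then window as in the single-feature pipeline.
    stacked = dataset.map(lambda x, y: tf.stack([x, y]))  # each element: shape (2,)
    
    windows = stacked.window(3, shift=1)
    final_dset = windows.flat_map(lambda w: w.batch(3, drop_remainder=True))
    
    for s in final_dset.take(1):
      print(s)  # shape (3, 2): [[0 0], [1 10], [2 20]]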