Search code examples
tensorflowkerastensorflow-datasets

Tensorflow: How to zip together multiple concatenated datasets?


I am attempting to zip together multiple tf.data.Dataset objects for a multi-input Keras model. Each tf.data.Dataset object is a concatenation of multiple dataframes, each with the same number of columns but not necessarily the same number of rows. I am able to create the full dataset, but when I try to pass it to a Keras model I get an error:

TypeError: Inputs to a layer should be tensors. Got: <tensorflow.python.data.ops.dataset_ops._NestedVariant object

The problem is that I would really like to take advantage of the lazy structure of the tf.data.Dataset since I am using the window function, but I am having difficulty aggregating all the datasets together.

How do I zip together multiple datasets in a way that they can be passed into the model.fit() function?

Any help would be appreciated.

Here is a simple, self-contained example that reproduces my problem:


import pandas as pd
import numpy as np


import tensorflow as tf

# Two frames of standard-normal noise (1000 and 800 rows) that share the same
# five columns: four features followed by the target.
dataframe1, dataframe2 = (
    pd.DataFrame(
        np.random.randn(row_count, 5),
        columns=["feature1", "feature2", "feature3", "feature4", "target"],
    )
    for row_count in (1000, 800)
)


# Convert dataframes to datasets

def get_dataset(df: pd.DataFrame, features):
    """Return a dataset with one element per row of ``df[features]``, skipping the first four rows."""
    kept_rows = df.loc[:, features].iloc[4:]
    return tf.data.Dataset.from_tensor_slices(kept_rows.to_numpy())


def get_dataset_windowed(df: pd.DataFrame, features):
    """Slice ``df[features]`` row by row and group the rows into sliding windows of five."""
    values = df.loc[:, features].to_numpy()
    per_row = tf.data.Dataset.from_tensor_slices(values)
    # Consecutive rows, window start advancing one row at a time; incomplete
    # trailing windows are discarded.
    return per_row.window(5, shift=1, stride=1, drop_remainder=True)


# Build one windowed dataset per dataframe from the two time-series columns.
windowed_dataset = [get_dataset_windowed(x, ["feature3", "feature4"]) for x in [dataframe1, dataframe2]]

# Wrap the Python list of datasets in a dataset, then flatten it with an
# identity interleave so the per-dataframe datasets are chained together.
# NOTE(review): get_dataset_windowed ends with .window(...), and nothing below
# converts those windows into tensors. The `_NestedVariant` named in the
# reported TypeError presumably originates from these elements -- confirm.
windowed_dataset = tf.data.Dataset.from_tensor_slices(windowed_dataset)
windowed_dataset = windowed_dataset.interleave(lambda x: x, cycle_length=1, num_parallel_calls=tf.data.AUTOTUNE)


# Same chaining for the non-windowed ("static") features; get_dataset drops
# the first four rows of each dataframe.
static_dataset = [get_dataset(x, ["feature1", "feature2"]) for x in [dataframe1, dataframe2]]

static_dataset = tf.data.Dataset.from_tensor_slices(static_dataset)
static_dataset = static_dataset.interleave(lambda x: x, cycle_length=1, num_parallel_calls=tf.data.AUTOTUNE)


# Same chaining for the single target column.
targets = [get_dataset(x, ["target"]) for x in [dataframe1, dataframe2]]

targets = tf.data.Dataset.from_tensor_slices(targets)
targets = targets.interleave(lambda x: x, cycle_length=1, num_parallel_calls=tf.data.AUTOTUNE)

# Zip datasets together

# Each element is a pair of dicts: the first is keyed by the names given to the
# tf.keras.Input layers, the second holds the targets under the key "output".
full_dataset = tf.data.Dataset.zip(
    (
        {
            "short_term_ts_input": windowed_dataset,
            "static_input": static_dataset,
        },
        {
             "output": targets,
        }
    )
)
# Shuffle with a 1024-element buffer, then group into batches of 128.
full_dataset = full_dataset.shuffle(buffer_size=1024).batch(128)


# Creating, compiling and fitting model

# One Input per key of the feature dict in full_dataset; the names match those keys.
short_term_ts_input = tf.keras.Input(shape=(5, 2), name="short_term_ts_input")
# NOTE(review): `(2)` is the plain integer 2, not a tuple; `(2,)` is probably
# what was meant -- confirm that the installed Keras accepts an int here.
static_input = tf.keras.Input(shape=(2), name="static_input")
# NOTE: this rebinds `targets`, which previously referred to the target dataset
# (already zipped into full_dataset). This Input is not passed to
# tf.keras.Model below.
targets = tf.keras.Input(shape=(1,), name="output")

# Time-series branch: an LSTM that returns only its final output, then a Dense layer.
short_term_ts_features = tf.keras.layers.LSTM(32, return_sequences=False)(short_term_ts_input)
short_term_ts_features = tf.keras.layers.Dense(8)(short_term_ts_features)
# Static branch: a single Dense layer.
static_features = tf.keras.layers.Dense(16)(static_input)
# Merge both branches and project down to a single output value.
x_concat = tf.keras.layers.concatenate([short_term_ts_features, static_features])
x_concat = tf.keras.layers.Dense(32)(x_concat)

output = tf.keras.layers.Dense(1)(x_concat)

model = tf.keras.Model(inputs=[short_term_ts_input, static_input], outputs=[output])
# NOTE: no `loss` argument is passed to compile() here.
model.compile(optimizer=tf.keras.optimizers.Adam(learning_rate=0.001))

# Writes a diagram of the model, including tensor shapes, to model_test.png.
tf.keras.utils.plot_model(model, "model_test.png", show_shapes=True)

model.fit(full_dataset)


Solution

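  • Maybe something like this. Compared with the code in the question, each window is turned into a single tensor with `.flat_map(lambda z: z.batch(5))`, the inputs and the targets are zipped as an (inputs, targets) pair, the unused `targets` Input layer is removed, and a loss (`'mse'`) is passed to `compile()`: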

    import pandas as pd
    import numpy as np
    
    
    import tensorflow as tf
    
    # Two frames of standard-normal noise (1000 and 800 rows) that share the
    # same five columns: four features followed by the target.
    dataframe1, dataframe2 = (
        pd.DataFrame(
            np.random.randn(row_count, 5),
            columns=["feature1", "feature2", "feature3", "feature4", "target"],
        )
        for row_count in (1000, 800)
    )
    
    
    # Convert dataframes to datasets
    
    def get_dataset(df: pd.DataFrame, features, offset: int = 4) -> "tf.data.Dataset":
        """Build a dataset with one element per row of the selected columns.

        Args:
            df: Source dataframe.
            features: Column label(s) to keep; passed to ``df.loc[:, features]``.
            offset: Number of leading rows to skip. The default of 4 is one less
                than the window length of 5 used by ``get_dataset_windowed``,
                presumably so that element ``i`` here lines up with the last row
                of the ``i``-th window. Change both together.

        Returns:
            A ``tf.data.Dataset`` yielding the remaining rows in order.

        Raises:
            ValueError: If ``offset`` is negative. A negative value would make
                ``iloc`` keep only the last rows instead of skipping leading ones.
        """
        if offset < 0:
            raise ValueError(f"offset must be non-negative, got {offset}")

        values = df.loc[:, features].iloc[offset:].to_numpy()
        return tf.data.Dataset.from_tensor_slices(values)
    
    
    def get_dataset_windowed(
        df: pd.DataFrame, features, window_size: int = 5, shift: int = 1
    ) -> "tf.data.Dataset":
        """Group the rows of the selected columns into sliding windows.

        Args:
            df: Source dataframe.
            features: Column label(s) to keep; passed to ``df.loc[:, features]``.
            window_size: Number of consecutive rows per window (default 5).
            shift: Number of rows the window start advances between windows
                (default 1).

        Returns:
            A ``tf.data.Dataset`` produced by ``Dataset.window``. Each element is
            a window that the caller still has to flatten into a tensor, e.g.
            with ``.flat_map(lambda w: w.batch(window_size))``. Incomplete
            trailing windows are dropped.

        Raises:
            ValueError: If ``window_size`` or ``shift`` is smaller than 1.
        """
        if window_size < 1:
            raise ValueError(f"window_size must be at least 1, got {window_size}")
        if shift < 1:
            raise ValueError(f"shift must be at least 1, got {shift}")

        values = df.loc[:, features].to_numpy()
        rows = tf.data.Dataset.from_tensor_slices(values)
        return rows.window(window_size, shift=shift, stride=1, drop_remainder=True)
    
    
    def _concat_per_frame(builder, features):
        """Apply ``builder`` to each dataframe and chain the resulting datasets.

        ``builder`` is called as ``builder(frame, features)`` for ``dataframe1``
        and then ``dataframe2``. The list of datasets is wrapped in a dataset
        and flattened with an identity ``interleave``; ``cycle_length=1`` reads
        one source dataset at a time, so the sources are consumed in list order.
        """
        parts = [builder(frame, features) for frame in (dataframe1, dataframe2)]
        return tf.data.Dataset.from_tensor_slices(parts).interleave(
            lambda part: part, cycle_length=1, num_parallel_calls=tf.data.AUTOTUNE
        )


    # Every window holds exactly 5 rows, so batching each window by 5 collapses
    # it into a single element stacking those rows. drop_remainder=True drops
    # nothing here; it only makes the leading dimension statically known.
    windowed_dataset = _concat_per_frame(get_dataset_windowed, ["feature3", "feature4"]).flat_map(
        lambda window: window.batch(5, drop_remainder=True)
    )

    static_dataset = _concat_per_frame(get_dataset, ["feature1", "feature2"])

    targets = _concat_per_frame(get_dataset, ["target"])

    # Zip datasets together

    # A single zip yields (inputs, targets) pairs. The inputs dict is keyed by
    # the names of the tf.keras.Input layers, so features are matched to inputs
    # by name.
    full_dataset = tf.data.Dataset.zip(
        (
            {
                "short_term_ts_input": windowed_dataset,
                "static_input": static_dataset,
            },
            targets,
        )
    )
    # Shuffle with a 1024-element buffer, then group into batches of 128.
    full_dataset = full_dataset.shuffle(buffer_size=1024).batch(128)
    
    
    # Creating, compiling and fitting model

    # One Input per key of the feature dict in full_dataset; the names match those keys.
    short_term_ts_input = tf.keras.Input(shape=(5, 2), name="short_term_ts_input")
    # `shape` is written as a 1-tuple: `(2)` is just the integer 2, which not
    # every Keras version accepts as a shape.
    static_input = tf.keras.Input(shape=(2,), name="static_input")

    # Time-series branch: an LSTM that returns only its final output, then a Dense layer.
    short_term_ts_features = tf.keras.layers.LSTM(32, return_sequences=False)(short_term_ts_input)
    short_term_ts_features = tf.keras.layers.Dense(8)(short_term_ts_features)
    # Static branch: a single Dense layer.
    static_features = tf.keras.layers.Dense(16)(static_input)
    # Merge both branches and project down to a single output value.
    x_concat = tf.keras.layers.concatenate([short_term_ts_features, static_features])
    x_concat = tf.keras.layers.Dense(32)(x_concat)

    output = tf.keras.layers.Dense(1)(x_concat)

    model = tf.keras.Model(inputs=[short_term_ts_input, static_input], outputs=[output])
    # Mean squared error between the single output and the target column.
    model.compile(optimizer=tf.keras.optimizers.Adam(learning_rate=0.001), loss='mse')

    # Writes a diagram of the model, including tensor shapes, to model_test.png.
    tf.keras.utils.plot_model(model, "model_test.png", show_shapes=True)

    # Runs training over full_dataset with fit()'s default settings.
    model.fit(full_dataset)