I am trying to zip together multiple tf.data.Dataset objects for a multi-input Keras model. Each tf.data.Dataset is a concatenation of several dataframes, each with the same columns but not necessarily the same number of rows. I can build the full dataset, but when I pass it to the Keras model I get an error:
TypeError: Inputs to a layer should be tensors. Got: <tensorflow.python.data.ops.dataset_ops._NestedVariant object
I would really like to keep the lazy evaluation of tf.data.Dataset, since I am using the window function, but I am having trouble aggregating all the datasets together.
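From what I can tell, window() is the culprit: it yields a dataset whose elements are themselves datasets, not tensors. A stripped-down check, separate from my actual pipeline below, illustrates this:

import tensorflow as tf

ds = tf.data.Dataset.range(10).window(5, shift=1, drop_remainder=True)
print(ds.element_spec)  # DatasetSpec(...): each element is itself a Dataset, not a tensor
for w in ds.take(1):
    print(list(w.as_numpy_iterator()))  # [0, 1, 2, 3, 4]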
How do I zip multiple datasets together in a way that can be passed to model.fit()?
Any help would be appreciated.
Here is a minimal, self-contained example that reproduces my problem:
import pandas as pd
import numpy as np
import tensorflow as tf
# Create dataframes with 4 features and target
dataframe1 = pd.DataFrame(np.random.randn(1000, 5), columns=["feature1", "feature2", "feature3", "feature4", "target"])
dataframe2 = pd.DataFrame(np.random.randn(800, 5), columns=["feature1", "feature2", "feature3", "feature4", "target"])
# Convert dataframes to datasets
def get_dataset(df: pd.DataFrame, features):
    # Skip the first 4 rows so these rows line up with the 5-step windows below
    dataset = tf.data.Dataset.from_tensor_slices(df.loc[:, features].iloc[4:].to_numpy())
    return dataset
def get_dataset_windowed(df: pd.DataFrame, features):
    # Each element produced by window() is itself a small Dataset of 5 consecutive rows
    dataset = tf.data.Dataset.from_tensor_slices(df.loc[:, features].to_numpy()).window(
        5, shift=1, stride=1, drop_remainder=True
    )
    return dataset
windowed_dataset = [get_dataset_windowed(x, ["feature3", "feature4"]) for x in [dataframe1, dataframe2]]
windowed_dataset = tf.data.Dataset.from_tensor_slices(windowed_dataset)
# cycle_length=1 makes interleave concatenate the per-dataframe datasets one after another
windowed_dataset = windowed_dataset.interleave(lambda x: x, cycle_length=1, num_parallel_calls=tf.data.AUTOTUNE)
static_dataset = [get_dataset(x, ["feature1", "feature2"]) for x in [dataframe1, dataframe2]]
static_dataset = tf.data.Dataset.from_tensor_slices(static_dataset)
static_dataset = static_dataset.interleave(lambda x: x, cycle_length=1, num_parallel_calls=tf.data.AUTOTUNE)
targets = [get_dataset(x, ["target"]) for x in [dataframe1, dataframe2]]
targets = tf.data.Dataset.from_tensor_slices(targets)
targets = targets.interleave(lambda x: x, cycle_length=1, num_parallel_calls=tf.data.AUTOTUNE)
# Zip datasets together
full_dataset = tf.data.Dataset.zip(
    (
        {
            "short_term_ts_input": windowed_dataset,
            "static_input": static_dataset,
        },
        {
            "output": targets,
        },
    )
)
full_dataset = full_dataset.shuffle(buffer_size=1024).batch(128)
# Creating, compiling and fitting model
short_term_ts_input = tf.keras.Input(shape=(5, 2), name="short_term_ts_input")
static_input = tf.keras.Input(shape=(2,), name="static_input")
targets = tf.keras.Input(shape=(1,), name="output")
short_term_ts_features = tf.keras.layers.LSTM(32, return_sequences=False)(short_term_ts_input)
short_term_ts_features = tf.keras.layers.Dense(8)(short_term_ts_features)
static_features = tf.keras.layers.Dense(16)(static_input)
x_concat = tf.keras.layers.concatenate([short_term_ts_features, static_features])
x_concat = tf.keras.layers.Dense(32)(x_concat)
output = tf.keras.layers.Dense(1)(x_concat)
model = tf.keras.Model(inputs=[short_term_ts_input, static_input], outputs=[output])
model.compile(optimizer=tf.keras.optimizers.Adam(learning_rate=0.001))
tf.keras.utils.plot_model(model, "model_test.png", show_shapes=True)
model.fit(full_dataset)
Maybe something like this: the key change is flattening each window back into a single tensor with .flat_map(lambda z: z.batch(5)), since window() produces nested datasets that Keras layers cannot consume directly. With that change, the zipped dataset can be passed straight to model.fit():
import pandas as pd
import numpy as np
import tensorflow as tf
# Create dataframes with 4 features and target
dataframe1 = pd.DataFrame(np.random.randn(1000, 5), columns=["feature1", "feature2", "feature3", "feature4", "target"])
dataframe2 = pd.DataFrame(np.random.randn(800, 5), columns=["feature1", "feature2", "feature3", "feature4", "target"])
# Convert dataframes to datasets
def get_dataset(df: pd.DataFrame, features):
    # Skip the first 4 rows so these rows line up with the 5-step windows below
    dataset = tf.data.Dataset.from_tensor_slices(df.loc[:, features].iloc[4:].to_numpy())
    return dataset
def get_dataset_windowed(df: pd.DataFrame, features):
    # Each element produced by window() is itself a small Dataset of 5 consecutive rows
    dataset = tf.data.Dataset.from_tensor_slices(df.loc[:, features].to_numpy()).window(
        5, shift=1, stride=1, drop_remainder=True
    )
    return dataset
windowed_dataset = [get_dataset_windowed(x, ["feature3", "feature4"]) for x in [dataframe1, dataframe2]]
windowed_dataset = tf.data.Dataset.from_tensor_slices(windowed_dataset)
# flat_map + batch(5) converts each nested window dataset into a single dense (5, 2) tensor
windowed_dataset = windowed_dataset.interleave(lambda x: x, cycle_length=1, num_parallel_calls=tf.data.AUTOTUNE).flat_map(lambda z: z.batch(5))
static_dataset = [get_dataset(x, ["feature1", "feature2"]) for x in [dataframe1, dataframe2]]
static_dataset = tf.data.Dataset.from_tensor_slices(static_dataset)
static_dataset = static_dataset.interleave(lambda x: x, cycle_length=1, num_parallel_calls=tf.data.AUTOTUNE)
targets = [get_dataset(x, ["target"]) for x in [dataframe1, dataframe2]]
targets = tf.data.Dataset.from_tensor_slices(targets)
targets = targets.interleave(lambda x: x, cycle_length=1, num_parallel_calls=tf.data.AUTOTUNE)
# Zip inputs and targets so each element is ({"short_term_ts_input": ..., "static_input": ...}, target)
full_dataset = tf.data.Dataset.zip(
    (
        {
            "short_term_ts_input": windowed_dataset,
            "static_input": static_dataset,
        },
        targets,
    )
)
full_dataset = full_dataset.shuffle(buffer_size=1024).batch(128)
# Creating, compiling and fitting model
short_term_ts_input = tf.keras.Input(shape=(5, 2), name="short_term_ts_input")
static_input = tf.keras.Input(shape=(2,), name="static_input")
short_term_ts_features = tf.keras.layers.LSTM(32, return_sequences=False)(short_term_ts_input)
short_term_ts_features = tf.keras.layers.Dense(8)(short_term_ts_features)
static_features = tf.keras.layers.Dense(16)(static_input)
x_concat = tf.keras.layers.concatenate([short_term_ts_features, static_features])
x_concat = tf.keras.layers.Dense(32)(x_concat)
output = tf.keras.layers.Dense(1)(x_concat)
model = tf.keras.Model(inputs=[short_term_ts_input, static_input], outputs=[output])
model.compile(optimizer=tf.keras.optimizers.Adam(learning_rate=0.001), loss='mse')
tf.keras.utils.plot_model(model, "model_test.png", show_shapes=True)
model.fit(full_dataset)
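If it helps, you can sanity-check the pipeline before training by inspecting element_spec and pulling a single batch; the shapes below assume the single zip shown above, a window length of 5, and a batch size of 128:

print(full_dataset.element_spec)
for inputs, target in full_dataset.take(1):
    print(inputs["short_term_ts_input"].shape)  # (128, 5, 2)
    print(inputs["static_input"].shape)         # (128, 2)
    print(target.shape)                         # (128, 1)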