I have the following code that works fine, but the problem is that it works on only one batch, because I am using next(iter(dataset)). I created a TensorFlow dataset that must return 3 values (i.e. X: [x, y, z]) for my problem, but I need to pass only the x value to the model, while keeping all 3 values packed together since I will be using y and z later. Now, when I call fit, I have to somehow separate those 3 values in order to call the network architecture properly. So, my problem is how to call fit with a PrefetchDataset in such a scenario.
import tensorflow as tf
import numpy as np
from tensorflow.keras.layers import Input, Dense, Activation, \
    Conv2DTranspose, Conv2D, Reshape
from tensorflow.keras.models import Model

AUTOTUNE = tf.data.experimental.AUTOTUNE

def scale(X, a=-1, b=1, dtype='float32'):
    if a > b:
        a, b = b, a
    xmin = tf.cast(tf.math.reduce_min(X), dtype=dtype)
    xmax = tf.cast(tf.math.reduce_max(X), dtype=dtype)
    X = (X - xmin) / (xmax - xmin)
    scaled = X * (b - a) + a
    return scaled, xmin, xmax

def set_shape_b(x, y, z):
    x = tf.reshape(x, [16, 16, 2])
    y = tf.reshape(y, [1])
    z = tf.reshape(z, [1])
    return x, y, z

def set_shape_a(x, y, z):
    x = tf.reshape(x, [4, 4, 2])
    y = tf.reshape(y, [1])
    z = tf.reshape(z, [1])
    return x, y, z
def First(lr):
    inp = Input(lr)
    x = Dense(16)(inp)
    x = Reshape((4, 4, 16))(x)
    x = Conv2DTranspose(2, kernel_size=3, strides=2, padding='same')(x)
    x = Conv2DTranspose(2, kernel_size=3, strides=2, padding='same')(x)
    output = Activation('tanh')(x)
    model = Model(inp, output, name='First')
    return model

def Second(hr):
    inp = Input(hr)
    x = Dense(16)(inp)
    x = Conv2D(2, kernel_size=3, strides=2, padding='same')(x)
    x = Conv2D(2, kernel_size=3, strides=2, padding='same')(x)
    output = Dense(1, activation='sigmoid')(x)
    model = Model(inputs=inp, outputs=output, name='Second')
    return model

def build_model(First, Second):
    inp = Input(shape=INP)
    gen = First(inp)
    output = Second(gen)
    model = Model(inputs=inp, outputs=[gen, output], name='model')
    return model
# Preprocess --------------- #
a = np.random.random((20, 4, 4, 2)).astype('float32')
b = np.random.random((20, 16, 16, 2)).astype('float32')

dataset_a = tf.data.Dataset.from_tensor_slices(a)
dataset_b = tf.data.Dataset.from_tensor_slices(b)

# tf.py_function erases static shape information,
# so set_shape_a / set_shape_b restore it afterwards
dataset_b = dataset_b.map(lambda x: tf.py_function(scale,
                                                   [x],
                                                   (tf.float32, tf.float32, tf.float32)))
dataset_b = dataset_b.map(set_shape_b)

dataset_a = dataset_a.map(lambda x: tf.py_function(scale,
                                                   [x],
                                                   (tf.float32, tf.float32, tf.float32)))
dataset_a = dataset_a.map(set_shape_a)

dataset_ones = tf.data.Dataset.from_tensor_slices(tf.ones((len(b), 4, 4, 1)))

dataset = tf.data.Dataset.zip((dataset_a, (dataset_b, dataset_ones)))
dataset = dataset.cache()
dataset = dataset.batch(2)
dataset = dataset.prefetch(buffer_size=AUTOTUNE)
# Prepare models -------------------- #
INP = (4, 4, 2)
OUT = (16, 16, 2)

first = First(INP)
second = Second(OUT)

model = build_model(first, second)
model.compile(loss=['mse', 'binary_crossentropy'],
              optimizer=tf.keras.optimizers.Adam(learning_rate=1e-4))

train_l, (train_h, train_ones) = next(iter(dataset))

# train ------------------
model.fit(train_l[0],
          [train_h[0], train_ones],
          epochs=2)
UPDATE
import matplotlib.pyplot as plt

def rescale(X_scaled, xmin, xmax):
    X = (xmax - xmin) * (X_scaled + 1) / 2.0 + xmin
    return X

class PlotCallback(tf.keras.callbacks.Callback):
    def __init__(self, image, xmin, xmax, model):
        self.image = image
        self.xmin = xmin
        self.xmax = xmax
        self.model = model

    def on_epoch_end(self, epoch, logs={}):
        preds = self.model.predict(self.image)
        y_pred = preds[0]
        y_pred = rescale(y_pred, self.xmin, self.xmax)
        fig, ax = plt.subplots(figsize=(14, 10))
        ax.imshow(y_pred[0][:, :, 0])
        plt.close()
I am using the above functions, and when fitting I want something like this:
model.fit(
    dataset,
    validation_data=dataset,
    epochs=2,
    callbacks=[PlotCallback(here_the_dataset_a_scaled_values,
                            xmin_from_dataset_a,
                            xmax_from_dataset_b, model)]
)
Following the comment above, to resolve your issue you can apply a custom function that returns only the target values. Also, take a look at tf.data.Dataset.map for reference.
def set_shape(x, y, z, dims):
    x = tf.reshape(x, dims)
    y = tf.reshape(y, [1])
    z = tf.reshape(z, [1])
    return x, y, z

dataset_a = dataset_a.map(lambda x, y, z: set_shape(x, y, z, dims=[4, 4, 2]))
dataset_b = dataset_b.map(lambda x, y, z: set_shape(x, y, z, dims=[16, 16, 2]))

def only_scale(x, y, z):
    # keep only the scaled tensor; drop xmin and xmax
    return x

dataset_a = dataset_a.map(only_scale)
dataset_b = dataset_b.map(only_scale)
Zip and batch the data:
dataset = tf.data.Dataset.zip(
    (dataset_a, (dataset_b, dataset_ones))
)
dataset = dataset.cache()
dataset = dataset.batch(2)
dataset = dataset.prefetch(buffer_size=AUTOTUNE)

a, b = next(iter(dataset))
a.shape, b[0].shape, b[1].shape
(TensorShape([2, 4, 4, 2]),
 TensorShape([2, 16, 16, 2]),
 TensorShape([2, 4, 4, 1]))
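As an optional check, the same structure can be read off dataset.element_spec without pulling a batch:
print(dataset.element_spec)
# (TensorSpec(shape=(None, 4, 4, 2), dtype=tf.float32, ...),
#  (TensorSpec(shape=(None, 16, 16, 2), dtype=tf.float32, ...),
#   TensorSpec(shape=(None, 4, 4, 1), dtype=tf.float32, ...)))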
Now, we can pass it to the fit method.
# train ------------------
model.fit(
    dataset,
    epochs=2
)
Epoch 1/2
2s 6ms/step - loss: 1.0283 - First_loss: 0.3368 - Second_loss: 0.6914
Epoch 2/2
0s 4ms/step - loss: 1.0228 - First_loss: 0.3367 - Second_loss: 0.6860
As mentioned in the comment, .map(only_scale) cannot be used if we want to receive (scaled, xmin, xmax) for un-scaling in the middle of training. But we also can't pass that data format to a model that doesn't expect such an input specification. In other words, the model code has no idea about xmin and xmax.
In that case, there are two options to resolve it: use a custom training loop in Keras, or override the train_step function of the fit method. Let's try the second one; then we don't need the .map(only_scale) step from the data API, though the zipped dataset must be rebuilt without it so that each element keeps its (scaled, xmin, xmax) triple. See the Keras guide Customizing what happens in fit() for reference.
Let's build a custom model that overrides train_step (and also test_step for the validation data). FYI, there is also predict_step.
from tensorflow import keras

class CustomFitter(keras.Model):
    def __init__(self, model, **kwargs):
        super().__init__(**kwargs)
        self.model = model

    def call(self, inputs):
        return self.model(inputs)

    def unpack(self, data):
        x, y = data
        # x: dataset_a
        # y: (dataset_b, dataset_ones)
        # dataset_a / dataset_b: (scaled, xmin, xmax)
        scale_y = y[0][0]
        ones_y = y[1]
        y = (scale_y, ones_y)
        x = x[0]
        return x, y

    def train_step(self, data):
        x, y = self.unpack(data)
        return super().train_step((x, y))

    def test_step(self, data):
        x, y = self.unpack(data)
        return super().test_step((x, y))
Next, we can do
model = build_model(first, second)
model = CustomFitter(model)
model.compile(
    loss=['mse', 'binary_crossentropy'],
    optimizer=tf.keras.optimizers.Adam(learning_rate=1e-4)
)
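As a quick optional sanity check (assuming the zipped dataset has been rebuilt without only_scale as noted above), unpack can be exercised eagerly on a single batch:
xb, yb = model.unpack(next(iter(dataset)))
print(xb.shape, yb[0].shape, yb[1].shape)
# expected: (2, 4, 4, 2) (2, 16, 16, 2) (2, 4, 4, 1)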
Next, we can fit the dataset (without using the only_scale method).
model.fit(
    dataset,
    validation_data=dataset,
    epochs=2
)
Epoch 1/2
45ms/step - loss: 1.0278 - output_1_loss: 0.3358 - output_2_loss: 0.6919 - val_loss: 1.0262 - val_output_1_loss: 0.3357 - val_output_2_loss: 0.6905
Epoch 2/2
8ms/step - loss: 1.0249 - output_1_loss: 0.3356 - output_2_loss: 0.6893 - val_loss: 1.0234 - val_output_1_loss: 0.3355 - val_output_2_loss: 0.6879
Regarding the use of xmin and xmax in the callback to rescale the prediction array and plot it, we can do something as follows:

1. We will store the xmin and xmax values at training time; for now, we will store these values from the validation data set.
2. We will use them later in on_epoch_end, and reset them at on_epoch_begin for the next epoch.

At first we will do:
from tensorflow.experimental import numpy as tnp

with tf.device('/CPU:0'):
    scaling_xmin = tf.Variable(
        tnp.empty((0, 1), dtype=tf.float32), shape=[None, 1], trainable=False
    )
    scaling_xmax = tf.Variable(
        tnp.empty((0, 1), dtype=tf.float32), shape=[None, 1], trainable=False
    )
class CustomFitter(keras.Model):
    ....

    def unpack(self, data, data_src='valid'):
        x, y = data
        # x: dataset_a
        # y: (dataset_b, dataset_ones)
        # dataset_a / dataset_b: (scaled, xmin, xmax)
        if data_src == 'valid':
            scaling_xmin.assign(
                tf.concat([scaling_xmin, x[1]], axis=0)
            )
            scaling_xmax.assign(
                tf.concat([scaling_xmax, x[2]], axis=0)
            )
        scale_y = y[0][0]
        ones_y = y[1]
        y = (scale_y, ones_y)
        x = x[0]
        return x, y

    def train_step(self, data):
        x, y = self.unpack(data, data_src='train')
        return super().train_step((x, y))

    def test_step(self, data):
        x, y = self.unpack(data, data_src='valid')
        return super().test_step((x, y))
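For clarity, the shape=[None, 1] spec is what allows these variables to grow: assign accepts a value with any number of rows. A minimal sketch of the same pattern in isolation:
v = tf.Variable(tnp.empty((0, 1), dtype=tf.float32), shape=[None, 1], trainable=False)
v.assign(tf.concat([v, [[0.1], [0.2]]], axis=0))  # append two rows, as unpack does per batch
print(v.numpy().shape)  # (2, 1)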
Now, in the callback, we will do:
class PlotCallback(tf.keras.callbacks.Callback):
    def __init__(self, image):
        self.image = image

    def on_epoch_begin(self, epoch, logs=None):
        # reset the stored xmin / xmax for the coming epoch
        scaling_xmin.assign(
            tnp.empty((0, 1), dtype=tf.float32)
        )
        scaling_xmax.assign(
            tnp.empty((0, 1), dtype=tf.float32)
        )

    def on_epoch_end(self, epoch, logs={}):
        preds = self.model.predict(self.image)
        y_pred = preds[0]
        # assuming y_pred.shape[0] == xmin.shape[0] == xmax.shape[0]
        for yp, xmin, xmax in zip(
            y_pred, scaling_xmin.numpy(), scaling_xmax.numpy()
        ):
            yp = rescale(yp, xmin, xmax)
            fig, ax = plt.subplots(figsize=(14, 10))
            ax.imshow(yp[:, :, 0])
            break
        plt.show()
Next, we can call this callback. Note that we are passing a plain array as a single input here. If PlotCallback(dataset) is passed instead, make sure to implement predict_step, which would be almost the same as test_step in the model code above.
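A predict_step along these lines should work in that case (a sketch; it skips the xmin/xmax bookkeeping so the values stored by test_step are not duplicated):
def predict_step(self, data):
    # unpack the (x, y) batch like test_step, but only forward the input;
    # data_src='train' skips the scaling_xmin / scaling_xmax bookkeeping
    x, _ = self.unpack(data, data_src='train')
    return super().predict_step(x)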
a = np.random.random((20, 4, 4, 2)).astype('float32')

model.fit(
    dataset,
    validation_data=dataset,
    callbacks=[PlotCallback(a)],
    epochs=2
)
107ms/step - loss: 1.0251 - output_1_loss: 0.3387 - output_2_loss: 0.6864 - val_loss: 1.0239 - val_output_1_loss: 0.3386 - val_output_2_loss: 0.6853
[plot will be displayed]
As you mentioned in the comment, at first the loss names in the logs were First_loss and Second_loss, and after the updates above they became output_1_loss and output_2_loss. To fix this, we can change the model code a bit. First, we would do:
def build_model(First, Second):
    inp = Input(shape=INP)
    gen = First(inp)
    output = Second(gen)
    return inp, [gen, output]
Next, we modify the CustomFitter as follows; the init and call methods are removed since they are no longer needed. Passing inputs and outputs directly makes Keras build the model functionally, so the output names come from the First and Second sub-models:
class CustomFitter(keras.Model):
    def unpack(self, data, data_src='valid'):
        ...
        return x, y

    def train_step(self, data):
        x, y = self.unpack(data, data_src='train')
        return super().train_step((x, y))

    def test_step(self, data):
        x, y = self.unpack(data, data_src='valid')
        return super().test_step((x, y))
first = First(INP)
second = Second(OUT)
inputs, outputs = build_model(first, second)

custom_model = CustomFitter(inputs, outputs)
custom_model.compile(
    loss=['mse', 'binary_crossentropy'],
    optimizer=tf.keras.optimizers.Adam(learning_rate=1e-4)
)
loss: 1.0337 - First_loss: 0.3405 - Second_loss: 0.6932 - val_loss: 1.0330 - val_First_loss: 0.3404 - val_Second_loss: 0.6926