Search code examples
python-3.x tensorflow machine-learning keras deep-learning

How to pass multi-input data loader to single-input model


I have the following code that works fine. The problem is that it trains on only one batch, because I am using next(iter(dataset)).

I created a TensorFlow dataset that must return 3 values (i.e. X: [x, y, z]) for my problem, but I need to pass only the x value to the model. I need to keep all 3 values packed together because I will be using y and z later. The thing is that when I call fit, I have to somehow separate those 3 values in order to call the network architecture properly. So my problem is how to call fit using the PrefetchDataset in such a scenario.

import tensorflow as tf
import numpy as np
from tensorflow.keras.layers import Input, Dense,  Activation, \
    Conv2DTranspose, Conv2D, Reshape
from tensorflow.keras.models import Model

AUTOTUNE = tf.data.experimental.AUTOTUNE

def scale(X, a=-1, b=1, dtype='float32'):
    """Min-max scale ``X`` into the interval spanned by ``a`` and ``b``.

    Args:
        X: Tensor (or value accepted by TensorFlow ops) to scale.
        a: One end of the target interval.
        b: The other end of the target interval. If ``a > b`` they are swapped.
        dtype: dtype the computed minimum and maximum are cast to.

    Returns:
        A ``(scaled, xmin, xmax)`` tuple: the scaled tensor together with the
        minimum and maximum of the original ``X``.
    """
    if a > b:
        a, b = b, a
    xmin = tf.cast(tf.math.reduce_min(X), dtype=dtype)
    xmax = tf.cast(tf.math.reduce_max(X), dtype=dtype)
    # A constant input has xmax == xmin, and a plain division would give
    # NaN (0 / 0). divide_no_nan returns 0 there instead, so such an input
    # maps to the lower bound; for a non-zero range it is a normal division.
    unit = tf.math.divide_no_nan(X - xmin, xmax - xmin)
    scaled = unit * (b - a) + a
    return scaled, xmin, xmax

def set_shape_b(x, y, z):
    """Reshape ``x`` to ``[16, 16, 2]`` and ``y`` / ``z`` to ``[1]``.

    Returns the three reshaped tensors as a tuple, in the same order.
    """
    image = tf.reshape(x, [16, 16, 2])
    first_scalar, second_scalar = (tf.reshape(t, [1]) for t in (y, z))
    return image, first_scalar, second_scalar

def set_shape_a(x, y, z):
    """Reshape ``x`` to ``[4, 4, 2]`` and ``y`` / ``z`` to ``[1]``.

    Returns the three reshaped tensors as a tuple, in the same order.
    """
    image = tf.reshape(x, [4, 4, 2])
    first_scalar, second_scalar = (tf.reshape(t, [1]) for t in (y, z))
    return image, first_scalar, second_scalar

def First(lr):
    """Build the Keras model named ``'First'``.

    Layer stack: Dense(16) -> Reshape((4, 4, 16)) -> two stride-2
    ``Conv2DTranspose`` layers with 2 filters each -> ``tanh``.

    Args:
        lr: Shape passed to ``Input`` (batch axis excluded).

    Returns:
        The assembled ``Model``.
    """
    inp = Input(lr)
    hidden = Dense(16)(inp)
    hidden = Reshape((4, 4, 16))(hidden)
    # Two identical transposed convolutions, applied one after the other.
    for _ in range(2):
        hidden = Conv2DTranspose(2, kernel_size=3, strides=2, padding='same')(hidden)
    activated = Activation('tanh')(hidden)
    return Model(inp, activated, name='First')
    
def Second(hr):
    """Build the Keras model named ``'Second'``.

    Layer stack: Dense(16) -> two stride-2 ``Conv2D`` layers with 2 filters
    each -> Dense(1) with a sigmoid activation.

    Args:
        hr: Shape passed to ``Input`` (batch axis excluded).

    Returns:
        The assembled ``Model``.
    """
    inp = Input(hr)
    hidden = Dense(16)(inp)
    # Two identical strided convolutions, applied one after the other.
    for _ in range(2):
        hidden = Conv2D(2, kernel_size=3, strides=2, padding='same')(hidden)
    score = Dense(1, activation='sigmoid')(hidden)
    return Model(inputs=inp, outputs=score, name='Second')
    

def build_model(First, Second):
    """Chain two models into a single two-output model named ``'model'``.

    Args:
        First: Model applied to the input; its result is the first output.
        Second: Model applied to ``First``'s result; gives the second output.

    Returns:
        A ``Model`` with outputs ``[First(x), Second(First(x))]``.
    """
    # INP is read from module scope, so it must exist before this is called.
    source = Input(shape=INP)
    generated = First(source)
    judged = Second(generated)
    return Model(inputs=source, outputs=[generated, judged], name='model')

# Preprocess --------------- #
# Random float32 stand-in data: 20 samples of shape (4, 4, 2) and 20 of shape (16, 16, 2).
a = np.random.random((20, 4, 4, 2)).astype('float32')
b = np.random.random((20, 16, 16, 2)).astype('float32')

dataset_a = tf.data.Dataset.from_tensor_slices(a)
dataset_b = tf.data.Dataset.from_tensor_slices(b)

# Scale each sample on its own; every element becomes (scaled, xmin, xmax).
dataset_b = dataset_b.map(lambda x: tf.py_function(scale,
                                                   [x], 
                                                   (tf.float32, tf.float32, tf.float32)))
# Reshape the three py_function outputs to explicit shapes.
dataset_b = dataset_b.map(set_shape_b)

# Same two steps for the (4, 4, 2) samples.
dataset_a = dataset_a.map(lambda x: tf.py_function(scale,
                                                   [x], 
                                                   (tf.float32, tf.float32, tf.float32)))
dataset_a = dataset_a.map(set_shape_a)
 
# One all-ones tensor of shape (4, 4, 1) per sample in b.
dataset_ones = tf.data.Dataset.from_tensor_slices(tf.ones((len(b), 4, 4, 1)))   

# Element structure: (triple_a, (triple_b, ones)), where each triple is (scaled, xmin, xmax).
dataset = tf.data.Dataset.zip((dataset_a, (dataset_b, dataset_ones)))

dataset = dataset.cache()
dataset = dataset.batch(2)
dataset = dataset.prefetch(buffer_size=AUTOTUNE)

# Prepare models -------------------- #
# INP has to be assigned before build_model is called: build_model reads it as a global.
INP = (4, 4, 2)
OUT = (16, 16, 2)

first = First(INP)
second = Second(OUT)
model = build_model(first, second)

# Two losses for the two model outputs, given in output order.
model.compile(loss=['mse', 'binary_crossentropy'],
              optimizer= tf.keras.optimizers.Adam(learning_rate=1e-4))


# Pull one batch out of the dataset; only this batch is used for training below.
train_l, (train_h, train_ones) = next(iter(dataset))


# train ------------------
# train_l and train_h are (scaled, xmin, xmax) tuples; index 0 picks the scaled tensor.
model.fit(train_l[0],
          [train_h[0], train_ones],
          epochs=2)
              

UPDATE

def rescale(X_scaled, xmin, xmax):
    """Map values from the ``[-1, 1]`` range back to ``[xmin, xmax]``.

    Args:
        X_scaled: Value(s) to map back.
        xmin: Value that ``-1`` maps to.
        xmax: Value that ``1`` maps to.

    Returns:
        ``(xmax - xmin) * (X_scaled + 1) / 2.0 + xmin``.
    """
    span = xmax - xmin
    shifted = X_scaled + 1
    return span * shifted / 2.0 + xmin

class PlotCallback(tf.keras.callbacks.Callback):
    """Predict on a fixed input after every epoch and draw the first result.

    NOTE: ``plt`` is expected to be ``matplotlib.pyplot``; it must be
    imported at module level for ``on_epoch_end`` to run.
    """

    def __init__(self, image, xmin, xmax, model):
        """Store the prediction input and the bounds used to undo the scaling.

        Args:
            image: Input handed to ``model.predict`` at the end of each epoch.
            xmin: Lower bound passed to ``rescale``.
            xmax: Upper bound passed to ``rescale``.
            model: Model used for the prediction.
        """
        # Initialise the base class so its own attributes exist.
        super().__init__()
        self.image = image
        self.xmin = xmin
        self.xmax = xmax
        # Register the model through the Callback hook instead of assigning
        # ``self.model`` directly. This works whether the base class exposes
        # ``model`` as a plain attribute or as a read-only property.
        self.set_model(model)

    def on_epoch_end(self, epoch, logs=None):
        """Predict on ``self.image``, rescale the first output and draw it.

        ``logs`` is unused; it defaults to ``None`` rather than a shared dict.
        """
        preds = self.model.predict(self.image)
        # preds[0] is the first model output.
        y_pred = rescale(preds[0], self.xmin, self.xmax)

        fig, ax = plt.subplots(figsize=(14, 10))
        # First sample of the batch, channel 0.
        ax.imshow(y_pred[0][:, :, 0])
        # Close the figure that was just created so figures do not pile up.
        plt.close(fig)

I am using the functions above, and when calling fit I want something like this:

model.fit(
    dataset,
    validation_data=dataset,
    epochs=2,
    callbacks=[PlotCallback(here_the_dataset_a_scaled_values,
                            xmin_from_dataset_a,
                            xmax_from_dataset_b, model)]
)

Solution

  • Following the comment above, to resolve your issue you can apply a custom function that returns only the target values. Also, take a look at tf.data.Dataset.map for reference.

    def set_shape(x, y, z, dims):
        """Reshape ``x`` to ``dims`` and ``y`` / ``z`` to ``[1]``.

        Returns the three reshaped tensors as a tuple, in the same order.
        """
        image = tf.reshape(x, dims)
        first_scalar, second_scalar = (tf.reshape(t, [1]) for t in (y, z))
        return image, first_scalar, second_scalar
    
    # Each element is a 3-tuple; only the target shape of the first item differs per dataset.
    dataset_a = dataset_a.map(lambda x, y, z: set_shape(x, y, z, dims=[4, 4, 2]))
    dataset_b = dataset_b.map(lambda x, y, z: set_shape(x, y, z, dims=[16, 16, 2]))
    
    def only_scale(x, y, z):
        """Return the first of the three values and discard the other two."""
        del y, z  # intentionally unused
        return x
    
    # Keep only the first item of every 3-tuple element.
    dataset_a = dataset_a.map(only_scale)
    dataset_b = dataset_b.map(only_scale)
    

    Zip and batch the data.

    # Element structure: (input, (target_1, target_2)).
    dataset = tf.data.Dataset.zip(
        (dataset_a, (dataset_b, dataset_ones))
    )
    dataset = dataset.cache()
    dataset = dataset.batch(2)
    dataset = dataset.prefetch(buffer_size=AUTOTUNE)
    
    # Inspect one batch. NOTE: this rebinds a and b, which held the NumPy arrays before.
    a, b = next(iter(dataset)) 
    a.shape, b[0].shape, b[1].shape
    (TensorShape([2, 4, 4, 2]),
     TensorShape([2, 16, 16, 2]),
     TensorShape([2, 4, 4, 1]))
    

    Now, we can pass it to the fit method.

    # train ------------------
    # The whole dataset is passed, so no separate target argument is given.
    model.fit(
        dataset,
        epochs=2
    )
    
    Epoch 1/2
    2s 6ms/step - loss: 1.0283 - First_loss: 0.3368 - Second_loss: 0.6914
    Epoch 2/2
    0s 4ms/step - loss: 1.0228 - First_loss: 0.3367 - Second_loss: 0.6860
    

    Update 1

    As mentioned in the comment, .map(only_scale) cannot be used if (scale, xmin, xmax) must remain available for un-scaling in the middle of training. But we cannot pass that data format to a model that does not expect such an input specification. In other words, the model code has no idea about xmin and xmax.

    In that case, there are two options to resolve it. One is to use a custom training loop in Keras, and the other is to override the train_step function used by the fit method. Let's try the second one. In that case, we don't need the .map(only_scale) step from the data API. Here is the reference on overriding the fit method.

    Let's build a custom model that overrides train_step (and also test_step for validation data). FYI, there is also predict_step.

    class CustomFitter(tf.keras.Model):
        """Wrap a model so ``fit`` can consume the packed dataset elements.

        Each batch is ``(a, (b, ones))`` where ``a`` and ``b`` are
        ``(scaled, xmin, xmax)`` triples. Only the scaled tensors and ``ones``
        are forwarded to the default train/test steps.

        The base class is spelled ``tf.keras.Model`` because ``tf`` is the
        only Keras entry point this file imports.
        """

        def __init__(self, model, **kwargs):
            """Keep a reference to the wrapped ``model``."""
            super().__init__(**kwargs)
            self.model = model

        def call(self, inputs):
            """Forward ``inputs`` to the wrapped model."""
            return self.model(inputs)

        def unpack(self, data):
            """Reduce one packed batch to the ``(x, y)`` pair the model expects.

            Returns:
                ``(scaled_a, (scaled_b, ones))``.
            """
            packed_x, packed_y = data
            # packed_x: (scaled, xmin, xmax) from dataset_a
            # packed_y: ((scaled, xmin, xmax) from dataset_b, ones)
            inputs = packed_x[0]
            targets = (packed_y[0][0], packed_y[1])
            return inputs, targets

        def train_step(self, data):
            """Unpack the batch, then run the default training step."""
            x, y = self.unpack(data)
            return super().train_step((x, y))

        def test_step(self, data):
            """Unpack the batch, then run the default evaluation step."""
            x, y = self.unpack(data)
            return super().test_step((x, y))
    

    Next, we can do

    # Build the two-output model, then wrap it so fit can unpack the dataset elements.
    model = build_model(first, second)
    model = CustomFitter(model)
    model.compile(
        loss=['mse', 'binary_crossentropy'],
        optimizer= tf.keras.optimizers.Adam(learning_rate=1e-4)
    )
    

    Next, we can fit the dataset (without using the only_scale method).

    # The same dataset is reused as validation data; it goes through test_step.
    model.fit(
        dataset,
        validation_data=dataset,
        epochs=2
    )
    
    Epoch 1/2
    45ms/step - loss: 1.0278 - output_1_loss: 0.3358 - output_2_loss: 0.6919 - val_loss: 1.0262 - val_output_1_loss: 0.3357 - val_output_2_loss: 0.6905
    Epoch 2/2
    8ms/step - loss: 1.0249 - output_1_loss: 0.3356 - output_2_loss: 0.6893 - val_loss: 1.0234 - val_output_1_loss: 0.3355 - val_output_2_loss: 0.6879
    

    Update 2

    Regarding the use of xmin and xmax in the callback to rescale the prediction array and plot it, we can do something as follows.

    1. We will store the values of xmin and xmax at training time. For now, we will store these values from the validation data set.
    2. Later, in the callback, we use these values in on_epoch_end and reset them in on_epoch_begin for the next epoch.

    At first we will do:

    from tensorflow.experimental import numpy as tnp
    
    # Two non-trainable variables that collect xmin / xmax rows. They start with
    # zero rows; shape=[None, 1] leaves the row count open so that later assigns
    # can store tensors with more rows.
    with tf.device('/CPU:0'):
        scaling_xmin = tf.Variable(
            tnp.empty((0, 1), dtype=tf.float32), shape=[None, 1], trainable=False
        )
        scaling_xmax = tf.Variable(
            tnp.empty((0, 1), dtype=tf.float32), shape=[None, 1], trainable=False
        )
    
    class CustomFitter(tf.keras.Model):
        """CustomFitter variant that also records xmin / xmax of validation batches.

        The base class is spelled ``tf.keras.Model`` because ``tf`` is the
        only Keras entry point this file imports.
        """

        ...  # __init__ and call are unchanged from Update 1

        def unpack(self, data, data_src='valid'):
            """Reduce one packed batch to ``(x, y)`` and optionally record its bounds.

            Args:
                data: ``(a, (b, ones))`` where ``a`` and ``b`` are
                    ``(scaled, xmin, xmax)`` triples.
                data_src: When equal to ``'valid'``, the xmin / xmax of ``a``
                    are appended to the module-level ``scaling_xmin`` /
                    ``scaling_xmax`` variables.

            Returns:
                ``(scaled_a, (scaled_b, ones))``.
            """
            x, y = data
            # x: (scaled, xmin, xmax) from dataset_a
            # y: ((scaled, xmin, xmax) from dataset_b, ones)

            if data_src == 'valid':
                # Append this batch's rows to what has been stored so far.
                scaling_xmin.assign(
                    tf.concat([scaling_xmin, x[1]], axis=0)
                )
                scaling_xmax.assign(
                    tf.concat([scaling_xmax, x[2]], axis=0)
                )

            scale_y = y[0][0]
            ones_y = y[1]
            y = (scale_y, ones_y)
            x = x[0]
            return x, y

        def train_step(self, data):
            """Unpack without recording bounds, then run the default training step."""
            x, y = self.unpack(data, data_src='train')
            return super().train_step((x, y))

        def test_step(self, data):
            """Unpack and record bounds, then run the default evaluation step."""
            x, y = self.unpack(data, data_src='valid')
            return super().test_step((x, y))
    

    Now, in callback, we will do

    class PlotCallback(tf.keras.callbacks.Callback):
        """Plot the first rescaled prediction at the end of every epoch.

        Reads the module-level ``scaling_xmin`` / ``scaling_xmax`` variables
        and clears them at the start of every epoch.

        NOTE: ``plt`` is expected to be ``matplotlib.pyplot``; it must be
        imported at module level for ``on_epoch_end`` to run.
        """

        def __init__(self, image):
            """Store the input that is predicted on after each epoch."""
            # Initialise the base class so its own attributes exist.
            super().__init__()
            self.image = image

        def on_epoch_begin(self, epoch, logs=None):
            """Reset both stores to zero rows for the new epoch."""
            # An empty (0, 1) tensor can be assigned directly; building a
            # throw-away tf.Variable every epoch is unnecessary.
            for store in (scaling_xmin, scaling_xmax):
                store.assign(tf.zeros((0, 1), dtype=tf.float32))

        def on_epoch_end(self, epoch, logs=None):
            """Predict on ``self.image`` and show the first rescaled sample.

            ``logs`` is unused; it defaults to ``None`` rather than a shared dict.
            """
            preds = self.model.predict(self.image)
            y_pred = preds[0]

            # assuming y_pred.shape[0] == xmin.shape[0] == xmax.shape[0]
            # Only the first (prediction, xmin, xmax) triple is plotted. When
            # nothing was recorded this epoch there is no triple to draw.
            first = next(
                zip(y_pred, scaling_xmin.numpy(), scaling_xmax.numpy()), None
            )
            if first is not None:
                yp, xmin, xmax = first
                yp = rescale(yp, xmin, xmax)
                _, ax = plt.subplots(figsize=(14, 10))
                ax.imshow(yp[:, :, 0])
            plt.show()
    

    Next, we can use this callback. Note that we are passing a single 2D input. If PlotCallback(dataset) is passed instead, make sure to implement predict_step, which would be almost the same as test_step in the model code above.

    # Plain array the callback predicts on after each epoch.
    a = np.random.random((20, 4, 4, 2)).astype('float32')
    # custom_model is the compiled CustomFitter instance (it is created under
    # this name in Update 3; Update 1 binds it to the name model instead).
    custom_model.fit(
        dataset,
        validation_data=dataset,
        callbacks=[PlotCallback(a)],
        epochs=2
    )
    
    107ms/step - loss: 1.0251 - output_1_loss: 0.3387 - output_2_loss: 0.6864 - val_loss: 1.0239 - val_output_1_loss: 0.3386 - val_output_2_loss: 0.6853
    
    [plot will be displayed]
    

    Update 3

    As you mentioned in the comment, at first the log names in the display were First_loss and Second_loss, and after Update 1/2 they became output_1_loss and output_2_loss. To fix this, we can change the model code a bit. First, we would do

    def build_model(First, Second):
        """Wire the two models together and return the graph endpoints.

        Args:
            First: Model applied to the input.
            Second: Model applied to ``First``'s result.

        Returns:
            ``(inputs, [First(inputs), Second(First(inputs))])``; no ``Model``
            object is created here.
        """
        # INP is read from module scope, so it must exist before this is called.
        source = Input(shape=INP)
        generated = First(source)
        judged = Second(generated)
        return source, [generated, judged]
    

    Next, we change CustomFitter as follows: remove the __init__ and call methods, which are not needed anymore.

    class CustomFitter(tf.keras.Model):
        """Model whose train/test steps unpack the packed dataset elements first.

        It is instantiated as ``CustomFitter(inputs, outputs)``, so it defines
        neither ``__init__`` nor ``call``. The base class is spelled
        ``tf.keras.Model`` because ``tf`` is the only Keras entry point this
        file imports.
        """

        def unpack(self, data, data_src='valid'):
            """Reduce one packed batch to the ``(x, y)`` pair passed to the base steps."""
            ...  # body unchanged from Update 2
            return x, y

        def train_step(self, data):
            """Unpack as training data, then run the default training step."""
            x, y = self.unpack(data, data_src='train')
            return super().train_step((x, y))

        def test_step(self, data):
            """Unpack as validation data, then run the default evaluation step."""
            x, y = self.unpack(data, data_src='valid')
            return super().test_step((x, y))
    
    
    first = First(INP)
    second = Second(OUT)
    # build_model returns the input tensor and the list of output tensors here,
    # not a Model; CustomFitter is constructed from them directly.
    inputs, outputs = build_model(first, second)
    custom_model = CustomFitter(inputs, outputs)
    # Two losses for the two outputs, given in output order.
    custom_model.compile(
        loss=[
            'mse', 
            'binary_crossentropy'
        ],
        optimizer= tf.keras.optimizers.Adam(learning_rate=1e-4)
    )
    
    loss: 1.0337 - First_loss: 0.3405 - Second_loss: 0.6932 - val_loss: 1.0330 - val_First_loss: 0.3404 - val_Second_loss: 0.6926