Tags: tensorflow, machine-learning, keras, deep-learning, keras-layer

Input dimensions to Keras VAE not correct


I am new to machine learning and am using the following code to create a sample VAE on the MNIST dataset:

# We are going to use the MNIST dataset to train our VAE.
# All imports are going to be in this place.
import numpy as np
from keras.callbacks import TensorBoard
# Progress bar
from tqdm import tqdm

import matplotlib.pyplot as plt

from keras.datasets import mnist
from keras.layers import Conv2D
from keras.models import Model
from keras.layers import Input
from keras.layers import Dense
from keras.layers import LeakyReLU
from keras.layers import Dropout
from keras.layers import MaxPooling2D
from keras.layers import BatchNormalization
from keras.layers import Flatten

# All Defs will be defined here

# Import the MNIST training data
def loadData():
    # y_train contains the labels, i.e. the digits 0-9

    (x_train, y_train), (x_test, y_test) = mnist.load_data()
    # Normalize pixel values from [0, 255] to [-1, 1]
    x_train = (x_train.astype(np.float32) - 127.5) / 127.5
    # convert shape of x_train from (60000, 28, 28) to (60000, 784)
    # 784 columns per row
    # x_train = x_train.reshape(60000, 784)
    return (x_train, y_train, x_test, y_test)

def getEncoder():
    # This returns a tensor
    inputs = Input(shape=(28, 28, 1))

    # Making a sample AlexNet Model Layer 1
    encoder = Conv2D(392, (4, 4), padding='same', activation='relu')(inputs)
    encoder = MaxPooling2D((4, 4), strides=(1, 1), padding='same')(encoder)
    encoder = BatchNormalization()(encoder)

    # Making a sample AlexNet Model Layer 2
    encoder = Conv2D(196, (2, 2), padding='same', activation='relu')(encoder)
    encoder = MaxPooling2D((4, 4), strides=(1, 1), padding='same')(encoder)
    encoder = BatchNormalization()(encoder)

    # Making a sample AlexNet Model Layer 3
    encoder = Conv2D(98, (2, 2), padding='same', activation='relu')(encoder)
    encoder = MaxPooling2D((2, 2), strides=(1, 1), padding='same')(encoder)
    encoder = BatchNormalization()(encoder)

    #encoder = Flatten()(encoder)
    #encoder = Dense(2)(encoder)
    #encoder = Dropout(0.4)(encoder)

    model = Model(inputs=inputs, outputs=encoder)
    model.compile(optimizer='adam', loss='binary_crossentropy')

    # model.fit(data, labels)  # starts training
    print(model.summary())
    return model

def getDecoder():
    # This returns a tensor
    inputs = Input(shape=(98,))

    # a layer instance is callable on a tensor, and returns a tensor
    disc = Dense(196)(inputs)
    disc = LeakyReLU(alpha=0.2)(disc)
    disc = Dropout(0.3)(disc)

    disc = Dense(392)(disc)
    disc = LeakyReLU(alpha=0.2)(disc)
    disc = Dropout(0.3)(disc)

    disc = Dense(784, activation='sigmoid')(disc)
    disc = LeakyReLU(alpha=0.2)(disc)

    model = Model(inputs=inputs, outputs=disc)
    model.compile(optimizer='rmsprop', loss='binary_crossentropy')

    # starts training
    # model.fit(data, labels)
    print(model.summary())
    return model

def createVAE(decoder, encoder):

    inputs = Input(shape=(28, 28, 1))
    gen = encoder(inputs)
    output = decoder(gen)

    vae = Model(inputs=inputs, outputs=output)
    print(vae.summary())
    vae.compile(loss='binary_crossentropy', optimizer='rmsprop')

    return vae

def doTraining(epochs=1, batchSize=128):
    # Loading the data
    (mniTrainX, mniTrainY, mniTestX, mniTestY) = loadData()
    # Creating the VAE
    encoder = getEncoder()
    decoder = getDecoder()
    vae = createVAE(decoder, encoder)

    vae.fit(mniTrainX, mniTrainX,
                    epochs=epochs,
                    batch_size=batchSize,
                    shuffle=True,
                    validation_data=(mniTestX, mniTestX),
                    callbacks = [TensorBoard(log_dir='/tmp/autoencoder')])

doTraining(10, 128)

The model summary of the VAE is:

None
Model: "model_3"
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
=================================================================
input_3 (InputLayer)         (None, 28, 28, 1)         0         
_________________________________________________________________
model_1 (Model)              (None, 28, 28, 98)        393862    
_________________________________________________________________
model_2 (Model)              multiple                  404740    
=================================================================
Total params: 798,602
Trainable params: 797,230
Non-trainable params: 1,372

Now it's giving the following error:

ValueError: Error when checking input: expected input_3 to have 4 dimensions, but got array with shape (60000, 28, 28)

What am I missing here? What is the 4th dimension?


Solution

  • Your problem is in loadData. The last line needs to be changed as follows: your encoder expects an image with a channel dimension, but your current data doesn't have one.

    return (np.expand_dims(x_train, -1), y_train, np.expand_dims(x_test, -1), y_test)
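
    For example, a quick shape check (a minimal sketch; the zeros array stands in for x_train):

    import numpy as np

    x = np.zeros((60000, 28, 28))   # raw MNIST images: no channel axis
    x = np.expand_dims(x, -1)       # add the trailing channel axis
    print(x.shape)                  # (60000, 28, 28, 1) - matches Input(shape=(28, 28, 1))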
    

    However, you'll run into more issues.

    For example, your encoder outputs the following.

    batch_normalization_3 (Batch (None, 28, 28, 98)        392       
    

    But your decoder takes the following input

    input_2 (InputLayer)         (None, 98)  
    

    You're then trying to feed the output of the encoder (which is (None, 28, 28, 98)) into a (None, 98) input layer, which will not work.
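
    If you only wanted a plain (non-variational) autoencoder, one way to reconcile the two shapes would be to flatten the encoder output and project it down to the decoder's input size; this is just a sketch of the idea, not the VAE fix below:

    # Inside getEncoder(), after the last BatchNormalization layer:
    encoder = Flatten()(encoder)   # (None, 28, 28, 98) -> (None, 76832)
    encoder = Dense(98)(encoder)   # (None, 98) - matches the decoder's Input(shape=(98,))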

    Implementing the convolutional VAE

    Warning: If you are just starting with TF/Keras, a convolutional VAE is a lot to take in. I suggest starting with simpler models, such as image classification or a plain autoencoder, before jumping to a complex model like a convolutional VAE. But if you're comfortable with this, that's okay.

    from tensorflow.keras.layers import Dense, Input, Conv2D, Conv2DTranspose, MaxPooling2D, LeakyReLU, Reshape, BatchNormalization, Flatten, Lambda
    from tensorflow.keras.models import Model
    import tensorflow.keras.backend as K
    from tensorflow.keras.datasets import mnist
    import numpy as np
    
    K.clear_session()
    
    def loadData():
        # y_train contains the labels, i.e. the digits 0-9
    
        (x_train, y_train), (x_test, y_test) = mnist.load_data()
        # Normalize pixel values to [0, 1]
        x_train = x_train.astype(np.float32) / 255.0
        x_test = x_test.astype(np.float32) / 255.0
        # convert shape of x_train from (60000, 28, 28) to (60000, 784)
        # 784 columns per row
        # x_train = x_train.reshape(60000, 784)
        return (np.expand_dims(x_train, axis=-1), y_train, np.expand_dims(x_test, axis=-1), y_test)
    
    # Use the reparameterization trick to push the sampling out as an input.
    # Note that "output_shape" isn't necessary with the TensorFlow backend.

    def sampling(args):
        """Reparameterization trick by sampling from an isotropic unit Gaussian.
    
        # Arguments
            args (tensor): mean and log of variance of Q(z|X)
    
        # Returns
            z (tensor): sampled latent vector
        """
    
        z_mean, z_log_var = args
        batch = K.shape(z_mean)[0]
        dim = K.int_shape(z_mean)[1]
        # by default, random_normal has mean = 0 and std = 1.0
        epsilon = K.random_normal(shape=(batch, dim))
        return z_mean + K.exp(0.5 * z_log_var) * epsilon
    
    
    def getEncoder():
        # This returns a tensor
        inputs = Input(shape=(28, 28, 1))
    
        # Making a sample AlexNet Model Layer 1
        encoder = Conv2D(32, (4, 4), padding='same', activation='relu')(inputs)
        encoder = MaxPooling2D((4, 4), strides=(2, 2), padding='same')(encoder) # 14,14
        encoder = BatchNormalization()(encoder)
    
        # Making a sample AlexNet Model Layer 2
        encoder = Conv2D(64, (2, 2), padding='same', activation='relu')(encoder)
        encoder = MaxPooling2D((4, 4), strides=(2, 2), padding='same')(encoder) # 7, 7
        encoder = BatchNormalization()(encoder)
    
        latent_dim = 64
        encoder = Flatten()(encoder)
        z_mean = Dense(latent_dim, name='z_mean')(encoder)
        z_log_var = Dense(latent_dim, name='z_log_var')(encoder)
    
        # This is to perform the reparametrization trick
        z = Lambda(sampling, output_shape=(latent_dim,), name='z')([z_mean, z_log_var])
    
        encoder = Model(inputs=inputs, outputs=[z_mean, z_log_var, z])
        #encoder.summary()
        return encoder
    
    def getDecoder():
        # This returns a tensor of shape (None, 28, 28, 1), the exact same shape as the input
        latent_dim = 64
        inputs = Input(shape=(latent_dim,))
        disc = Dense(7*7*1)(inputs)
        disc = LeakyReLU(alpha=0.2)(disc)
        disc = Reshape([7,7,1])(disc)
        # a layer instance is callable on a tensor, and returns a tensor
        disc = Conv2DTranspose(32, (3,3), strides=(2,2), padding='same', activation='relu')(disc)    # 7x7 -> 14x14
        disc = Conv2DTranspose(1, (3,3), strides=(2,2), padding='same', activation='sigmoid')(disc)  # 14x14 -> 28x28, 1 channel
    
        decoder = Model(inputs=inputs, outputs=disc)
        #decoder.summary()
        return decoder
    
    
    def kl_loss(z_mean, z_log_var):
        # Closed-form KL divergence between N(z_mean, exp(z_log_var)) and the unit Gaussian N(0, I)
        kl_loss = 1 + z_log_var - K.square(z_mean) - K.exp(z_log_var)
        kl_loss = K.sum(kl_loss, axis=-1)
        kl_loss *= -0.5
        return K.mean(kl_loss)
    
    def createVAE(decoder, encoder):

        # The decoder takes the last output of the encoder (the sampled z) as its input
        dec_out = decoder(encoder.outputs[2])
        # Define an end-to-end model with the encoder's inputs and the decoder's outputs
        vae = Model(inputs=encoder.inputs, outputs=dec_out)
        print(vae.summary())

        # The VAE loss comprises both the crossentropy (reconstruction) loss and the
        # KL divergence loss; add_loss is called before compile so the KL term is
        # included in the total loss.
        vae.add_loss(kl_loss(encoder.outputs[0], encoder.outputs[1]))
        vae.compile(loss='binary_crossentropy', optimizer='rmsprop')
        return vae
    
    
    def doTraining(epochs=1, batchSize=128):
        # Loading the data
        (mniTrainX, mniTrainY, mniTestX, mniTestY) = loadData()
        # Creating the VAE
        encoder = getEncoder()
        decoder = getDecoder()
        vae = createVAE(decoder, encoder)
    
        # I have removed the TensorBoard callback; add it back if needed.
        vae.fit(mniTrainX, mniTrainX,
                        epochs=epochs,
                        batch_size=batchSize,
                        shuffle=True,
                        validation_data=(mniTestX, mniTestX)
                        )
    
    # doTraining builds its own encoder, decoder and VAE internally
    doTraining()
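
    Once trained, you can generate new digits by decoding random latent vectors sampled from the prior. A minimal sketch, assuming doTraining is modified to end with `return decoder` (it doesn't above):

    import matplotlib.pyplot as plt

    # Hypothetical: suppose doTraining returned its trained decoder
    decoder = doTraining()
    z = np.random.normal(size=(5, 64)).astype(np.float32)  # sample from N(0, I); latent_dim = 64
    generated = decoder.predict(z)                         # shape (5, 28, 28, 1)

    for i in range(5):
        plt.subplot(1, 5, i + 1)
        plt.imshow(generated[i, :, :, 0], cmap='gray')
        plt.axis('off')
    plt.show()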
    
