Search code examples
pythontensorflowkerasdeep-learningautoencoder

Debugging autoencoder training (loss is low but reconstructed image is all black)


I am implementing a (variational) autoencoder (VAE) in Keras following this guide.

The VAE reads data from topomaps folder and labels from labels folder. They both contain .npy files, which are essentially numpy arrays stored on disk.

I split data set into 80% training data and 20% test data. Specifically:

  • x_train shape is (245760, 40, 40, 1)
  • y_train shape is (245760,)
  • x_test shape is (61440, 40, 40, 1)
  • y_test shape is (61440,)

Unfortunately even though the (training) loss and val_loss are pretty low (2.9246e-04 and -4.8249e-04, respectively), if I visually check the "reconstruction skills" of my VAE, I can notice they are poor since reconstructed image is not similar at all to the original one:

original

reconstructed

I ran the demo using this configuration:

  • latent_dim=25. This value must not be changed
  • epochs=2
  • batch_size=512
  • optimizer=Adam()
  • learning_rate=0.001

I know epochs is very small, but it is only a demo. I aim to enlarge it when I figure out why my VAE won't work even though loss is very low.

This is the run output:

Epoch 1/2
2023-08-30 11:35:49.408811: I tensorflow/core/grappler/optimizers/custom_graph_optimizer_registry.cc:114] Plugin optimizer for device_type GPU is enabled.
384/384 [==============================] - ETA: 0s - loss: 60.0042 - reconstruction_loss: 11.0072 - kl_loss: 0.09892023-08-30 11:38:40.538661: I tensorflow/core/grappler/optimizers/custom_graph_optimizer_registry.cc:114] Plugin optimizer for device_type GPU is enabled.
384/384 [==============================] - 190s 492ms/step - loss: 59.8772 - reconstruction_loss: 11.0072 - kl_loss: 0.0989 - val_loss: 5.5495e-04 - val_reconstruction_loss: 5.4875e-04 - val_kl_loss: 6.1989e-06
Epoch 2/2
384/384 [==============================] - 188s 490ms/step - loss: 3.0879e-04 - reconstruction_loss: 3.0472e-04 - kl_loss: 1.5222e-06 - val_loss: 3.4318e-04 - val_reconstruction_loss: 3.4303e-04 - val_kl_loss: 1.4901e-07
2023-08-30 11:42:17.049 Python[2419:58392] +[CATransaction synchronize] called within transaction
2023-08-30 11:42:26.493 Python[2419:58392] +[CATransaction synchronize] called within transaction
2023-08-30 11:42:37.200 Python[2419:58392] +[CATransaction synchronize] called within transaction
2023-08-30 11:42:41.800433: I tensorflow/core/grappler/optimizers/custom_graph_optimizer_registry.cc:114] Plugin optimizer for device_type GPU is enabled.
1920/1920 [==============================] - 29s 15ms/step
2023-08-30 11:43:16.276 Python[2419:58392] +[CATransaction synchronize] called within transaction

Process finished with exit code 0

Learning curve:

loss learning curve

These are three model classes:

class VAE(keras.Model):
    def __init__(self, encoder, decoder, **kwargs):
        super().__init__(**kwargs)
        self.encoder = encoder
        self.decoder = decoder
        self.total_loss_tracker = keras.metrics.Mean(name="total_loss")
        self.reconstruction_loss_tracker = keras.metrics.Mean(name="reconstruction_loss")
        self.kl_loss_tracker = keras.metrics.Mean(name="kl_loss")

    def call(self, inputs, training=None, mask=None):
        _, _, z = self.encoder(inputs)
        outputs = self.decoder(z)
        return outputs

    @property
    def metrics(self):
        return [
            self.total_loss_tracker,
            self.reconstruction_loss_tracker,
            self.kl_loss_tracker,
        ]

    def train_step(self, data):
        with tf.GradientTape() as tape:
            # Forward pass
            z_mean, z_log_var, z = self.encoder(data)
            reconstruction = self.decoder(z)

            # Compute losses
            reconstruction_loss = tf.reduce_mean(
                tf.reduce_sum(
                    keras.losses.binary_crossentropy(data, reconstruction), axis=(1, 2)
                )
            )
            kl_loss = -0.5 * (1 + z_log_var - tf.square(z_mean) - tf.exp(z_log_var))
            kl_loss = tf.reduce_mean(tf.reduce_sum(kl_loss, axis=1))
            total_loss = reconstruction_loss + kl_loss

        # Compute gradient
        grads = tape.gradient(total_loss, self.trainable_weights)

        # Update weights
        self.optimizer.apply_gradients(zip(grads, self.trainable_weights))

        # Update my own metrics
        self.total_loss_tracker.update_state(total_loss)
        self.reconstruction_loss_tracker.update_state(reconstruction_loss)
        self.kl_loss_tracker.update_state(kl_loss)

        return {
            "loss": self.total_loss_tracker.result(),
            "reconstruction_loss": self.reconstruction_loss_tracker.result(),
            "kl_loss": self.kl_loss_tracker.result(),
        }

    def test_step(self, data):
        # Forward pass
        z_mean, z_log_var, z = self.encoder(data)
        reconstruction = self.decoder(z)

        # Compute losses
        reconstruction_loss = tf.reduce_mean(
            tf.reduce_sum(
                keras.losses.binary_crossentropy(data, reconstruction), axis=(1, 2)
            )
        )
        kl_loss = -0.5 * (1 + z_log_var - tf.square(z_mean) - tf.exp(z_log_var))
        kl_loss = tf.reduce_mean(tf.reduce_sum(kl_loss, axis=1))
        total_loss = reconstruction_loss + kl_loss

        # Update my own metrics
        self.total_loss_tracker.update_state(total_loss)
        self.reconstruction_loss_tracker.update_state(reconstruction_loss)
        self.kl_loss_tracker.update_state(kl_loss)

        return {
            "loss": self.total_loss_tracker.result(),
            "reconstruction_loss": self.reconstruction_loss_tracker.result(),
            "kl_loss": self.kl_loss_tracker.result(),
        }


class Encoder(keras.Model):
    def __init__(self, latent_dimension, input_shape):
        super(Encoder, self).__init__()
        self.latent_dim = latent_dimension
        self.conv_block1 = keras.Sequential([
            layers.Input(shape=input_shape),
            layers.Conv2D(filters=64, kernel_size=3, activation="relu", strides=2, padding="same"),
            layers.BatchNormalization()
        ])
        self.conv_block2 = keras.Sequential([
            layers.Conv2D(filters=128, kernel_size=3, activation="relu", strides=2, padding="same"),
            layers.BatchNormalization()
        ])
        self.conv_block3 = keras.Sequential([
            layers.Conv2D(filters=256, kernel_size=3, activation="relu", strides=2, padding="same"),
            layers.BatchNormalization()
        ])
        self.flatten = layers.Flatten()
        self.dense = layers.Dense(units=100, activation="relu")
        self.z_mean = layers.Dense(latent_dimension, name="z_mean")
        self.z_log_var = layers.Dense(latent_dimension, name="z_log_var")
        self.sampling = sample

    def call(self, inputs, training=None, mask=None):
        x = self.conv_block1(inputs)
        x = self.conv_block2(x)
        x = self.conv_block3(x)
        x = self.flatten(x)
        x = self.dense(x)
        z_mean = self.z_mean(x)
        z_log_var = self.z_log_var(x)
        z = self.sampling(z_mean, z_log_var)
        return z_mean, z_log_var, z


class Decoder(keras.Model):
    def __init__(self, latent_dimension):
        super(Decoder, self).__init__()
        self.latent_dim = latent_dimension
        self.dense1 = keras.Sequential([
            layers.Dense(units=100, activation="relu"),
            layers.BatchNormalization()
        ])
        self.dense2 = keras.Sequential([
            layers.Dense(units=1024, activation="relu"),
            layers.BatchNormalization()
        ])
        self.dense3 = keras.Sequential([
            layers.Dense(units=4096, activation="relu"),
            layers.BatchNormalization()
        ])
        self.reshape = layers.Reshape((4, 4, 256))
        self.deconv1 = keras.Sequential([
            layers.Conv2DTranspose(filters=256, kernel_size=3, activation="relu", strides=2, padding="same"),
            layers.BatchNormalization()
        ])
        self.deconv2 = keras.Sequential([
            layers.Conv2DTranspose(filters=128, kernel_size=3, activation="relu", strides=1, padding="same"),
            layers.BatchNormalization()
        ])
        self.deconv3 = keras.Sequential([
            layers.Conv2DTranspose(filters=128, kernel_size=3, activation="relu", strides=2, padding="valid"),
            layers.BatchNormalization()
        ])
        self.deconv4 = keras.Sequential([
            layers.Conv2DTranspose(filters=64, kernel_size=3, activation="relu", strides=1, padding="valid"),
            layers.BatchNormalization()
        ])
        self.deconv5 = keras.Sequential([
            layers.Conv2DTranspose(filters=64, kernel_size=3, activation="relu", strides=2, padding="valid"),
            layers.BatchNormalization()
        ])
        self.deconv6 = layers.Conv2DTranspose(filters=1, kernel_size=2, activation="sigmoid", padding="valid")

    def call(self, inputs, training=None, mask=None):
        x = self.dense1(inputs)
        x = self.dense2(x)
        x = self.dense3(x)
        x = self.reshape(x)
        x = self.deconv1(x)
        x = self.deconv2(x)
        x = self.deconv3(x)
        x = self.deconv4(x)
        x = self.deconv5(x)
        decoder_outputs = self.deconv6(x)
        return decoder_outputs

Note that the implementation of the VAE belongs to the guide I already posted.

This is the main function:

if __name__ == '__main__':
    # Load data
    x_train, x_test, y_train, y_test = load_data("topomaps", "labels", 0.2)

    # Expand dimensions to (None, 40, 40, 1)
    x_train = np.expand_dims(x_train, -1)
    x_test = np.expand_dims(x_test, -1)

    # Print data shapes
    print("x_train shape:", x_train.shape)
    print("y_train shape:", y_train.shape)
    print("x_test shape:", x_test.shape)
    print("y_test shape:", y_test.shape)

    # Normalize the data
    x_train = x_train.astype("float32") / 255.0
    x_test = x_test.astype("float32") / 255.0

    # Compiling the VAE
    latent_dimension = 25  # Do not change
    encoder = Encoder(latent_dimension, (40, 40, 1))
    decoder = Decoder(latent_dimension)
    vae = VAE(encoder, decoder)
    vae.compile(Adam(learning_rate=0.001))

    # Training
    x_train, x_val, y_train, y_val = train_test_split(x_train, y_train, test_size=0.2)
    print("x_val shape:", x_val.shape)
    print("y_val shape:", y_val.shape)
    epochs = 2
    batch_size = 512
    history = vae.fit(x_train, epochs=epochs, batch_size=batch_size, validation_data=(x_val,))

    # Plot learning curves
    plot_metric(history, "loss")

    # Check reconstruction skills against a random test sample
    image_index = 5
    plt.title(f"Original image {image_index}")
    original_image = x_test[image_index]
    plt.imshow(original_image, cmap="gray")
    plt.show()

    plt.title(f"Reconstructed image {image_index}, latent_dim = {latent_dimension}, epochs = {epochs}, "
              f"batch_size = {batch_size}")
    x_test_reconstructed = vae.predict(x_test)
    reconstructed_image = x_test_reconstructed[image_index]
    plt.imshow(reconstructed_image, cmap="gray")
    plt.show()

These are some functions I used:

def load_data(topomaps_folder: str, labels_folder: str, test_size):
    x, y = _create_dataset(topomaps_folder, labels_folder)
    x_train, x_test, y_train, y_test = train_test_split(x, y, test_size=test_size)
    return x_train, x_test, y_train, y_test

def _create_dataset(topomaps_folder, labels_folder):
    topomaps_files = os.listdir(topomaps_folder)
    labels_files = os.listdir(labels_folder)
    topomaps_files.sort()
    labels_files.sort()
    x = []
    y = []
    n_files = len(topomaps_files)
    for topomaps_file, labels_file in tqdm(zip(topomaps_files, labels_files), total=n_files, desc="Loading data set"):
        topomaps_array = np.load(f"{topomaps_folder}/{topomaps_file}")
        labels_array = np.load(f"{labels_folder}/{labels_file}")
        if topomaps_array.shape[0] != labels_array.shape[0]:
            raise Exception("Shapes must be equal")
        for i in range(topomaps_array.shape[0]):
            x.append(topomaps_array[i])
            y.append(labels_array[i])
    x = np.array(x)
    y = np.array(y)
    return x, y

def sample(z_mean, z_log_var):
    batch = tf.shape(z_mean)[0]
    dim = tf.shape(z_mean)[1]
    epsilon = tf.random.normal(shape=(batch, dim))
    stddev = tf.exp(0.5 * z_log_var)
    return z_mean + stddev * epsilon

def plot_metric(history, metric):
    plt.plot(history.history[metric])
    plt.plot(history.history['val_' + metric])
    plt.title(metric)
    plt.ylabel(metric)
    plt.xlabel('epoch')
    plt.legend(['train', 'validation'])
    plt.show()

EDIT

- Could you try with much lower training set to see if it overfits and reconstructs a known image? Okay, this is what I did in main function

# Reduce DS size
x_train = x_train[:500]
y_train = y_train[:500]
x_test = x_test[:500]
y_test = y_test[:500]

This is what I get:

original

reconstructed

and learning curve is:

curve

If I run the same configuration but setting epochs=100 I get:

original

reconstructed

and loss:

loss

- Not sure if you did it...but did you convert your output back to a non-normalized value when plotting? This is what I have done:

plt.title(f"Reconstructed image {image_index}, latent_dim = {latent_dimension}, epochs = {epochs}, "
          f"batch_size = {batch_size}")
x_test_reconstructed = vae.predict(x_test)
reconstructed_image = x_test_reconstructed[image_index]
reconstructed_image = reconstructed_image * 255
plt.imshow(reconstructed_image, cmap="gray")
plt.show()

But I still get:

rec

- What does the loss graph look like when the batch size is 4 and learning rate is 0.0002 for 100 epochs? Reconstructed image is all black and loss curve is:

loss


Solution

  • The problem concerned the data normalization.

    x_train = (x_train - np.min(x_train)) / (np.max(x_train) - np.min(x_train))
    

    and

    x_test = (x_test - np.min(x_test)) / (np.max(x_test) - np.min(x_test))
    

    should resolve the issue.