I am implementing a (variational) autoencoder (VAE) in Keras following this guide.
The VAE reads data from topomaps
folder and labels from labels
folder. They both contain .npy
files, which are essentially numpy arrays stored on disk.
I split data set into 80% training data and 20% test data. Specifically:
shape is (245760, 40, 40, 1)
shape is (245760,)
shape is (61440, 40, 40, 1)
shape is (61440,)
Unfortunately even though the (training) loss
and val_loss
are pretty low (2.9246e-04
and -4.8249e-04
, respectively), if I visually check the "reconstruction skills" of my VAE, I can notice they are poor since reconstructed image is not similar at all to the original one:
I ran the demo using this configuration:
. This value must not be changedepochs=2
I know epochs
is very small, but it is only a demo. I aim to enlarge it when I figure out why my VAE won't work even though loss is very low.
This is the run output:
Epoch 1/2
2023-08-30 11:35:49.408811: I tensorflow/core/grappler/optimizers/custom_graph_optimizer_registry.cc:114] Plugin optimizer for device_type GPU is enabled.
384/384 [==============================] - ETA: 0s - loss: 60.0042 - reconstruction_loss: 11.0072 - kl_loss: 0.09892023-08-30 11:38:40.538661: I tensorflow/core/grappler/optimizers/custom_graph_optimizer_registry.cc:114] Plugin optimizer for device_type GPU is enabled.
384/384 [==============================] - 190s 492ms/step - loss: 59.8772 - reconstruction_loss: 11.0072 - kl_loss: 0.0989 - val_loss: 5.5495e-04 - val_reconstruction_loss: 5.4875e-04 - val_kl_loss: 6.1989e-06
Epoch 2/2
384/384 [==============================] - 188s 490ms/step - loss: 3.0879e-04 - reconstruction_loss: 3.0472e-04 - kl_loss: 1.5222e-06 - val_loss: 3.4318e-04 - val_reconstruction_loss: 3.4303e-04 - val_kl_loss: 1.4901e-07
2023-08-30 11:42:17.049 Python[2419:58392] +[CATransaction synchronize] called within transaction
2023-08-30 11:42:26.493 Python[2419:58392] +[CATransaction synchronize] called within transaction
2023-08-30 11:42:37.200 Python[2419:58392] +[CATransaction synchronize] called within transaction
2023-08-30 11:42:41.800433: I tensorflow/core/grappler/optimizers/custom_graph_optimizer_registry.cc:114] Plugin optimizer for device_type GPU is enabled.
1920/1920 [==============================] - 29s 15ms/step
2023-08-30 11:43:16.276 Python[2419:58392] +[CATransaction synchronize] called within transaction
Process finished with exit code 0
Learning curve:
These are three model classes:
class VAE(keras.Model):
def __init__(self, encoder, decoder, **kwargs):
self.encoder = encoder
self.decoder = decoder
self.total_loss_tracker = keras.metrics.Mean(name="total_loss")
self.reconstruction_loss_tracker = keras.metrics.Mean(name="reconstruction_loss")
self.kl_loss_tracker = keras.metrics.Mean(name="kl_loss")
def call(self, inputs, training=None, mask=None):
_, _, z = self.encoder(inputs)
outputs = self.decoder(z)
return outputs
def metrics(self):
return [
def train_step(self, data):
with tf.GradientTape() as tape:
# Forward pass
z_mean, z_log_var, z = self.encoder(data)
reconstruction = self.decoder(z)
# Compute losses
reconstruction_loss = tf.reduce_mean(
keras.losses.binary_crossentropy(data, reconstruction), axis=(1, 2)
kl_loss = -0.5 * (1 + z_log_var - tf.square(z_mean) - tf.exp(z_log_var))
kl_loss = tf.reduce_mean(tf.reduce_sum(kl_loss, axis=1))
total_loss = reconstruction_loss + kl_loss
# Compute gradient
grads = tape.gradient(total_loss, self.trainable_weights)
# Update weights
self.optimizer.apply_gradients(zip(grads, self.trainable_weights))
# Update my own metrics
return {
"loss": self.total_loss_tracker.result(),
"reconstruction_loss": self.reconstruction_loss_tracker.result(),
"kl_loss": self.kl_loss_tracker.result(),
def test_step(self, data):
# Forward pass
z_mean, z_log_var, z = self.encoder(data)
reconstruction = self.decoder(z)
# Compute losses
reconstruction_loss = tf.reduce_mean(
keras.losses.binary_crossentropy(data, reconstruction), axis=(1, 2)
kl_loss = -0.5 * (1 + z_log_var - tf.square(z_mean) - tf.exp(z_log_var))
kl_loss = tf.reduce_mean(tf.reduce_sum(kl_loss, axis=1))
total_loss = reconstruction_loss + kl_loss
# Update my own metrics
return {
"loss": self.total_loss_tracker.result(),
"reconstruction_loss": self.reconstruction_loss_tracker.result(),
"kl_loss": self.kl_loss_tracker.result(),
class Encoder(keras.Model):
def __init__(self, latent_dimension, input_shape):
super(Encoder, self).__init__()
self.latent_dim = latent_dimension
self.conv_block1 = keras.Sequential([
layers.Conv2D(filters=64, kernel_size=3, activation="relu", strides=2, padding="same"),
self.conv_block2 = keras.Sequential([
layers.Conv2D(filters=128, kernel_size=3, activation="relu", strides=2, padding="same"),
self.conv_block3 = keras.Sequential([
layers.Conv2D(filters=256, kernel_size=3, activation="relu", strides=2, padding="same"),
self.flatten = layers.Flatten()
self.dense = layers.Dense(units=100, activation="relu")
self.z_mean = layers.Dense(latent_dimension, name="z_mean")
self.z_log_var = layers.Dense(latent_dimension, name="z_log_var")
self.sampling = sample
def call(self, inputs, training=None, mask=None):
x = self.conv_block1(inputs)
x = self.conv_block2(x)
x = self.conv_block3(x)
x = self.flatten(x)
x = self.dense(x)
z_mean = self.z_mean(x)
z_log_var = self.z_log_var(x)
z = self.sampling(z_mean, z_log_var)
return z_mean, z_log_var, z
class Decoder(keras.Model):
def __init__(self, latent_dimension):
super(Decoder, self).__init__()
self.latent_dim = latent_dimension
self.dense1 = keras.Sequential([
layers.Dense(units=100, activation="relu"),
self.dense2 = keras.Sequential([
layers.Dense(units=1024, activation="relu"),
self.dense3 = keras.Sequential([
layers.Dense(units=4096, activation="relu"),
self.reshape = layers.Reshape((4, 4, 256))
self.deconv1 = keras.Sequential([
layers.Conv2DTranspose(filters=256, kernel_size=3, activation="relu", strides=2, padding="same"),
self.deconv2 = keras.Sequential([
layers.Conv2DTranspose(filters=128, kernel_size=3, activation="relu", strides=1, padding="same"),
self.deconv3 = keras.Sequential([
layers.Conv2DTranspose(filters=128, kernel_size=3, activation="relu", strides=2, padding="valid"),
self.deconv4 = keras.Sequential([
layers.Conv2DTranspose(filters=64, kernel_size=3, activation="relu", strides=1, padding="valid"),
self.deconv5 = keras.Sequential([
layers.Conv2DTranspose(filters=64, kernel_size=3, activation="relu", strides=2, padding="valid"),
self.deconv6 = layers.Conv2DTranspose(filters=1, kernel_size=2, activation="sigmoid", padding="valid")
def call(self, inputs, training=None, mask=None):
x = self.dense1(inputs)
x = self.dense2(x)
x = self.dense3(x)
x = self.reshape(x)
x = self.deconv1(x)
x = self.deconv2(x)
x = self.deconv3(x)
x = self.deconv4(x)
x = self.deconv5(x)
decoder_outputs = self.deconv6(x)
return decoder_outputs
Note that the implementation of the VAE belongs to the guide I already posted.
This is the main function:
if __name__ == '__main__':
# Load data
x_train, x_test, y_train, y_test = load_data("topomaps", "labels", 0.2)
# Expand dimensions to (None, 40, 40, 1)
x_train = np.expand_dims(x_train, -1)
x_test = np.expand_dims(x_test, -1)
# Print data shapes
print("x_train shape:", x_train.shape)
print("y_train shape:", y_train.shape)
print("x_test shape:", x_test.shape)
print("y_test shape:", y_test.shape)
# Normalize the data
x_train = x_train.astype("float32") / 255.0
x_test = x_test.astype("float32") / 255.0
# Compiling the VAE
latent_dimension = 25 # Do not change
encoder = Encoder(latent_dimension, (40, 40, 1))
decoder = Decoder(latent_dimension)
vae = VAE(encoder, decoder)
# Training
x_train, x_val, y_train, y_val = train_test_split(x_train, y_train, test_size=0.2)
print("x_val shape:", x_val.shape)
print("y_val shape:", y_val.shape)
epochs = 2
batch_size = 512
history = vae.fit(x_train, epochs=epochs, batch_size=batch_size, validation_data=(x_val,))
# Plot learning curves
plot_metric(history, "loss")
# Check reconstruction skills against a random test sample
image_index = 5
plt.title(f"Original image {image_index}")
original_image = x_test[image_index]
plt.imshow(original_image, cmap="gray")
plt.title(f"Reconstructed image {image_index}, latent_dim = {latent_dimension}, epochs = {epochs}, "
f"batch_size = {batch_size}")
x_test_reconstructed = vae.predict(x_test)
reconstructed_image = x_test_reconstructed[image_index]
plt.imshow(reconstructed_image, cmap="gray")
These are some functions I used:
def load_data(topomaps_folder: str, labels_folder: str, test_size):
x, y = _create_dataset(topomaps_folder, labels_folder)
x_train, x_test, y_train, y_test = train_test_split(x, y, test_size=test_size)
return x_train, x_test, y_train, y_test
def _create_dataset(topomaps_folder, labels_folder):
topomaps_files = os.listdir(topomaps_folder)
labels_files = os.listdir(labels_folder)
x = []
y = []
n_files = len(topomaps_files)
for topomaps_file, labels_file in tqdm(zip(topomaps_files, labels_files), total=n_files, desc="Loading data set"):
topomaps_array = np.load(f"{topomaps_folder}/{topomaps_file}")
labels_array = np.load(f"{labels_folder}/{labels_file}")
if topomaps_array.shape[0] != labels_array.shape[0]:
raise Exception("Shapes must be equal")
for i in range(topomaps_array.shape[0]):
x = np.array(x)
y = np.array(y)
return x, y
def sample(z_mean, z_log_var):
batch = tf.shape(z_mean)[0]
dim = tf.shape(z_mean)[1]
epsilon = tf.random.normal(shape=(batch, dim))
stddev = tf.exp(0.5 * z_log_var)
return z_mean + stddev * epsilon
def plot_metric(history, metric):
plt.plot(history.history['val_' + metric])
plt.legend(['train', 'validation'])
- Could you try with much lower training set to see if it overfits and reconstructs a known image? Okay, this is what I did in main
# Reduce DS size
x_train = x_train[:500]
y_train = y_train[:500]
x_test = x_test[:500]
y_test = y_test[:500]
This is what I get:
and learning curve is:
If I run the same configuration but setting epochs=100
I get:
and loss:
- Not sure if you did it...but did you convert your output back to a non-normalized value when plotting? This is what I have done:
plt.title(f"Reconstructed image {image_index}, latent_dim = {latent_dimension}, epochs = {epochs}, "
f"batch_size = {batch_size}")
x_test_reconstructed = vae.predict(x_test)
reconstructed_image = x_test_reconstructed[image_index]
reconstructed_image = reconstructed_image * 255
plt.imshow(reconstructed_image, cmap="gray")
But I still get:
- What does the loss graph look like when the batch size is 4 and learning rate is 0.0002 for 100 epochs? Reconstructed image is all black and loss curve is:
The problem concerned the data normalization.
x_train = (x_train - np.min(x_train)) / (np.max(x_train) - np.min(x_train))
x_test = (x_test - np.min(x_test)) / (np.max(x_test) - np.min(x_test))
should resolve the issue.