I am working on a Capsule Network implementation that should be customizable. I found a fairly straightforward implementation (https://towardsdatascience.com/implementing-capsule-network-in-tensorflow-11e4cca5ecae) and adapted it to my needs.
However, the code does not reach the same accuracy on the test dataset (MNIST) as other implementations and the paper "Dynamic Routing between Capsules" suggest. Is there a mistake in the implementation of the capsule network? The code uses tf.keras subclassing to create the CapsNet model. Here's the model class:
import tensorflow as tf
import numpy as np
class CapsuleNetwork(tf.keras.Model):
    def __init__(self, input_dimension, no_channels, no_of_convs, kernel_sizes, conv_strides, no_of_conv_kernels, no_of_primary_capsules, primary_capsule_vector, no_of_secondary_capsules, secondary_capsule_vector, r):
        super(CapsuleNetwork, self).__init__()
        self.input_dimension = input_dimension
        self.no_channels = no_channels
        self.no_of_convs = no_of_convs
        self.kernel_sizes = kernel_sizes
        self.conv_strides = conv_strides
        self.no_of_conv_kernels = no_of_conv_kernels
        self.no_of_primary_capsules = no_of_primary_capsules
        self.primary_capsule_vector = primary_capsule_vector
        self.no_of_secondary_capsules = no_of_secondary_capsules
        self.secondary_capsule_vector = secondary_capsule_vector
        self.r = r
        self.epsilon = 10e-7
        # calculating the number of primary capsules based on input image size and kernels/strides of the convolutions (1152 in the paper)
        for i in range(self.no_of_convs):
            if i == 0:
                self.feature_map_dim = int(((input_dimension - self.kernel_sizes[i]) / self.conv_strides[i]) + 1)
            else:
                self.feature_map_dim = int(((self.feature_map_dim - self.kernel_sizes[i]) / self.conv_strides[i]) + 1)
        self.feature_map_dim = int(((self.feature_map_dim - self.kernel_sizes[-1]) / self.conv_strides[-1]) + 1)
        self.no_primary_caps = int(self.feature_map_dim * self.feature_map_dim * self.no_of_primary_capsules)
        with tf.name_scope("Variables") as scope:
            self.conv_dict = {}
            for i in range(no_of_convs):
                self.conv_dict[f"convolution{i}"] = tf.keras.layers.Conv2D(self.no_of_conv_kernels, [self.kernel_sizes[i], self.kernel_sizes[i]], strides=[self.conv_strides[i], self.conv_strides[i]], name=f'ConvolutionLayer_{i}', activation='relu')
            self.primary_capsule = tf.keras.layers.Conv2D(self.no_of_primary_capsules * self.primary_capsule_vector, [self.kernel_sizes[-1], self.kernel_sizes[-1]], strides=[self.conv_strides[-1], self.conv_strides[-1]], name="PrimaryCapsule")
            self.w = tf.Variable(tf.random_normal_initializer()(shape=[1, self.no_primary_caps, self.no_of_secondary_capsules, self.secondary_capsule_vector, self.primary_capsule_vector]), dtype=tf.float32, name="PoseEstimation", trainable=True)
            self.dense_1 = tf.keras.layers.Dense(units=512, activation='relu')
            self.dense_2 = tf.keras.layers.Dense(units=1024, activation='relu')
            self.dense_3 = tf.keras.layers.Dense(units=input_dimension * input_dimension * no_channels, activation='sigmoid', dtype='float32')
    def print_architecture(self):
        print("Model Summary:\n")
        print("Input images: " + str(self.input_dimension) + "x" + str(self.input_dimension) + "x" + str(self.no_channels))
        print(str(self.no_of_convs) + " conv layers")
        print(str(self.no_of_primary_capsules) + " primary capsule channels with " + str(self.no_primary_caps) + " " + str(self.primary_capsule_vector) + "-D capsules")
        print(str(self.no_of_secondary_capsules) + " " + str(self.secondary_capsule_vector) + "-D digit capsules")

    def build(self, input_shape):
        pass
    def squash(self, s):
        with tf.name_scope("SquashFunction") as scope:
            s_norm = tf.norm(s, axis=-1, keepdims=True)
            return tf.square(s_norm) / (1 + tf.square(s_norm)) * s / (s_norm + self.epsilon)
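    # This is the squashing non-linearity from the paper (Eq. 1):
    # v_j = ||s_j||^2 / (1 + ||s_j||^2) * s_j / ||s_j||,
    # with self.epsilon added to the denominator to avoid division by zero for near-zero vectors.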
    @tf.function
    def call(self, inputs):
        input_x, y = inputs

        for i in range(self.no_of_convs):
            convolution = self.conv_dict.get(f"convolution{i}")
            if i == 0:
                x = convolution(input_x) # x.shape: (None, 20, 20, 256) (no longer necessarily holds for other configurations)
            else:
                x = convolution(x)
        x = self.primary_capsule(x) # x.shape: (None, 6, 6, 256)

        with tf.name_scope("CapsuleFormation") as scope:
            u = tf.reshape(x, (-1, self.no_of_primary_capsules * x.shape[1] * x.shape[2], self.primary_capsule_vector)) # u.shape: (None, 1152, 8)
            u = tf.expand_dims(u, axis=-2) # u.shape: (None, 1152, 1, 8)
            u = tf.expand_dims(u, axis=-1) # u.shape: (None, 1152, 1, 8, 1)
            u_hat = tf.matmul(self.w, u) # u_hat.shape: (None, 1152, 10, 16, 1)
            u_hat = tf.squeeze(u_hat, [4]) # u_hat.shape: (None, 1152, 10, 16)

        with tf.name_scope("DynamicRouting") as scope:
            b = tf.zeros((input_x.shape[0], self.no_primary_caps, self.no_of_secondary_capsules, 1)) # b.shape: (None, 1152, 10, 1)
            for i in range(self.r): # self.r = 3
                c = tf.nn.softmax(b, axis=-2) # c.shape: (None, 1152, 10, 1)
                s = tf.reduce_sum(tf.multiply(c, u_hat), axis=1, keepdims=True) # s.shape: (None, 1, 10, 16)
                v = self.squash(s) # v.shape: (None, 1, 10, 16)
                agreement = tf.squeeze(tf.matmul(tf.expand_dims(u_hat, axis=-1), tf.expand_dims(v, axis=-1), transpose_a=True), [4]) # agreement.shape: (None, 1152, 10, 1)
                # The following intermediate shapes occur inside the matmul; they are not assigned to variables, just listed to aid understanding:
                # u_hat (intermediate): (None, 1152, 10, 16, 1)
                # v (intermediate): (None, 1, 10, 16, 1)
                # Since the first argument of matmul is transposed, its shape becomes (None, 1152, 10, 1, 16).
                # matmul then operates on the last two dimensions while the others are broadcast,
                # giving an intermediate shape of (None, 1152, 10, 1, 1) before squeezing.
                b += agreement

        with tf.name_scope("Masking") as scope:
            y = tf.expand_dims(y, axis=-1) # y.shape: (None, 10, 1)
            y = tf.expand_dims(y, axis=1) # y.shape: (None, 1, 10, 1)
            mask = tf.cast(y, dtype=tf.float32) # mask.shape: (None, 1, 10, 1)
            v_masked = tf.multiply(mask, v) # v_masked.shape: (None, 1, 10, 16)

        with tf.name_scope("Reconstruction") as scope:
            v_ = tf.reshape(v_masked, [-1, self.no_of_secondary_capsules * self.secondary_capsule_vector]) # v_.shape: (None, 160)
            reconstructed_image = self.dense_1(v_) # reconstructed_image.shape: (None, 512)
            reconstructed_image = self.dense_2(reconstructed_image) # reconstructed_image.shape: (None, 1024)
            reconstructed_image = self.dense_3(reconstructed_image) # reconstructed_image.shape: (None, 784)

        return v, reconstructed_image
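    # Note: predict_capsule_output() below repeats the forward pass of call() up to the
    # routing step, but skips masking and reconstruction, so no labels are needed at inference time.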
    @tf.function
    def predict_capsule_output(self, inputs):
        for i in range(self.no_of_convs):
            convolution = self.conv_dict.get(f"convolution{i}")
            if i == 0:
                x = convolution(inputs) # x.shape: (None, 20, 20, 256) (no longer necessarily holds for other configurations)
            else:
                x = convolution(x)
        x = self.primary_capsule(x) # x.shape: (None, 6, 6, 256)

        with tf.name_scope("CapsuleFormation") as scope:
            u = tf.reshape(x, (-1, self.no_of_primary_capsules * x.shape[1] * x.shape[2], self.primary_capsule_vector)) # u.shape: (None, 1152, 8)
            u = tf.expand_dims(u, axis=-2) # u.shape: (None, 1152, 1, 8)
            u = tf.expand_dims(u, axis=-1) # u.shape: (None, 1152, 1, 8, 1)
            u_hat = tf.matmul(self.w, u) # u_hat.shape: (None, 1152, 10, 16, 1)
            u_hat = tf.squeeze(u_hat, [4]) # u_hat.shape: (None, 1152, 10, 16)

        with tf.name_scope("DynamicRouting") as scope:
            b = tf.zeros((inputs.shape[0], self.no_primary_caps, self.no_of_secondary_capsules, 1)) # b.shape: (None, 1152, 10, 1)
            for i in range(self.r): # self.r = 3
                c = tf.nn.softmax(b, axis=-2) # c.shape: (None, 1152, 10, 1)
                s = tf.reduce_sum(tf.multiply(c, u_hat), axis=1, keepdims=True) # s.shape: (None, 1, 10, 16)
                v = self.squash(s) # v.shape: (None, 1, 10, 16)
                agreement = tf.squeeze(tf.matmul(tf.expand_dims(u_hat, axis=-1), tf.expand_dims(v, axis=-1), transpose_a=True), [4]) # agreement.shape: (None, 1152, 10, 1)
                # See the shape walkthrough in call() above for the intermediate shapes of this matmul.
                b += agreement
        return v
    @tf.function
    def regenerate_image(self, inputs):
        v, pred_class = inputs
        with tf.name_scope("Reconstruction") as scope:
            v_ = tf.reshape(v, [-1, self.no_of_secondary_capsules * self.secondary_capsule_vector]) # v_.shape: (None, 160)
            reconstructed_image = self.dense_1(v_) # reconstructed_image.shape: (None, 512)
            reconstructed_image = self.dense_2(reconstructed_image) # reconstructed_image.shape: (None, 1024)
            reconstructed_image = self.dense_3(reconstructed_image) # reconstructed_image.shape: (None, 784) for 28x28x1 inputs
        return reconstructed_image
There is an additional script that makes use of this class and trains a model. The main parts are:
import numpy as np
from tqdm import tqdm
import tensorflow as tf
from datetime import datetime
import matplotlib
import matplotlib.pyplot as plt
from CapsuleNetworkClassWorkingExample import CapsuleNetwork
print(tf.__version__)
# =============================================================================
# Hyperparameter based on paper
epsilon = 1e-7
m_plus = 0.9
m_minus = 0.1
lambda_ = 0.5
alpha = 0.0005
epochs = 50
no_of_secondary_capsules = 10
batch_size = 64
optimizer = tf.keras.optimizers.Adam()
# parameters for CapsNet architecture
params = {
    "input_dimension": 28,
    "no_channels": 1,               # 3 for RGB or 1 for gray-scale images
    "no_of_convs": 1,
    "kernel_sizes": (9, 9),         # kernel sizes of the conv layers: first entry for conv0, second for conv1, etc.; the last entry is used by the primary capsule layer
    "conv_strides": (1, 2),         # strides of the conv layers, see "kernel_sizes"
    "no_of_conv_kernels": 256,      # number of kernels for all conv layers (including the primary capsule conv layer)
    "no_of_primary_capsules": 32,   # number of primary capsule channels
    "no_of_secondary_capsules": 10, # number of digit capsules (has to equal the number of classes)
    "primary_capsule_vector": 8,    # dimension of primary capsules
    "secondary_capsule_vector": 16, # dimension of digit capsules
    "r": 3,                         # number of routing iterations
}
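With these values the class reproduces the paper's architecture: conv0 yields a feature map of int(((28 - 9) / 1) + 1) = 20, the primary capsule convolution yields int(((20 - 9) / 2) + 1) = 6, so no_primary_caps = 6 * 6 * 32 = 1152, matching the 1152 primary capsules in the paper.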
# =============================================================================
# Save directory for np arrays (losses & acc)
loss_acc_path = './logs/losses_acc'
# Tensorboard Checkpoints
checkpoint_path = './logs/model/capsule'
stamp = datetime.now().strftime("%Y%m%d-%H%M%S")
logdir = './logs/func/%s' % stamp
writer = tf.summary.create_file_writer(logdir)
scalar_logdir = './logs/scalars/%s' % stamp
file_writer = tf.summary.create_file_writer(scalar_logdir + "/metrics")
# Import MNIST and divide into train, val and test dataset [left out here]
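For reproducibility, here is a minimal sketch of what that left-out step looks like (the exact normalization and the 54k/6k train/validation split are placeholders for my actual code):
# Sketch only: stands in for the omitted loading/splitting code.
(X, y), (X_test, y_test) = tf.keras.datasets.mnist.load_data()
X = (X / 255.0).astype(np.float32)[..., None]        # scale to [0, 1] and add a channel dimension
X_test = (X_test / 255.0).astype(np.float32)[..., None]
y, y_test = y.astype(np.int64), y_test.astype(np.int64)
X_train, X_val, y_train, y_val = X[:54000], X[54000:], y[:54000], y[54000:]
val_dataset_size = X_val.shape[0]
train_dataset = tf.data.Dataset.from_tensor_slices((X_train, y_train)).shuffle(60000).batch(batch_size)
val_dataset = tf.data.Dataset.from_tensor_slices((X_val, y_val)).batch(batch_size)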
# functions to evaluate loss, and train the model
def safe_norm(v, axis=-1, epsilon=1e-7):
    """
    input:
        v → digit capsule output of shape (None, 1, 10, 16); the dimension varies with the number of labels
        axis → integer value
        epsilon → small constant to avoid NaN values
    output:
        norm of the input v along the given axis
    """
    v_ = tf.reduce_sum(tf.square(v), axis=axis, keepdims=True)
    return tf.sqrt(v_ + epsilon)
def loss_function(v, reconstructed_image, y, y_image):
    prediction = safe_norm(v)
    prediction = tf.reshape(prediction, [-1, no_of_secondary_capsules])
    left_margin = tf.square(tf.maximum(0.0, m_plus - prediction))
    right_margin = tf.square(tf.maximum(0.0, prediction - m_minus))
    l = tf.add(y * left_margin, lambda_ * (1.0 - y) * right_margin)
    margin_loss = tf.reduce_mean(tf.reduce_sum(l, axis=-1))
    # the reshape depends on the input shape
    y_image_flat = tf.reshape(y_image, [-1, params.get("input_dimension") * params.get("input_dimension") * params.get("no_channels")])
    reconstruction_loss = tf.reduce_mean(tf.square(y_image_flat - reconstructed_image))
    loss = tf.add(margin_loss, alpha * reconstruction_loss)
    return loss
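For reference, the margin part of this corresponds to the margin loss from the paper (Eq. 4): L_k = T_k * max(0, m_plus - ||v_k||)^2 + lambda_ * (1 - T_k) * max(0, ||v_k|| - m_minus)^2, summed over the digit capsules and averaged over the batch; the reconstruction MSE is then added with weight alpha.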
def train(x, y):
    y_one_hot = tf.one_hot(y, depth=10)
    with tf.GradientTape() as tape:
        v, reconstructed_image = model([x, y_one_hot])
        loss = loss_function(v, reconstructed_image, y_one_hot, x)
    grad = tape.gradient(loss, model.trainable_variables)
    optimizer.apply_gradients(zip(grad, model.trainable_variables))
    return loss

def predict(model, x):
    pred = safe_norm(model.predict_capsule_output(x))
    pred = tf.squeeze(pred, [1])
    return np.argmax(pred, axis=1)[:, 0]
tf.summary.trace_on(graph=True, profiler=True)
# create the model
model = CapsuleNetwork(**params)
model.print_architecture()
_ = train(X_train[:32],y_train[:32])
tf.summary.trace_off()
model.summary()
checkpoint = tf.train.Checkpoint(model=model)
train_losses = []
val_losses = []
val_accuracy = []
for i in range(1, epochs + 1, 1):
    loss = 0
    with tqdm(total=len(train_dataset)) as pbar:
        description = "Epoch " + str(i) + "/" + str(epochs)
        pbar.set_description_str(description)
        for X_batch, y_batch in train_dataset:
            loss += train(X_batch, y_batch)
            pbar.update(1)
        loss /= len(train_dataset)
        train_losses.append(loss.numpy())
        training_sum = 0
        print_statement = "Training Loss :" + str(loss.numpy()) + " Evaluating Validation Loss and Accuracy ..."
        pbar.set_postfix_str(print_statement)
        epoch_val_loss = []
        for X_batch, y_batch in val_dataset:
            training_sum += sum(predict(model, X_batch) == y_batch.numpy())
            y_one_hot = tf.one_hot(y_batch, depth=10)
            v = model.predict_capsule_output(X_batch)
            pred_class = predict(model, X_batch)
            reconstruction = model.regenerate_image((v, pred_class))
            val_loss = loss_function(v, reconstruction, y_one_hot, X_batch)
            epoch_val_loss.append(val_loss)
        val_accuracy.append(training_sum / val_dataset_size)
        val_losses.append(np.mean(epoch_val_loss))
        with file_writer.as_default():
            tf.summary.scalar('Training Loss', data=loss.numpy(), step=i)
            tf.summary.scalar('Validation Accuracy', data=val_accuracy[-1], step=i)
            tf.summary.scalar('Validation Loss', data=val_losses[-1], step=i)
        print_statement = "Train Loss :" + str(loss.numpy()) + " Val Accuracy :" + str(val_accuracy[-1]) + " Val Loss :" + str(val_losses[-1])
        if i != 1:
            if train_losses[i - 1] < np.amin(train_losses[:-1]):
                print_statement += ' Checkpoint Saved'
                checkpoint.save(checkpoint_path)
        else:
            print_statement += ' Checkpoint Saved'
            checkpoint.save(checkpoint_path)
        pbar.set_postfix_str(print_statement)
Now, when I evaluate the test accuracy by predicting the class outputs of the test dataset, I obtain an accuracy of ~98% (depending on the checkpoint). This is roughly 1% less than other implementations achieve. I don't really understand why this is the case. Below you can see the learning curves of the model. I hope someone can give a hint as to why my results might differ from other people's.

[Learning curves of the model]
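For completeness, the test accuracy above is computed along these lines (a sketch; test_dataset and test_dataset_size are assumed to be built like the validation set):

correct = 0
for X_batch, y_batch in test_dataset:
    correct += sum(predict(model, X_batch) == y_batch.numpy())
print("Test accuracy:", correct / test_dataset_size)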
While I have not looked at your code in detail, a 1% difference is really not a lot when working with deep learning. The difference might be caused by a different (random) weight initialisation or by slightly different gradients that lead to a different learning trajectory. Re-training the network might thus lead to slightly different results each time.
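If you want to see how much of the gap is run-to-run variance, one thing you could try (a sketch, not part of your code) is fixing the random seeds before building the model and comparing a few training runs:

import random
import numpy as np
import tensorflow as tf

SEED = 42  # arbitrary value
random.seed(SEED)
np.random.seed(SEED)
tf.random.set_seed(SEED)  # also seeds tf.random_normal_initializer() and the Keras layer initializers

With seeds fixed, two runs on the same machine should produce (nearly) the same curves; note that some GPU kernels remain non-deterministic, so small differences can persist.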