I am trying to reproduce the incomplete convolution layer with a linear profile (gamma) proposed in this paper, where a operation called incomplete dot product (IDP) is introduced, which decides the percentage of the input channels to be used for inference. I customised the incomplete convolutional layer as below:
class IncompleteConv2D(layers.Conv2D):
def __init__(self, IDP=1, **kwargs):
self.IDP = IDP
super(IncompleteConv2D, self).__init__(**kwargs)
def call(self, inputs, training=None):
def linear_profile(N, training):
g_list = []
num_channels_in_used = []
if training != False:
for i in range(1, N+1):
g_list.append(1 - i/N)
g = tf.stack([g_list])
return g, num_channels_in_used
num_channels_in_used = np.floor(N*self.IDP).astype(np.int32)
if num_channels_in_used == 0:
for i in range(1, N+1):
g_list.append(1 - i/N)
else:
for i in range(1, num_channels_in_used+1):
g_list.append(1 - i/N)
g = tf.stack([g_list])
return g, num_channels_in_used
num_channels_in = inputs.shape[-1]
num_channels_out = self.kernel.shape[-1]
gamma, num_channels_in_used = linear_profile(num_channels_in, training)
g_converted_list = []
if training != False:
for j in range(num_channels_out):
g_converted = tf.reshape(
tf.broadcast_to(gamma, [self.kernel[:,:,:,j].shape[0]*self.kernel[:,:,:,j].shape[1], self.kernel[:,:,:,j].shape[2]]),
tf.shape(self.kernel[:,:,:,j])
)
g_converted = tf.expand_dims(g_converted, 3)
g_converted_list.append(g_converted)
else:
for j in range(num_channels_out):
if (num_channels_in_used == 0):
g_converted = tf.reshape(
tf.broadcast_to(gamma, [self.kernel[:,:,:,j].shape[0]*self.kernel[:,:,:,j].shape[1], self.kernel[:,:,:,j].shape[2]]),
tf.shape(self.kernel[:,:,:,j])
)
else:
g_converted = tf.reshape(
tf.broadcast_to(gamma, [self.kernel[:,:,:,j].shape[0]*self.kernel[:,:,:,j].shape[1], num_channels_in_used]),
[self.kernel[:,:,:,j].shape[0], self.kernel[:,:,:,j].shape[1], num_channels_in_used]
)
g_converted = tf.expand_dims(g_converted, 3)
g_converted_list.append(g_converted)
if (num_channels_in_used > 0):
self.kernel = self.kernel[:,:,:num_channels_in_used,:]
inputs = inputs[:,:,:,:num_channels_in_used]
gamma_converted = tf.concat(g_converted_list, 3)
mean, var = tf.nn.moments(gamma_converted * self.kernel, axes=[0, 1, 2], keepdims=True)
result = self.convolution_op(
inputs, (gamma_converted * self.kernel - mean) / tf.sqrt(var + 1e-10)
)
if self.use_bias:
result = result + self.bias
return result
Basically, a linear profile gamma is learned during training, and IDP between 0 and 1 should be set during inference. In other words, IDP should remain 1 at all times during training. With this IncompleteConv2D
layer defined, an example model initialisation is as below:
x = keras.Input(shape=input_shape)
output = IncompleteConv2D(filters=32, kernel_size=(3, 3), activation="relu", IDP=1)(x)
output = layers.MaxPooling2D(pool_size=(2, 2))(output)
output = IncompleteConv2D(filters=64, kernel_size=(3, 3), activation="relu", IDP=1)(output)
output = layers.MaxPooling2D(pool_size=(2, 2))(output)
output = layers.Flatten()(output)
output = layers.Dropout(0.5)(output)
output = layers.Dense(10, activation="softmax")(output)
model = Model(x, output)
model.summary()
where IDP=1 is set as a parameter for the layer (which I believe is not the best way, since IDP is only related to the inference phase...) Note that IDP has to be set as 1 here because for the testing part of the training phase all channels should be utilised.
After compilation, the code for training phase is as below:
model_history = model.fit(training_generator,
validation_data=validation_generator,
verbose=1,
epochs=1)
where the validation accuracy during training phase is evaluated with training=False within the call method for the IncompleteConv2D layer, with IDP=1.
Now comes the tricky part, for evaluation:
test_loss, test_accuracy = model.evaluate(X_test_data, y_test, IDP=0.8, verbose=1)
An IDP argument between 0 and 1 should be fed into the evaluation phase. However, IDP is not recognised.
TypeError: Invalid keyword arguments: ['IDP']
Problem: I looked into customising the model.evaluate() here using test_step method of a CustomModel, but it seemed that you cannot add your own argument for model.evaluate().
If this can be solved, I expect that during model.evaluate(IDP=0.8), this new IDP will be passed in the call method during forward pass for inference, thus overriding the original IDP (which was set to 1).
Is there any way I can achieve this? Any help is highly appreciated.
Updated: Following the useful pointers from @Innat, I inherited the keras.Model class with IDP initialised as follow:
class CustomModel(keras.Model):
def __init__(self, IDP=1, **kwargs):
super(CustomModel, self).__init__(**kwargs)
self.IDP = tf.Variable(IDP, dtype=tf.float32)
model = CustomModel(inputs=x, outputs=output, IDP=1)
With the help of a custom callback, I managed to change the IDP attribute in the CustomModel class to 0.8 at the beginning of model.evaluate()
.
class CustomCallback(keras.callbacks.Callback):
def on_test_begin(self, logs=None):
# self.model.IDP.assign(0.8)
for layer in self.model.layers:
if isinstance(layer, IncompleteConv2D):
layer.IDP = 0.8
test_loss, test_accuracy = model.evaluate(X_test_data, y_test, verbose=1, callbacks=[CustomCallback()])
However, from the observation that the test accuracy is similar to the validation accuracy, I believe the IDP attribute in the IncompleteConv2D
layer class remained 1 for model.evaluate()
.
What should I do next to link IDP in CustomModel to IDP in IncompleteConv2D
, as in, how to pass a model attribute to its layer?
In essence, you've created a custom layer (IncompleteConv2D
) and initialize a variable of a scaler value which further is used in call
method. Now, you want to change this value in testing time or inference time. In keras
, one way to achieve this by using the callback API. Here I'm showing a general solutions, and not using your IncompleteConv2D
layer.
Let's import some necessary libraries and define a simple custom layer. Here in the call
method, we will print out the predefined scaler value.
import tensorflow as tf
import tensorflow.keras as keras
import keras.layers as layers
import numpy as np
class StandardizedConv2DWithCall(layers.Conv2D):
def __init__(self, some_var, *args, **kwargs):
super().__init__(*args, **kwargs)
self.some_var = tf.Variable(
some_var, dtype=tf.float32, trainable=False
)
def call(self, inputs):
print('printing from call fn: ', self.some_var)
mean, var = tf.nn.moments(
self.kernel, axes=[0, 1, 2], keepdims=True
)
result = self.convolution_op(
inputs, (self.kernel - mean) / tf.sqrt(var + 1e-10)
)
if self.use_bias:
result = result + self.bias
return result
Prepare some data to train and test.
# the data, split between train and test sets
(x_train, y_train), (x_test, y_test) = keras.datasets.mnist.load_data()
def prepare_ds(x, y, take=None):
if take:
x, y = x[:take], y[:take]
x = tf.expand_dims(tf.cast(x, dtype=tf.float32), axis=3)
x = tf.repeat(x, repeats=3, axis=3)
x = tf.divide(x, 255.)
x = tf.image.resize(x, [28, 28])
y = tf.one_hot(y, depth=10)
return x, y
(x_train, y_train), (x_test, y_test) = tf.keras.datasets.mnist.load_data()
x_train, y_train = prepare_ds(x_train, y_train, take=100)
x_test, y_test = prepare_ds(x_test, y_test, take=50)
x_train.shape, y_train.shape, x_test.shape, y_test.shape
(TensorShape([100, 28, 28, 3]),
TensorShape([100, 10]),
TensorShape([50, 28, 28, 3]),
TensorShape([50, 10]))
Let's define a dummy model, which contains the above custom layer.
model = keras.Sequential(
[
keras.layers.InputLayer(input_shape=(28,28,3)),
StandardizedConv2DWithCall(
some_var= 1., filters=32, kernel_size=(3, 3), activation="relu"
),
layers.MaxPooling2D(pool_size=(2, 2)),
StandardizedConv2DWithCall(
some_var=1., filters=64, kernel_size=(3, 3), activation="relu"
),
layers.MaxPooling2D(pool_size=(2, 2)),
layers.Flatten(),
layers.Dense(num_classes, activation="softmax"),
]
)
model.summary()
Run the model with eager model to see the print value of the above custom layer.
batch_size = 128
model.compile(
loss="categorical_crossentropy",
optimizer="adam",
metrics=["accuracy"],
run_eagerly=True
)
model.fit(x_train, y_train, batch_size=batch_size, epochs=1)
printing from call fn: <tf.Variable 'Variable:0' shape=() dtype=float32, numpy=1.0>
printing from call fn: <tf.Variable 'Variable:0' shape=() dtype=float32, numpy=1.0>
1/1 [==============================] - 0s 179ms/step - loss: 40.5805 - accuracy: 0.0400
<keras.callbacks.History at 0x7f653355bd00>
Printed value 1.0
as set. Now, let's define a custom callback, which will be used to change this value in testing time.
class UpdateSomeVariableCallback(keras.callbacks.Callback):
def __init__(self, update_value):
super().__init__()
self.update_value = update_value
def on_test_begin(self, logs=None):
for layer in self.model.layers:
if isinstance(layer, StandardizedConv2DWithCall):
layer.some_var.assign(self.update_value)
Run the test
# test 1
loss, acc = model.evaluate(
x_test,
y_test,
callbacks=[
UpdateSomeVariableCallback(update_value=0.8)
]
)
printing from call fn: <tf.Variable 'Variable:0' shape=() dtype=float32, numpy=0.8>
2/2 [==============================] - 0s 41ms/step - loss: 26.0635 - accuracy: 0.3600
# test 2
loss, acc = model.evaluate(
x_test,
y_test,
callbacks=[
UpdateSomeVariableCallback(update_value=0.2)
]
)
printing from call fn: <tf.Variable 'Variable:0' shape=() dtype=float32, numpy=0.2>
2/2 [==============================] - 0s 31ms/step - loss: 26.0635 - accuracy: 0.3600
As we can see that, the variable is updated and printed accordingly in the call method.