I'm creating some GPflow models in which I need the observations pre and post of a threshold x0
to be independent a priori. I could achieve this with just GP models, or with a ChangePoints kernel with infinite steepness, but both solutions don't work well with my future extensions in mind (MOGP in particular).
I figured I could easily construct what I want from scratch, so I made a new Combination kernel object, which uses the appropriate child kernel pre- or post x0
. This works as intended when I evaluate the kernel on a set of input points; the expected correlations between points before and after threshold are zero, and the rest is determined by the children kernels:
import numpy as np
import gpflow
from gpflow.kernels import Matern32
import matplotlib.pyplot as plt
import tensorflow as tf
from gpflow.kernels import Combination
class IndependentKernel(Combination):
def __init__(self, kernels, x0, forcing_variable=0, name=None):
self.x0 = x0
self.forcing_variable = forcing_variable
super().__init__(kernels, name=name)
def K(self, X, X2=None):
# threshold X, X2 based on self.x0, and construct a joint tensor
if X2 is None:
X2 = X
fv = self.forcing_variable
mask = tf.dtypes.cast(X[:, fv] >= self.x0, tf.int32)
X_partitioned = tf.dynamic_partition(X, mask, 2)
X2_partitioned = tf.dynamic_partition(X2, mask, 2)
K_pre = self.kernels[0].K(X_partitioned[0], X2_partitioned[0])
K_post = self.kernels[1].K(X_partitioned[1], X2_partitioned[1])
zero_block_1 = tf.zeros([K_pre.shape[0], K_post.shape[1]], tf.float64)
zero_block_2 = tf.zeros([K_post.shape[0], K_pre.shape[1]], tf.float64)
upper_row = tf.concat([K_pre, zero_block_1], axis=1)
lower_row = tf.concat([zero_block_2, K_post], axis=1)
return tf.concat([upper_row, lower_row], axis=0)
#
def K_diag(self, X):
fv = self.forcing_variable
mask = tf.dtypes.cast(X[:, fv] >= self.x0, tf.int32)
X_partitioned = tf.dynamic_partition(X, mask, 2)
return tf.concat([self.kernels[0].K_diag(X_partitioned[0]),
self.kernels[1].K_diag(X_partitioned[1])],
axis=1)
#
#
def f(x):
return np.sin(6*(x-0.7))
x0 = 0.3
n = 100
x = np.linspace(0, 1, n)
sigma = 0.5
y = np.random.normal(loc=f(x), scale=sigma)
fv = 0
X = x[:, None]
kernel = IndependentKernel([Matern32(), Matern32()], x0=x0, name='indep')
x_pred = np.linspace(0, 1, 100)
K = kernel.K(x_pred[:, None]) # <- kernel is evaluated correctly here
However, when I want to train a GPflow model with this kernel, I receive the error message TypeError: Expected int32, got None of type 'NoneType' instead.
This appears to result from the sub-kernel matrices K_pre
and K_post
to be of size (None, 1)
, instead of the expected squares (which they correctly are if I evaluate the kernel 'manually').
m = gpflow.models.GPR(data=(X, y[:, None]), kernel=kernel)
gpflow.optimizers.Scipy().minimize(m.training_loss,
m.trainable_variables,
options=dict(maxiter=10000),
method="L-BFGS-B") # <- K_pre & K_post are of size (None, 1) now?
What can I do to make the kernel properly trainable?
I am using GPflow 2.1.3 and TensorFlow 2.4.1.
this is not a GPflow issue but a subtlety of TensorFlow's eager vs graph mode: In eager mode (which is the default behaviour when you interact with tensors "manually" as in calling the kernel) K_pre.shape
works just as expected. In graph mode (which is what happens when you wrap code in tf.function()
, this generally does not always work (e.g. the shape might depend on tf.Variables with None shapes), and you have to use tf.shape(K_pre)
instead to obtain the dynamic shape (that depends on the actual values inside the variables). GPflow's Scipy class by default wraps the loss&gradient computation inside tf.function()
to speed up optimization. If you explicitly turn this off by passing compile=False
to the minimize() call, your code example runs fine. If you replace the .shape
attributes with tf.shape()
calls to fix it properly, it likewise will run fine.