I am trying to optimise a set of polynomial coefficients in order to estimate the position of some generating sources in 3D space. I have an example working in 2D, but moving to realistic forward models seems to disconnect the initial_poly_coefficients from the graph. The gradients return None for all of the coefficients and I get the error message
---------------------------------------------------------------------------
ValueError Traceback (most recent call last)
/var/folders/j4/t1rwqcnx7cn64bpmnmjyywkm0000gr/T/ipykernel_58154/643982438.py in <module>
166
167 # Apply gradients
--> 168 optimizer.apply_gradients(zip(gradients, poly_coefficients))
169
170
~/anaconda3/envs/oslpy/lib/python3.8/site-packages/tensorflow/python/keras/optimizer_v2/optimizer_v2.py in apply_gradients(self, grads_and_vars, name, experimental_aggregate_gradients)
628 RuntimeError: If called in a cross-replica context.
629 """
--> 630 grads_and_vars = optimizer_utils.filter_empty_gradients(grads_and_vars)
631 var_list = [v for (_, v) in grads_and_vars]
632
~/anaconda3/envs/oslpy/lib/python3.8/site-packages/tensorflow/python/keras/optimizer_v2/utils.py in filter_empty_gradients(grads_and_vars)
73
74 if not filtered:
---> 75 raise ValueError("No gradients provided for any variable: %s." %
76 ([v.name for _, v in grads_and_vars],))
77 if vars_with_empty_grads:
ValueError: No gradients provided for any variable: ['Variable:0', 'Variable:0', 'Variable:0', 'Variable:0', 'Variable:0'].
Debugging attempts
In my debugging attempts I have tried running the code with eager execution disabled. In those cases TF tracks the gradients (print(tf.compat.v1.trainable_variables()) returns values, not just an empty list), but then I lose the interoperability between TF and numpy.
I was worried that the magnetic_dipole_fast_tf was not differentiable. I tested this with tf.test.compute_gradient and the values were real. I did the same for generate_H.
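For reference, the check I ran was along these lines (illustrative values, not my exact call):
R_test = tf.constant([0.0, 0.0, -1.0], dtype=tf.float32)
pos_test = tf.constant([[0.0, 0.0, 0.0], [0.5, 0.5, 0.0]], dtype=tf.float32)
ori_test = tf.constant([[0.0, 0.0, 1.0], [0.0, 0.0, 1.0]], dtype=tf.float32)
theoretical, numerical = tf.test.compute_gradient(
    lambda R: magnetic_dipole_fast_tf(R, pos_test, ori_test), [R_test])
print(theoretical)  # real-valued Jacobians, so the forward function itself is differentiable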
My hunch is that I am doing something along the lines of an assignment somewhere which is breaking the connection between the polynomial coefficients and the cost function. Any help would be hugely appreciated.
Here is my code:
import numpy as np
import tensorflow as tf
import matplotlib.pyplot as plt
from tensorflow.python.ops.numpy_ops import np_config
np_config.enable_numpy_behavior()
# Simulated data
num_sources = 50
num_channels_x = 5
num_channels_y = 5
# Define the ground truth positions of the sources in 3D space (x, y, z coordinates)
source_positions_x = tf.cast(np.linspace(-1, 1, num_sources), dtype=tf.float32)
source_positions_y = tf.zeros_like(source_positions_x) # All sources at y=0
source_positions_z = tf.cast(-(source_positions_x ** 2) + source_positions_x -1.0, dtype=tf.float32)
# Specify the coefficients as constants
coefficients_values = [0.01, -1.2, 1.1, 0.0, 2.0]
# Create TensorFlow variables with these specific values
initial_poly_coefficients = [tf.Variable([value], dtype=tf.float32,trainable=True) for value in coefficients_values]
# Define the initial guess positions of the sources
initial_source_positions_x = tf.cast(np.linspace(-1, 1, num_sources), dtype=tf.float32)
initial_source_positions_y = tf.zeros_like(initial_source_positions_x) # Initial guess: all sources at y=0
initial_source_positions_z = tf.math.polyval(initial_poly_coefficients, initial_source_positions_x)
# Define the positions of the channels (on a 2D grid in the xy-plane above the sources)
channel_positions_x = np.linspace(-1, 1, num_channels_x)
channel_positions_y = np.linspace(-1, 1, num_channels_y)
channel_positions_z = tf.zeros((num_channels_x * num_channels_y,), dtype=tf.float32) # All channels at z=0
# Create 3D arrays for positions and orientations (in this example, orientations are along the z-axis)
source_positions = tf.stack([source_positions_x, source_positions_y, source_positions_z], axis=1)
source_orientations = tf.constant([[0.0, 0.0, 1.0]] * num_sources, dtype=tf.float32)
# Create the channel positions on the grid
channel_positions_x, channel_positions_y = np.meshgrid(channel_positions_x, channel_positions_y)
channel_positions_x = channel_positions_x.flatten()
channel_positions_y = channel_positions_y.flatten()
channel_positions = tf.stack([channel_positions_x, channel_positions_y, channel_positions_z], axis=1)
def magnetic_dipole_fast_tf(R, pos, ori):
    """
    Calculate the leadfield for a magnetic dipole in an infinite medium using TensorFlow operations.
    Parameters:
    R (Tensor): Position of the magnetic dipole (1x3 Tensor).
    pos (Tensor): Positions of magnetometers (Nx3 Tensor).
    ori (Tensor): Orientations of magnetometers (Nx3 Tensor).
    Returns:
    lf (Tensor): Leadfield matrix (Nx3 Tensor).
    # Example usage:
    R = tf.constant([1.0, 2.0, 3.0], dtype=tf.float32)
    pos = tf.constant([[4.0, 5.0, 6.0], [7.0, 8.0, 9.0]], dtype=tf.float32) # Example positions of magnetometers
    ori = tf.constant([[0.0, 0.0, 1.0], [0.0, 1.0, 0.0]], dtype=tf.float32) # Example orientations of magnetometers
    leadfield = magnetic_dipole_fast_tf(R, pos, ori)
    """
    mu_0 = 4 * np.pi * 1e-7  # Permeability of free space
    # Modify the shape of R to be a 2D tensor with one row
    R = tf.reshape(R, (1, 3))
    # Modify the shape of pos to be a 2D tensor with one row per magnetometer
    pos = tf.reshape(pos, (-1, 3))
    # Calculate the distance vectors between the dipole and all magnetometers
    r = pos - R
    # Calculate the distance from the dipole to each magnetometer
    distance = tf.norm(r, axis=1)
    # Calculate the magnetic field contribution for all magnetometers
    dot_product = tf.reduce_sum(ori * r, axis=1)
    r_squared = distance**2
    lf = (mu_0 / (4 * np.pi)) * (3 * dot_product[:, tf.newaxis] * r / r_squared[:, tf.newaxis]**(5/2) - ori / distance[:, tf.newaxis]**3)
    return lf
def generate_H(channel_positions, source_positions, source_orientations):
    num_channels = channel_positions.shape[0]
    num_sources = source_positions.shape[0]
    H = np.zeros((num_channels, num_sources, 3), dtype=np.float32)  # Initialize the leadfield matrix
    for i in range(num_channels):
        for j in range(num_sources):
            channel_position = channel_positions[i]
            source_position = source_positions[j]
            source_orientation = source_orientations[j]
            # Calculate the leadfield for the current source and channel
            lf = magnetic_dipole_fast_tf(source_position, channel_position, source_orientation)
            # Store the leadfield in the matrix
            H[i, j, :] = lf
    H = tf.convert_to_tensor(H, dtype=tf.float32)
    return H
# Generating the true H matrix based on initial source positions
H_true = generate_H(channel_positions, source_positions, source_orientations)
# Just take the first axis - assume the source orientation is known
H_true = H_true[:,:,0]
num_channels = channel_positions.shape[0]
C_X = np.eye(num_sources)
C_n = np.eye(num_channels) * 0.01
C_Y = np.dot(np.dot(H_true, C_X), tf.transpose(H_true)) + C_n
# Convert matrices to TensorFlow constants
C_X_tf = tf.constant(C_X, dtype=tf.float32)
C_n_tf = tf.constant(C_n, dtype=tf.float32)
C_Y_tf = tf.constant(C_Y, dtype=tf.float32)
# Define the optimization process
learning_rate = 0.1
optimizer = tf.optimizers.Adam(learning_rate)
poly_coefficients = initial_poly_coefficients
# Optimization loop
num_iterations = 1800
tolerance = 1e-9
for iteration in range(num_iterations):
    with tf.GradientTape() as tape:
        # Generate H matrix based on fixed x-coordinates and make an estimate for the z coordinate. The
        # y coordinate remains fixed, too.
        source_positions_z_tf = tf.math.polyval(poly_coefficients, source_positions_x)
        # Now we need to stack the source positions into a tensor:
        source_positions = tf.stack([source_positions_x, source_positions_y, source_positions_z_tf], axis=1)
        # Update the estimate for H, and take just the last dimension
        H_guess = generate_H(channel_positions, source_positions, source_orientations)[:,:,0]
        # Calculate the difference between the two sides of the equation
        difference = C_Y_tf - tf.matmul(tf.matmul(H_guess, C_X_tf), tf.transpose(H_guess)) - C_n_tf
        # Calculate the cost
        total_cost = tf.reduce_sum(tf.square(difference))

    # Calculate gradients
    gradients = tape.gradient(total_cost, poly_coefficients)

    # Apply gradients
    optimizer.apply_gradients(zip(gradients, poly_coefficients))
The problem seems to be caused by
H = np.zeros((num_channels, num_sources, 3), dtype=np.float32) # Initialize the leadfield matrix
This creates a numpy array, which doesn't track gradients the way TensorFlow does, since numpy is a numerical computation library whereas TensorFlow is a symbolic one. You convert it back to a tensor at the end of generate_H, but by then the gradients have already been lost. Therefore the gradient is lost from H_guess onwards.
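A toy illustration of the same effect (hypothetical example, not your model):
x = tf.Variable(2.0)
with tf.GradientTape() as tape:
    buf = np.zeros((1,), dtype=np.float32)
    buf[0] = x * 3.0               # the TF value is converted to a plain float here
    y = tf.convert_to_tensor(buf)  # converting back does not restore the connection
print(tape.gradient(y, x))  # None
with tf.GradientTape() as tape:
    y = tf.stack([x * 3.0])  # staying in TF ops keeps the graph connected
print(tape.gradient(y, x))  # tf.Tensor(3.0, ...)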
You'd need to rewrite magnetic_dipole_fast_tf with only TensorFlow operations, no numpy, such that the gradients are kept. It's probably best to create a vectorized version while you're at it for efficiency. If you do so, you can also remove generate_H, which only maps magnetic_dipole_fast_tf across certain dimensions.
This is my attempt. It produces gradients, as opposed to your generate_H, but I can't guarantee the correctness, since I don't fully understand what you're doing. It should be in the ballpark though.
import numpy as np
import tensorflow as tf
import matplotlib.pyplot as plt
from tensorflow.python.ops.numpy_ops import np_config
np_config.enable_numpy_behavior()
# Simulated data
num_sources = 50
num_channels_x = 5
num_channels_y = 5
# Define the ground truth positions of the sources in 3D space (x, y, z coordinates)
source_positions_x = tf.cast(np.linspace(-1, 1, num_sources), dtype=tf.float32)
source_positions_y = tf.zeros_like(source_positions_x) # All sources at y=0
source_positions_z = tf.cast(-(source_positions_x ** 2) + source_positions_x -1.0, dtype=tf.float32)
# Specify the coefficients as constants
coefficients_values = [0.01, -1.2, 1.1, 0.0, 2.0]
# Create TensorFlow variables with these specific values
initial_poly_coefficients = [tf.Variable([value], dtype=tf.float32,trainable=True) for value in coefficients_values]
# Define the initial guess positions of the sources
initial_source_positions_x = tf.cast(np.linspace(-1, 1, num_sources), dtype=tf.float32)
initial_source_positions_y = tf.zeros_like(initial_source_positions_x) # Initial guess: all sources at y=0
initial_source_positions_z = tf.math.polyval(initial_poly_coefficients, initial_source_positions_x)
# Define the positions of the channels (on a 2D grid in the xy-plane above the sources)
channel_positions_x = np.linspace(-1, 1, num_channels_x)
channel_positions_y = np.linspace(-1, 1, num_channels_y)
channel_positions_z = tf.zeros((num_channels_x * num_channels_y,), dtype=tf.float32) # All channels at z=0
# Create 3D arrays for positions and orientations (in this example, orientations are along the z-axis)
source_positions = tf.stack([source_positions_x, source_positions_y, source_positions_z], axis=1)
source_orientations = tf.constant([[0.0, 0.0, 1.0]] * num_sources, dtype=tf.float32)
# Create the channel positions on the grid
channel_positions_x, channel_positions_y = np.meshgrid(channel_positions_x, channel_positions_y)
channel_positions_x = channel_positions_x.flatten()
channel_positions_y = channel_positions_y.flatten()
channel_positions = tf.stack([channel_positions_x, channel_positions_y, channel_positions_z], axis=1)
def magnetic_dipole_fast_tf(R, pos, ori):
    """
    Calculate the leadfield for magnetic dipoles in an infinite medium using TensorFlow operations.
    Parameters:
    R (Tensor): Positions of the magnetic dipoles (Mx3 Tensor).
    pos (Tensor): Positions of magnetometers (Nx3 Tensor).
    ori (Tensor): Orientations of the dipoles (Mx3 Tensor).
    Returns:
    lf (Tensor): Leadfield matrix (NxMx3 Tensor).
    # Example usage:
    R = tf.constant([1.0, 2.0, 3.0], dtype=tf.float32)
    pos = tf.constant([[4.0, 5.0, 6.0], [7.0, 8.0, 9.0]], dtype=tf.float32) # Example positions of magnetometers
    ori = tf.constant([[0.0, 0.0, 1.0], [0.0, 1.0, 0.0]], dtype=tf.float32) # Example orientations
    leadfield = magnetic_dipole_fast_tf(R, pos, ori)
    """
    mu_0 = 4 * np.pi * 1e-7  # Permeability of free space
    R = tf.reshape(R, (1, -1, 3))
    pos = tf.reshape(pos, (-1, 1, 3))
    ori = tf.reshape(ori, (1, -1, 3))
    # Calculate the pair-wise distance vectors between the dipoles and the magnetometers
    r = pos - R
    r_squared = tf.reduce_sum(r * r, axis=2)
    # Calculate the magnetic field contribution for all magnetometers
    dot_product = tf.reduce_sum(ori * r, axis=2)
    return (mu_0 / (4 * np.pi)) * (
        3 * dot_product[..., tf.newaxis] * r / r_squared[..., tf.newaxis]**(5/2)
        - ori / r_squared[..., tf.newaxis]**(3/2)
    )
with tf.GradientTape() as tape:
    # Generate H matrix based on fixed x-coordinates and make an estimate for the z coordinate. The
    # y coordinate remains fixed, too.
    source_positions_z_tf = tf.math.polyval(initial_poly_coefficients, source_positions_x)
    # Now we need to stack the source positions into a tensor:
    source_positions = tf.stack([source_positions_x, source_positions_y, source_positions_z_tf], axis=1)
    # Update the estimate for H (the full channels x sources x 3 leadfield)
    H_guess = magnetic_dipole_fast_tf(source_positions, channel_positions, source_orientations)
gradients = tape.gradient(H_guess, initial_poly_coefficients)
gradients # list of actual gradients rather than None's
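If this is in the right ballpark, the vectorized function should slot straight into your original optimization loop in place of generate_H, e.g. (untested sketch, reusing your variable names):
optimizer = tf.optimizers.Adam(0.1)
for iteration in range(1800):
    with tf.GradientTape() as tape:
        source_positions_z_tf = tf.math.polyval(initial_poly_coefficients, source_positions_x)
        source_positions = tf.stack([source_positions_x, source_positions_y, source_positions_z_tf], axis=1)
        # Same [:, :, 0] slice as before, assuming the source orientation is known
        H_guess = magnetic_dipole_fast_tf(source_positions, channel_positions, source_orientations)[:, :, 0]
        difference = C_Y_tf - tf.matmul(tf.matmul(H_guess, C_X_tf), tf.transpose(H_guess)) - C_n_tf
        total_cost = tf.reduce_sum(tf.square(difference))
    gradients = tape.gradient(total_cost, initial_poly_coefficients)
    optimizer.apply_gradients(zip(gradients, initial_poly_coefficients))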