Issue with Hierarchical Lucas-Kanade method on optical flow
I am implementing the hierarchical Lucas-Kanade method in Python based on this tutorial. However, when applying the method to a rotating sphere, I am encountering unexpected results.
The overall structure of the algorithm is explained below. Notice that the equations in
this tutorial are using the
convention where the top-left corner is (0, 0)
and the bottom-right corner is
(width-1, height-1)
, while the implementation provided here swaps the x and y axes. In
other words, the coordinate for the top-left corner is (0, 0)
and the bottom-right
corner is (height-1, width-1)
.
Incorporating equations 19, 20, 23, 29, and 28, the basic Lucas-Kanade method is implemented as follows:
def lucas_kanade(img1, img2):
img1 = np.copy(img1).astype(np.float32)
img2 = np.copy(img2).astype(np.float32)
# Change the window size based on the image's size due to downsampling.
window_size = min(max(3, min(img1.shape[:2]) / 6), 31)
window_size = int(2 * (window_size // 2) + 1)
print("window size: ", window_size)
# Compute image gradients
Ix = np.zeros(img1.shape, dtype=np.float32)
Iy = np.zeros(img1.shape, dtype=np.float32)
Ix[1:-1, 1:-1] = (img1[1:-1, 2:] - img1[1:-1, :-2]) / 2 # pixels on boundry are 0.
Iy[1:-1, 1:-1] = (img1[2:, 1:-1] - img1[:-2, 1:-1]) / 2
# Compute temporal gradient
It = np.zeros(img1.shape, dtype=np.float32)
It = img1 - img2
# Define a (window_size, window_size) kernel for the convolution
kernel = np.ones((window_size, window_size), dtype=np.float32)
# kernel = create_gaussian_kernel(window_size, sigma=1)
# Use convolution to calculate the sum of the window for each pixel
Ix2 = convolve2d(Ix**2, kernel, mode="same", boundary="fill", fillvalue=0)
Iy2 = convolve2d(Iy**2, kernel, mode="same", boundary="fill", fillvalue=0)
Ixy = convolve2d(Ix * Iy, kernel, mode="same", boundary="fill", fillvalue=0)
Ixt = convolve2d(Ix * It, kernel, mode="same", boundary="fill", fillvalue=0)
Iyt = convolve2d(Iy * It, kernel, mode="same", boundary="fill", fillvalue=0)
# Compute optical flow parameters
det = Ix2 * Iy2 - Ixy**2
# Avoid division by zero
u = np.where((det > 1e-6), (Iy2 * Ixt - Ixy * Iyt) / det, 0)
v = np.where((det > 1e-6), (Ix2 * Iyt - Ixy * Ixt) / det, 0)
optical_flow = np.stack((u, v), axis=2)
return optical_flow.astype(np.float32)
The Gaussian pyramid is generated as follow.
def gen_gaussian_pyramid(im, max_level):
# Return `max_level+1` arrays.
gauss_pyr = [im]
for i in range(max_level):
gauss_pyr.append(cv2.pyrDown(gauss_pyr[-1]))
return gauss_pyr
The processing is conducted from the roughest image to the finest image in the pyramid. Thus, we also need to upsample the flow.
def expand(img, dst_size, interpolation=None):
# Increase dimension.
height, width = dst_size[:2]
return cv2.GaussianBlur(
cv2.resize( # dim: (width, height)
img, (width, height), interpolation=interpolation or cv2.INTER_LINEAR
),
(5, 5),
0,
)
In the equation 12, the right image needs to be shifted based on the number of pixels in
the previous loop. I choose to use opencv.remap
function to warp the left image to be
aligned with the right image.
def remap(a, flow):
height, width = flow.shape[:2]
# Create a grid of coordinates using np.meshgrid
y, x = np.meshgrid(np.arange(height), np.arange(width), indexing="ij")
# Create flow_map by adding the flow vectors
flow_map = np.column_stack( # NOTE: minus sign on flow
(x.flatten() + -flow[:, :, 0].flatten(), y.flatten() + -flow[:, :, 1].flatten())
)
# Reshape flow_map to match the original image dimensions
flow_map = flow_map.reshape((height, width, 2))
# Ensure flow_map values are within the valid range
flow_map[:, :, 0] = np.clip(flow_map[:, :, 0], 0, width - 1)
flow_map[:, :, 1] = np.clip(flow_map[:, :, 1], 0, height - 1)
# Convert flow_map to float32
flow_map = flow_map.astype(np.float32)
# Use cv2.remap for remapping
warped = cv2.remap(a, flow_map, None, cv2.INTER_LINEAR)
return warped
After defining all the basics, we can put them all together. Here, g_L
and d_L
are
the variable in equation 7.
def hierarchical_lucas_kanade(im1, im2, max_level): # max_level = 4
gauss_pyr_1 = gen_gaussian_pyramid(im1, max_level) # from finest to roughest
gauss_pyr_2 = gen_gaussian_pyramid(im2, max_level) # from finest to roughest
g_L = [0 for _ in range(max_level + 1)] # Every slot will be (h, w, 2) array.
d_L = [0 for _ in range(max_level + 1)] # Every slot will be (h, w, 2) array.
assert len(g_L) == 5 # 4 + 1 (base)
# Initialzie g_L[0] as (h, w, 2) zeros array
g_L[max_level] = np.zeros(gauss_pyr_1[-1].shape[:2] + (2,)).astype(np.float32)
for level in range(max_level, -1, -1): # 4, 3, 2, 1, 0
# Warp image 1 by previous flow.
warped = remap(gauss_pyr_1[level], g_L[level])
# Run Lucas-Kanade on warped image and right image.
d_L[level] = lucas_kanade(warped, gauss_pyr_2[level])
# Expand/Upsample the flow so that the dimension can match the finer result.
g_L[level - 1] = 2.0 * expand(
g_L[level] + d_L[level],
gauss_pyr_2[level - 1].shape[:2] + (2,),
interpolation=cv2.INTER_LINEAR,
)
return g_L[0] + d_L[0]
After downloading the data, you can run it with the code:
sphere_seq = []
for fname in natsorted(Path("./input/sphere/").rglob("*.ppm")):
sphere_seq.append(cv2.imread(str(fname), cv2.IMREAD_GRAYSCALE))
flows = []
for i in range(len(sphere_seq) - 1):
flows.append(hierarchical_lucas_kanade(sphere_seq[i], sphere_seq[i + 1], max_level=4))
show_flow(sphere_seq[i], flows[i], f"./output/sphere/flow-{i}.png")
The result looks like below:
There are several problems in the result:
It looks like the flow's x direction is correct but the y direction is not. It could be my my visulization code is wrong. Here is the code:
def show_flow(img, flow, filename=None):
x = np.arange(0, img.shape[1], 1)
y = np.arange(0, img.shape[0], 1)
x, y = np.meshgrid(x, y)
plt.figure(figsize=(10, 10))
fig = plt.imshow(img, cmap="gray", interpolation="bicubic")
plt.axis("off")
fig.axes.get_xaxis().set_visible(False)
fig.axes.get_yaxis().set_visible(False)
num_points_per_axis = 32
step = int(img.shape[0] / num_points_per_axis)
plt.quiver(
x[::step, ::step],
y[::step, ::step], # reverse order?
flow[::step, ::step, 0],
flow[::step, ::step, 1], # reverse sign?
color="r",
pivot="tail",
headwidth=2,
headlength=3,
)
if filename is not None:
plt.savefig(filename, bbox_inches="tight", pad_inches=0)
Any insights or suggestions on resolving this issue would be greatly appreciated. Thank you!
Here's a program based off of the code you provide. Below is just one example of the kind of parameter optimization which could be experimented with to fine-tune the accuracy of the optical flow analysis results. The principal changes introduced were:
import cv2
import imageio
import itertools as itl
import matplotlib.pyplot as plt
import numpy as np
from natsort import natsorted
from pathlib import Path
INPUT_DATA = # [path to input directory containing .ppm image files]
# Variable analysis parameters:
MIN_WIN_SIZE = [6, 7, 8]
NUM_AXIS_PTS = [32, 50, 74]
ARROW_SCALE = [50]
FLOW_CUTOFF = [0.25]
combinations = [
*itl.product(MIN_WIN_SIZE, NUM_AXIS_PTS, ARROW_SCALE, FLOW_CUTOFF)
]
print(len(combinations))
for MIN_WIN_SIZE, NUM_AXIS_PTS, ARROW_SCALE, FLOW_CUTOFF in combinations:
def lucas_kanade(img1, img2):
"""Compute optical flow using Lucas-Kanade method.
Args:
img1 (numpy.ndarray): First input image.
img2 (numpy.ndarray): Second input image.
Returns:
numpy.ndarray: Computed optical flow.
"""
img1 = np.copy(img1).astype(np.float32)
img2 = np.copy(img2).astype(np.float32)
# Change the window size based on the image's size due to downsampling.
window_size = min(max(3, min(img1.shape[:2]) / 6), MIN_WIN_SIZE)
window_size = int(2 * (window_size // 2) + 1)
# Compute image gradients
Ix = np.zeros(img1.shape, dtype=np.float32)
Iy = np.zeros(img1.shape, dtype=np.float32)
Ix[1:-1, 1:-1] = (img1[1:-1, 2:] - img1[1:-1, :-2]) / 2
Iy[1:-1, 1:-1] = (img1[2:, 1:-1] - img1[:-2, 1:-1]) / 2
# Compute temporal gradient
It = img1 - img2
# Define a (window_size, window_size) kernel for the convolution
kernel = np.ones((window_size, window_size), dtype=np.float32)
# Use convolution to calculate the sum of the window for each pixel
Ix2 = cv2.filter2D(Ix ** 2, -1, kernel)
Iy2 = cv2.filter2D(Iy ** 2, -1, kernel)
Ixy = cv2.filter2D(Ix * Iy, -1, kernel)
Ixt = cv2.filter2D(Ix * It, -1, kernel)
Iyt = cv2.filter2D(Iy * It, -1, kernel)
# Compute optical flow parameters
det = Ix2 * Iy2 - Ixy ** 2
# Avoid division by zero and handle invalid values
u = np.where((det > 1e-6), (Iy2 * Ixt - Ixy * Iyt) / (det + 1e-6), 0)
v = np.where((det > 1e-6), (Ix2 * Iyt - Ixy * Ixt) / (det + 1e-6), 0)
optical_flow = np.stack((u, v), axis=2)
return optical_flow.astype(np.float32)
def apply_magnitude_threshold(flow, threshold):
"""Apply magnitude thresholding to filter out small flows.
Args:
flow (numpy.ndarray): Input flow array.
threshold (float): Magnitude threshold value.
Returns:
numpy.ndarray: Thresholded flow array.
"""
magnitude = np.linalg.norm(
flow, axis=-1
) # Compute magnitude of flow vectors
magnitude = magnitude.reshape(
magnitude.shape + (1,)
) # Reshape to match flow shape
thresholded_flow = np.where(magnitude < threshold, 0, flow)
return thresholded_flow
def show_flow(img, flow, filename=None):
"""Visualize the flow on the input image.
Args:
img (numpy.ndarray): Input image.
flow (numpy.ndarray): Flow array to be visualized.
filename (str, optional): Output filename to save the visualization.
"""
x = np.arange(0, img.shape[1], 1)
y = np.arange(0, img.shape[0], 1)
x, y = np.meshgrid(x, y)
plt.figure(figsize=(10, 10))
fig = plt.imshow(img, cmap="gray", interpolation="bicubic")
plt.axis("off")
fig.axes.get_xaxis().set_visible(False)
fig.axes.get_yaxis().set_visible(False)
num_points_per_axis = NUM_AXIS_PTS
step = int(img.shape[0] / num_points_per_axis)
print(step)
scale_factor = (
ARROW_SCALE # Adjust arrow scale factor for better visualization
)
plt.quiver(
x[::step, ::step],
y[::step, ::step],
flow[::step, ::step, 0],
-flow[::step, ::step, 1], # Reverse sign for correct direction
color="r",
pivot="tail",
headwidth=2,
headlength=3,
scale=scale_factor,
)
if filename is not None:
plt.savefig(filename, bbox_inches="tight", pad_inches=0)
# Read sphere data (PPM images)
sphere_seq = []
for fname in natsorted(Path(INPUT_DATA).rglob("*.ppm")):
sphere_seq.append(cv2.imread(str(fname), cv2.IMREAD_GRAYSCALE))
# Compute optical flow and visualize
flows = []
for i in range(len(sphere_seq) - 1):
# Compute optical flow using Lucas-Kanade method
flow = lucas_kanade(sphere_seq[i], sphere_seq[i + 1])
# Apply magnitude thresholding to filter out small flows
thresholded_flow = apply_magnitude_threshold(
flow, threshold=FLOW_CUTOFF
)
# Visualize the thresholded flow
show_flow(
sphere_seq[i],
thresholded_flow,
f"{INPUT_DATA}/sphere{i:02}_thresholded.png",
)
# List all image files in the output folder
image_files = sorted(Path(INPUT_DATA).glob("*.png"))
# Create GIF animation from the image files
with imageio.get_writer(
f"{INPUT_DATA}/sphere_animation_MINWIN={MIN_WIN_SIZE}_NUMAXSPTS={NUM_AXIS_PTS}.gif",
mode="I",
) as writer:
for image_file in image_files:
image = imageio.imread(str(image_file))
writer.append_data(image)
print("GIF animation created successfully!")
Using this approach, for example, produced results such as:
As an update to this answer, below is a modification of the above analysis program which implements the hierarchical functions mentioned in the question and in the reference paper (e.g., pyramidal feature tracking, etc.).
"""Perform optical flow analysis using hierarchical Lucas-Kanade method and
create GIF animation from rotating sphere image sequence. """
import cv2
import imageio
import itertools as itl
import matplotlib.pyplot as plt
import numpy as np
from natsort import natsorted
from pathlib import Path
INPUT_DATA = # [path to directory containing .ppm input data files]
MAX_LEVEL = [3, 5]
NUM_AXIS_PTS = [16, 32, 64]
ARROW_SCALE = [1]
MIN_WIN_SIZE = [31]
FLOW_CUTOFF = [1e-2, 2.5e-2]
combinations = [
*itl.product(
MAX_LEVEL, NUM_AXIS_PTS, ARROW_SCALE, MIN_WIN_SIZE, FLOW_CUTOFF
)
]
print(len(combinations))
print(combinations)
for (
MAX_LEVEL,
NUM_AXIS_PTS,
ARROW_SCALE,
MIN_WIN_SIZE,
FLOW_CUTOFF,
) in combinations:
def gen_gaussian_pyramid(im, max_level):
"""Generate Gaussian pyramid from the input image."""
gauss_pyr = [im]
for i in range(max_level):
gauss_pyr.append(cv2.pyrDown(gauss_pyr[-1]))
return gauss_pyr
def expand(img, dst_size, interpolation=None):
"""Upsample the flow to match the dimensions of another image."""
height, width = dst_size[:2]
return cv2.GaussianBlur(
cv2.resize(
img,
(width, height),
interpolation=interpolation or cv2.INTER_LINEAR,
),
(5, 5),
0,
)
def remap(a, flow):
"""Warp the image by the flow in previous level."""
height, width = flow.shape[:2]
y, x = np.meshgrid(np.arange(height), np.arange(width), indexing="ij")
flow_map = np.column_stack(
(
x.flatten() + -flow[:, :, 0].flatten(),
y.flatten() + -flow[:, :, 1].flatten(),
)
)
flow_map = flow_map.reshape((height, width, 2))
flow_map[:, :, 0] = np.clip(flow_map[:, :, 0], 0, width - 1)
flow_map[:, :, 1] = np.clip(flow_map[:, :, 1], 0, height - 1)
flow_map = flow_map.astype(np.float32)
warped = cv2.remap(a, flow_map, None, cv2.INTER_LINEAR)
return warped
def hierarchical_lucas_kanade(im1, im2, max_level):
"""Compute optical flow using hierarchical Lucas-Kanade method."""
gauss_pyr_1 = gen_gaussian_pyramid(im1, max_level)
gauss_pyr_2 = gen_gaussian_pyramid(im2, max_level)
g_L = [0 for _ in range(max_level + 1)]
d_L = [0 for _ in range(max_level + 1)]
g_L[max_level] = np.zeros(gauss_pyr_1[-1].shape[:2] + (2,)).astype(
np.float32
)
for level in range(max_level, -1, -1):
warped = remap(gauss_pyr_1[level], g_L[level])
d_L[level] = lucas_kanade(warped, gauss_pyr_2[level])
g_L[level - 1] = 2.0 * expand(
g_L[level] + d_L[level],
gauss_pyr_2[level - 1].shape[:2] + (2,),
interpolation=cv2.INTER_LINEAR,
)
return g_L[0] + d_L[0]
def lucas_kanade(img1, img2):
"""Compute optical flow using Lucas-Kanade method."""
img1 = np.copy(img1).astype(np.float32)
img2 = np.copy(img2).astype(np.float32)
window_size = MIN_WIN_SIZE
Ix = cv2.Sobel(img1, cv2.CV_64F, 1, 0, ksize=5)
Iy = cv2.Sobel(img1, cv2.CV_64F, 0, 1, ksize=5)
It = img1 - img2
Ix2 = cv2.GaussianBlur(Ix ** 2, (window_size, window_size), 0)
Iy2 = cv2.GaussianBlur(Iy ** 2, (window_size, window_size), 0)
Ixy = cv2.GaussianBlur(Ix * Iy, (window_size, window_size), 0)
Ixt = cv2.GaussianBlur(Ix * It, (window_size, window_size), 0)
Iyt = cv2.GaussianBlur(Iy * It, (window_size, window_size), 0)
det = Ix2 * Iy2 - Ixy ** 2
u = np.where((det > 1e-6), (Iy2 * Ixt - Ixy * Iyt) / (det + 1e-6), 0)
v = np.where((det > 1e-6), (Ix2 * Iyt - Ixy * Ixt) / (det + 1e-6), 0)
optical_flow = np.stack((u, v), axis=2)
return optical_flow.astype(np.float32)
def apply_magnitude_threshold(flow, threshold):
"""Apply magnitude thresholding to filter out small flows.
Args:
flow (numpy.ndarray): Input flow array.
threshold (float): Magnitude threshold value.
Returns:
numpy.ndarray: Thresholded flow array.
"""
magnitude = np.linalg.norm(
flow, axis=-1
) # Compute magnitude of flow vectors
magnitude = magnitude.reshape(
magnitude.shape + (1,)
) # Reshape to match flow shape
thresholded_flow = np.where(magnitude < threshold, 0, flow)
return thresholded_flow
def show_flow(img, flow, filename=None):
"""Visualize the flow on the input image."""
x = np.arange(0, img.shape[1], 1)
y = np.arange(0, img.shape[0], 1)
x, y = np.meshgrid(x, y)
plt.figure(figsize=(10, 10))
fig = plt.imshow(img, cmap="gray", interpolation="bicubic")
plt.axis("off")
fig.axes.get_xaxis().set_visible(False)
fig.axes.get_yaxis().set_visible(False)
num_points_per_axis = NUM_AXIS_PTS
step = int(img.shape[0] / num_points_per_axis)
scale_factor = ARROW_SCALE
plt.quiver(
x[::step, ::step],
y[::step, ::step],
flow[::step, ::step, 0],
-flow[::step, ::step, 1], # Reverse sign for correct direction
color="r",
pivot="tail",
headwidth=2,
headlength=3,
scale=scale_factor,
)
if filename is not None:
plt.savefig(filename, bbox_inches="tight", pad_inches=0)
# Read sphere data (PPM images)
sphere_seq = []
for fname in natsorted(Path(INPUT_DATA).rglob("*.ppm")):
sphere_seq.append(cv2.imread(str(fname), cv2.IMREAD_GRAYSCALE))
# Compute optical flow and visualize
flows = []
for i in range(len(sphere_seq) - 1):
flow = hierarchical_lucas_kanade(
sphere_seq[i], sphere_seq[i + 1], MAX_LEVEL
)
# Apply magnitude thresholding to filter out small flows
thresholded_flow = apply_magnitude_threshold(
flow, threshold=FLOW_CUTOFF
)
# Visualize the thresholded flow
show_flow(
sphere_seq[i], thresholded_flow, f"{INPUT_DATA}/sphere{i:02}.png",
)
# Create GIF animation from the image files
image_files = sorted(Path(INPUT_DATA).glob("*.png"))
with imageio.get_writer(
f"{INPUT_DATA}/sphere_animation_MAXLEVEL={MAX_LEVEL}" \
"_MINWINSIZE={MIN_WIN_SIZE}_NUMAXSPTS={NUM_AXIS_PTS}" \
"_ARROWSCALE={ARROW_SCALE}_FLOWCUTOFF={FLOW_CUTOFF}.gif",
mode="I",
) as writer:
for image_file in image_files:
image = imageio.imread(str(image_file))
writer.append_data(image)
There appears to be, in both this hierarchical and the previous simpler method shown above, a tradeoff in varying specifically the "minimum window size" parameter which is proving tricky to optimize: the smaller the window size, the more accurate the detection of flow but the less accurate the assessment of the direction of the flow. And vice versa - a larger window size (e.g., MIN_WIN_SIZE=31
[Note: the value for the parameter must be odd]) results in more accurate visualization of the direction of the flow in the images, but at the cost of introducing apparent noisiness in the flow detected (e.g., flow arrows appearing outside the bounds of the rotating sphere).
Below are some representative examples selected from the more numerous combinatorial set of outputs generated by this version of the analysis (implementing the hierarchical adaptation of the Lucas-Kanade method).
FLOWCUTOFF=0.01 (i.e., ≈None
)¹
FLOWCUTOFF=0.025 (Note: Depicted flow is [almost] all contained accurately within the sphere now, however with a concomitant loss in the sensitivity of detection (some peripheral areas are now missing flow quivers).)
FLOWCUTOFF=0.01 (i.e., ≈None
)
FLOWCUTOFF=0.01 (i.e., ≈None
)
FLOWCUTOFF=0.01 (i.e., ≈None
)
FLOWCUTOFF=0.01 (i.e., ≈None
)
¹ Note: Setting the parameter FLOW_CUTOFF=0.01
is essentially equivalent to having no threshold cutoff for the magnitudes of detected flow — all detected flow is passed through at this low-level value for the parameter.