I've recently been developing a Python script that implements a specific mathematical function, shown in the figure below, where indexing is periodic and 1 <= j <= n.
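In symbols (reading it off the implementation below, so take the exact form as my transcription rather than the original image):

$$y_j = \sum_{k=0}^{L} \frac{\exp\!\Big(-\tfrac{1}{v}\sum_{i=-k}^{k}(k-|i|)\,x_{j+i}\Big) - \exp\!\Big(-\tfrac{1}{v}\sum_{i=-k}^{k}(k+1-|i|)\,x_{j+i}\Big)}{\sum_{i=-k}^{k} x_{j+i}}$$

with all subscripts taken modulo n.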
The function is relatively complex and is inspired by a previous question. The main purpose of the code is to evaluate the mathematical function for large arrays x (of size 5000 or more). Here is the current implementation of this function in Python:
import numpy as np

def compute_y(x, L, v):
    n = len(x)
    y = np.zeros(n)
    for k in range(L+1):
        # Generate the indices for the window around each j, with periodic wrapping
        indices = np.arange(-k, k+1)
        # Compute the weights
        weights_1 = k - np.abs(indices)
        weights_2 = k + 1 - np.abs(indices)
        weights_d = np.ones(2*k+1)
        # For each j, take the elements in the window around j and multiply by the weights
        x_matrix = np.take(x, (np.arange(n)[:, None] + indices) % n, mode='wrap')
        exp_1 = np.exp(-np.sum(weights_1[None, :] * x_matrix, axis=1)/v)
        exp_2 = np.exp(-np.sum(weights_2[None, :] * x_matrix, axis=1)/v)
        denom = np.sum(weights_d[None, :] * x_matrix, axis=1)
        # Compute the weighted sum for each j and add to the total
        y += (exp_1 - exp_2)/denom
    return y

# Test the function
x = np.random.rand(5000)
L = len(x)//2
v = 1.4
y = compute_y(x, L, v)
The issue I'm currently facing is that this code, although functional and vectorized, is significantly slower than desired, particularly when applied to large arrays. I believe the primary source of the slowness is the for loop, which regenerates the indices and weights and recomputes the window sums and exponentials on every iteration.
I am therefore looking for guidance and suggestions on how to speed up this code, specifically for arrays of size 5000 or larger. I am particularly interested in methods that take advantage of vectorization through Numpy to expedite the computations.
Any help would be much appreciated. Thank you in advance!
The most important thing to notice, I think, is that this is more-or-less a convolution:
exp_1 = np.exp(-np.sum(weights_1[None, :] * x_matrix, axis=1)/v)
If you look at the values of x_matrix, each row holds the values of x at offsets ..., -1, 0, 1, ... around that position, and those values are then multiplied by the weights. That's a convolution.
Why do we care? This is helpful because somebody has already made fast libraries for doing convolutions.
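To make that concrete, here's a small sanity check (my own sketch, not part of the original code) showing that, for a fixed k, the weighted window sum equals a periodic correlation of x with the weight vector; the weights are symmetric, so correlation and convolution coincide here:

import numpy as np
import scipy.ndimage

rng = np.random.default_rng(0)
x = rng.random(50)
n, k = len(x), 3
indices = np.arange(-k, k + 1)
weights_1 = (k - np.abs(indices)).astype(float)

# Original formulation: build the (n, 2k+1) matrix of wrapped windows explicitly
x_matrix = x[(np.arange(n)[:, None] + indices) % n]
direct = np.sum(weights_1[None, :] * x_matrix, axis=1)

# Convolution formulation: correlate x with the weights, wrapping at the edges
via_filter = scipy.ndimage.correlate1d(x, weights_1, mode='wrap')

assert np.allclose(direct, via_filter)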
The core idea here is to replace np.take() with three convolutions, avoiding the creation of the x_matrix array.
import scipy.signal

def compute_y_convolution(x, L, v):
    n = len(x)
    y = np.zeros(n)
    x3 = np.tile(x, 3)
    for k in range(L+1):
        # Generate the indices for the window around each j, with periodic wrapping
        indices = np.arange(-k, k+1)
        # Compute the weights
        weights_1 = k - np.abs(indices)
        weights_2 = k + 1 - np.abs(indices)
        weights_d = np.ones(2*k + 1)
        conv = scipy.signal.convolve(x3, weights_d, mode='same')[n:-n]
        exp_1 = np.exp(-(scipy.signal.convolve(x3, weights_1, mode='same')[n:-n])/v)
        exp_2 = np.exp(-(scipy.signal.convolve(x3, weights_2, mode='same')[n:-n])/v)
        # Compute the weighted sum for each j and add to the total
        y += (exp_1 - exp_2)/conv
    return y
(Why does this create 3 copies of the x array before doing the convolution? At the edge of each array, you want the window to wrap around and access elements on the other end of the array, but scipy.signal.convolve will just treat those parts of the convolution as zero.)
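Here's a minimal illustration of that boundary issue (a toy example of mine, with a length-3 uniform kernel): the plain convolution gets the two edge values wrong because it pads with zeros, while tiling three copies and keeping the middle slice reproduces the true periodic sum.

import numpy as np
import scipy.signal

x = np.arange(5.0)
w = np.ones(3)

# True periodic moving sum: each element plus its circular neighbours
periodic = x + np.roll(x, 1) + np.roll(x, -1)

print(periodic)                                                               # [5. 3. 6. 9. 7.]
# Zero padding makes the first and last outputs too small
print(scipy.signal.convolve(x, w, mode='same'))                               # [1. 3. 6. 9. 7.]
# Tiling three copies and slicing out the middle restores the wrap-around
print(scipy.signal.convolve(np.tile(x, 3), w, mode='same')[len(x):-len(x)])   # [5. 3. 6. 9. 7.]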
This works pretty well, and it achieves a 141x speedup on a 5000 element array. (718 seconds vs. 5.06 seconds)
Convolutions are pretty expensive, and we end up doing three of them every loop in the previous example. Can we do better?
Let's print out the weights used by the convolution each loop:
k=0
weights_1=array([0])
weights_2=array([1])
weights_d=array([1.])
k=1
weights_1=array([0, 1, 0])
weights_2=array([1, 2, 1])
weights_d=array([1., 1., 1.])
We can notice three things:

- weights_d is always a uniform filter (all ones).
- weights_2 is equivalent to the value of weights_1 plus the uniform filter.
- weights_1 is equivalent to the value of weights_2 in the previous loop.

Using those observations, we can go from 3 convolutions to 1.
import scipy.ndimage

def compute_y_reuse(x, L, v):
    n = len(x)
    y = np.zeros(n)
    last_exp_2_raw = np.zeros(n)
    for k in range(L+1):
        # Moving sum of the 2k+1 nearest elements (the weights_d convolution)
        uniform = scipy.ndimage.uniform_filter(x, size=2*k + 1, mode='wrap') * (2*k + 1)
        # The weights_1 sum is the previous iteration's weights_2 sum
        exp_1_raw = last_exp_2_raw
        exp_1 = np.exp(-exp_1_raw/v)
        # The weights_2 sum is the weights_1 sum plus the uniform filter
        exp_2_raw = exp_1_raw + uniform
        exp_2 = np.exp(-exp_2_raw/v)
        # Compute the weighted sum for each j and add to the total
        y += (exp_1 - exp_2)/uniform
        last_exp_2_raw = exp_2_raw
    return y
This achieves a 1550x speedup versus the original on a 5000 element array. (718 seconds vs. 0.462 seconds)
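As a quick double check of the first observation above (again my own sketch, not the answer's code), the weights_d convolution is just a moving sum of the 2k+1 nearest elements, which uniform_filter with mode='wrap' reproduces once you undo its averaging:

import numpy as np
import scipy.ndimage

rng = np.random.default_rng(1)
x = rng.random(20)
n, k = len(x), 2

# Moving sum over the wrapped window of 2k+1 elements, computed directly
windows = x[(np.arange(n)[:, None] + np.arange(-k, k + 1)) % n]
moving_sum = windows.sum(axis=1)

# uniform_filter returns the window mean, so multiply by the window length
via_uniform = scipy.ndimage.uniform_filter(x, size=2*k + 1, mode='wrap') * (2*k + 1)

assert np.allclose(moving_sum, via_uniform)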
I looked into this further, and tried to remove the last convolution. Essentially, the idea is that in the previous loop we calculated the sum of the N closest elements, and in the next loop we calculate the sum of the N+2 closest elements, so we can just add the 2 elements at the very edge of the window. I tried to use np.roll() for this, but found it was slower than uniform_filter(), because it must copy the array. Eventually I found this thread, which let me figure out how to solve this.
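A tiny illustration of that idea (a toy example of mine, not the final code): growing the window from 2k+1 to 2k+3 elements only requires adding the two values that enter at the edges, which can be fetched with circular shifts.

import numpy as np

def wrapped_window_sum(x, k):
    # Sum of the 2k+1 elements centred on each position, with periodic wrapping
    n = len(x)
    return x[(np.arange(n)[:, None] + np.arange(-k, k + 1)) % n].sum(axis=1)

x = np.arange(6.0)
k = 1
grown = wrapped_window_sum(x, k) + np.roll(x, k + 1) + np.roll(x, -(k + 1))
assert np.allclose(grown, wrapped_window_sum(x, k + 1))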
Also, since exp_1_raw is the same as the previous iteration's exp_2_raw, we can re-use the np.exp() call by saving the output from that iteration.
def fast_roll_add(dst, src, shift):
    # Equivalent to dst += np.roll(src, shift), but without allocating a rolled copy
    dst[shift:] += src[:-shift]
    dst[:shift] += src[-shift:]

def compute_y_noconv(x, L, v):
    n = len(x)
    y = np.zeros(n)
    last_exp_2_raw = np.zeros(n)
    last_exp_2 = np.ones(n)
    uniform = x.copy()
    for k in range(L+1):
        if k != 0:
            # Grow the moving sum by the two elements entering the window
            fast_roll_add(uniform, x, k)
            fast_roll_add(uniform, x, -k)
        exp_1_raw = last_exp_2_raw
        exp_1 = last_exp_2
        exp_2_raw = exp_1_raw + uniform / v
        exp_2 = np.exp(-exp_2_raw)
        # Compute the weighted sum for each j and add to the total
        y += (exp_1 - exp_2) / uniform
        last_exp_2_raw = exp_2_raw
        last_exp_2 = exp_2
    return y
This achieves a 3100x speedup versus the original on a 5000 element array. (718 seconds vs. 0.225 seconds) It also no longer requires scipy as a dependency.
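If you want to convince yourself that the rewrites are equivalent, a consistency check along these lines (my own snippet, assuming the four functions above are defined in the same session) compares them on a small input:

import numpy as np

x_small = np.random.default_rng(2).random(200)
L_small, v = 40, 1.4   # a small L keeps the slow reference implementation quick

y_ref = compute_y(x_small, L_small, v)
assert np.allclose(y_ref, compute_y_convolution(x_small, L_small, v))
assert np.allclose(y_ref, compute_y_reuse(x_small, L_small, v))
assert np.allclose(y_ref, compute_y_noconv(x_small, L_small, v))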