I would like to multiply tensors R = {R_1, R_2, ..., R_M}
and X = {X_1, X_2, ..., X_M}
where R_i
and X_i
are 3×3
and 3×N_i
matrices, respectively. How can I make maximum use of NumPy functionalities during the formation of the R_i × X_i
arrays?
My MWE is the following:
import numpy as np
np.random.seed(0)
M = 5
R = [np.random.rand(3, 3) for _ in range(M)]
X = []
for i in range(M):
N_i = np.random.randint(1, 6)
X_i = np.random.rand(3, N_i)
X.append(X_i)
result = np.zeros((3, 0))
for i in range(M):
R_i = R[i]
X_i = X[i]
result = np.hstack((result, np.dot(R_i, X_i)))
print(result)
Edit #1:
Thanks for everyone who helped me with his valuable comments.
Meanwhile I was thinking about the role of N_i
s in my real problem and came to the conclusion that the number of unique N_i
s is in fact small (1 to 5; 2 is the most common one, but 1 is also very frequent). In this case, would there be a more efficient solution to the treatment of multiplications?
Another aspect which would be important: in practice, I store a 3 × N
matrix X
, not the individual X_i
blocks. The columns of X
are not ordered w.r.t. the R
list. Instead, I store only an index vector p
which provides the correct ordering for the X
columns.
In this case, an einsum
version would be the following (in comparison with the "direct" multiplication):
import numpy as np
M = 30
N = 100
np.random.seed(0)
p = np.random.randint(M, size=N)
R = np.random.rand(M, 3, 3)
X = np.random.rand(3, N)
result_einsum = np.einsum('ijk,ki->ji', R[p], X)
result_direct = np.zeros((3, N))
for i in range(N):
result_direct[:, i] = np.dot(R[p[i]], X[:, i])
print(np.allclose(result_einsum, result_direct))
Edit #2:
It seems that Numba helps quite a lot:
import numpy as np
import numba
from timeit import Timer
M = 30
N = 100
np.random.seed(0)
p = np.random.randint(M, size=N)
R = np.random.rand(M, 3, 3)
X = np.random.rand(3, N)
@numba.njit
def numba_direct(R, p, X, result_direct, N):
for i in range(N):
p_i = p[i]
for j in range(3):
res = 0.0
for k in range(3):
res += R[p_i, j, k] * X[k, i]
result_direct[j, i] = res
result_direct = np.zeros((3, N))
numba_direct(R, p, X, result_direct, N)
result_einsum = np.einsum('ijk,ki->ji', R[p], X)
print(np.allclose(result_einsum, result_direct))
ntimes = 10000
einsum_timer = Timer(lambda: np.einsum('ijk,ki->ji', R[p], X))
einsum_time = einsum_timer.timeit(number=ntimes)
numba_direct_timer = Timer(lambda: numba_direct(R, p, X, result_direct, N))
numba_direct_time = numba_direct_timer.timeit(number=ntimes)
print(f'Einsum runtime: {einsum_time:.4f} seconds')
print(f'Numba direct runtime: {numba_direct_time:.4f} seconds')
The execution times are the following for the above code:
Einsum runtime: 0.0979 seconds
Numba direct runtime: 0.0129 seconds
I know I am neither @mozway nor @hpaulj (this is referring to @chrslg's comment), but indeed there seems to be a feasible solution with einsum
:
np.einsum("ijk,ki->ji",
np.repeat(R, [x.shape[1] for x in X], axis=0),
np.hstack(X))
Here is the full code with which I tested:
import numpy as np
from timeit import Timer
np.random.seed(0)
L, M, timeit_n_times = 3, 5, 10_000
R = [np.random.rand(L, L) for _ in range(M)]
X = []
for i in range(M):
N_i = np.random.randint(1, 6)
X_i = np.random.rand(L, N_i)
X.append(X_i)
def original(R, X):
result = np.zeros((L, 0))
for i in range(M):
R_i = R[i]
X_i = X[i]
result = np.hstack((result, np.dot(R_i, X_i)))
return result
def list_stack(R, X):
result = []
for i in range(M):
R_i = R[i]
X_i = X[i]
result.append(np.dot(R_i, X_i))
return np.hstack(result)
def preallocate(R, X):
result=np.empty((L, sum(x.shape[1] for x in X)))
k=0
for i in range(M):
R_i = R[i]
X_i = X[i]
result[:, k:k+X_i.shape[1]] = np.dot(R_i, X_i)
k += X_i.shape[1]
return result
def with_einsum(R, X):
return np.einsum("ijk,ki->ji",
np.repeat(R, [x.shape[1] for x in X], axis=0),
np.hstack(X))
assert np.allclose(original(R, X), list_stack(R, X))
assert np.allclose(original(R, X), preallocate(R, X))
assert np.allclose(original(R, X), with_einsum(R, X))
print("original", Timer(lambda: original(R, X)).timeit(timeit_n_times))
print("list_stack", Timer(lambda: list_stack(R, X)).timeit(timeit_n_times))
print("preallocate", Timer(lambda: preallocate(R, X)).timeit(timeit_n_times))
print("with_einsum", Timer(lambda: with_einsum(R, X)).timeit(timeit_n_times))
Some observations:
list_stack()
and preallocate()
are marginally different, but are always faster than original()
, which is also what is suggested and implied in @chrslg's answer.with_einsum()
is faster or slower than the others depends pretty much on the shape of the problem:
L×L
matrices (L==3
in the question), with_einsum()
wins.L, M, timeit_n_times = 3, 500, 10_000
:
original 19.07941191700229
list_stack 6.437358160997974
preallocate 7.638774587001535
with_einsum 3.6944152869982645
L×L
matrices, with_einsum()
loses.L, M, timeit_n_times = 100, 5, 100_000
:
original 6.661783802999707
list_stack 4.67112236200046
preallocate 4.94899292899936
with_einsum 28.233751856001618
I did not check the influence of the size and variation of N_i
.
Based on the update to the question and @chrslg's thoughts, I tried whether zero-padding would also be a viable way to go. Bottom line is: probably not (at least not for the given example).
Here is the new testing code:
from timeit import Timer
import numpy as np
M, N, timeit_n_times = 30, 100, 10_000
np.random.seed(0)
p = np.random.randint(M, size=N)
R = np.random.rand(M, 3, 3)
X = np.random.rand(3, N)
einsum = lambda: np.einsum('ijk,ki->ji', R[p], X)
def direct():
res = np.zeros((3, N))
for i in range(N):
res[:, i] = np.dot(R[p[i]], X[:, i])
return res
# Rearrange data for padding
max_count = np.max(np.unique(p, return_counts=True)[1])
counts = np.zeros(M, dtype=int)
X_padded = np.zeros((M, max_count, 3))
for i, idx in enumerate(p):
X_padded[idx, counts[idx]] = X[:, i]
counts[idx] += 1
padded = lambda: np.einsum('ijk,ilk->ilj', R, X_padded)
result_einsum = einsum()
result_direct = direct()
result_padded_raw = padded()
# Extract padding result (reverse steps of getting from X to X_padded)
result_padded = np.zeros((3, N))
counts = np.zeros(M, dtype=int)
for i, idx in enumerate(p):
result_padded[:, i] = result_padded_raw[idx, counts[idx]]
counts[idx] += 1
assert np.allclose(result_einsum, result_direct)
assert np.allclose(result_padded, result_direct)
print("einsum", Timer(einsum).timeit(timeit_n_times))
print("direct", Timer(direct).timeit(timeit_n_times))
print("padded", Timer(padded).timeit(timeit_n_times))
Here, we first rearrange the data into X_padded
, which is M×max_count×3
-shaped, where max_count
is the maximum number of references to one of the indices in R
from p
. We iteratively fill up X_padded
for each index, leaving unused space zero-filled. In the end, we can again use a version of einsum
to calculate the result (padded = lambda: np.einsum('ijk,ilk->ilj', R, X_padded)
). If we want to compare the result to the results of the other methods, then we need to rearrange it back again, basically inverting the steps of getting from X
to X_padded
.
Observations:
Memory-wise, we have:
R
, as we don't need to replicate the individual 3×3
matrices, any more.X
, as we need zero-padding.Speed-wise, we have lost a bit, it seems;
here are the results for M, N, timeit_n_times = 30, 100, 100_000
:
einsum 0.7587459350002064
direct 13.305958173999898
padded 1.502786388000004
Note that this does not even include the times for rearranging the data. So probably padding won't help here – at least not in the way that I tried.