Search code examples
pythonnumpymultidimensional-arraynumpy-ndarray

Optimal multiplication of two 3D arrays having a variable dimension


I would like to multiply tensors R = {R_1, R_2, ..., R_M} and X = {X_1, X_2, ..., X_M} where R_i and X_i are 3×3 and 3×N_i matrices, respectively. How can I make maximum use of NumPy functionalities during the formation of the R_i × X_i arrays?

My MWE is the following:

import numpy as np

np.random.seed(0)

M = 5
R = [np.random.rand(3, 3) for _ in range(M)]
X = []
for i in range(M):
    N_i = np.random.randint(1, 6)
    X_i = np.random.rand(3, N_i)
    X.append(X_i)
    
result = np.zeros((3, 0))
for i in range(M):
    R_i = R[i]
    X_i = X[i]
    result = np.hstack((result, np.dot(R_i, X_i)))

print(result)

Edit #1:

Thanks for everyone who helped me with his valuable comments. Meanwhile I was thinking about the role of N_is in my real problem and came to the conclusion that the number of unique N_is is in fact small (1 to 5; 2 is the most common one, but 1 is also very frequent). In this case, would there be a more efficient solution to the treatment of multiplications?

Another aspect which would be important: in practice, I store a 3 × N matrix X, not the individual X_i blocks. The columns of X are not ordered w.r.t. the R list. Instead, I store only an index vector p which provides the correct ordering for the X columns. In this case, an einsum version would be the following (in comparison with the "direct" multiplication):

import numpy as np

M = 30
N = 100

np.random.seed(0)
p = np.random.randint(M, size=N)
R = np.random.rand(M, 3, 3)
X = np.random.rand(3, N)

result_einsum = np.einsum('ijk,ki->ji', R[p], X)

result_direct = np.zeros((3, N))
for i in range(N):
    result_direct[:, i] = np.dot(R[p[i]], X[:, i])

print(np.allclose(result_einsum, result_direct))

Edit #2:

It seems that Numba helps quite a lot:

import numpy as np
import numba
from timeit import Timer

M = 30
N = 100

np.random.seed(0)
p = np.random.randint(M, size=N)
R = np.random.rand(M, 3, 3)
X = np.random.rand(3, N)

@numba.njit
def numba_direct(R, p, X, result_direct, N):
    for i in range(N):
        p_i = p[i]
        for j in range(3):
            res = 0.0
            for k in range(3):
                res += R[p_i, j, k] * X[k, i]
            result_direct[j, i] = res

result_direct = np.zeros((3, N))
numba_direct(R, p, X, result_direct, N)
result_einsum = np.einsum('ijk,ki->ji', R[p], X)
print(np.allclose(result_einsum, result_direct))

ntimes = 10000

einsum_timer = Timer(lambda: np.einsum('ijk,ki->ji', R[p], X))
einsum_time = einsum_timer.timeit(number=ntimes)

numba_direct_timer = Timer(lambda: numba_direct(R, p, X, result_direct, N))
numba_direct_time = numba_direct_timer.timeit(number=ntimes)

print(f'Einsum runtime: {einsum_time:.4f} seconds')
print(f'Numba direct runtime: {numba_direct_time:.4f} seconds')

The execution times are the following for the above code:

Einsum runtime: 0.0979 seconds
Numba direct runtime: 0.0129 seconds

Solution

  • I know I am neither @mozway nor @hpaulj (this is referring to @chrslg's comment), but indeed there seems to be a feasible solution with einsum:

    np.einsum("ijk,ki->ji",
              np.repeat(R, [x.shape[1] for x in X], axis=0),
              np.hstack(X))
    

    Here is the full code with which I tested:

    import numpy as np
    from timeit import Timer
    
    np.random.seed(0)
    
    L, M, timeit_n_times = 3, 5, 10_000
    R = [np.random.rand(L, L) for _ in range(M)]
    X = []
    for i in range(M):
        N_i = np.random.randint(1, 6)
        X_i = np.random.rand(L, N_i)
        X.append(X_i)
    
    def original(R, X):    
        result = np.zeros((L, 0))
        for i in range(M):
            R_i = R[i]
            X_i = X[i]
            result = np.hstack((result, np.dot(R_i, X_i)))
        return result
    
    def list_stack(R, X):
        result = []
        for i in range(M):
            R_i = R[i]
            X_i = X[i]
            result.append(np.dot(R_i, X_i))
        return np.hstack(result)
    
    def preallocate(R, X):
        result=np.empty((L, sum(x.shape[1] for x in X)))
        k=0
        for i in range(M):
            R_i = R[i]
            X_i = X[i]
            result[:, k:k+X_i.shape[1]] = np.dot(R_i, X_i)
            k += X_i.shape[1]
        return result
    
    def with_einsum(R, X):
        return np.einsum("ijk,ki->ji",
                         np.repeat(R, [x.shape[1] for x in X], axis=0),
                         np.hstack(X))
        
    assert np.allclose(original(R, X), list_stack(R, X))
    assert np.allclose(original(R, X), preallocate(R, X))
    assert np.allclose(original(R, X), with_einsum(R, X))
    
    print("original", Timer(lambda: original(R, X)).timeit(timeit_n_times))
    print("list_stack", Timer(lambda: list_stack(R, X)).timeit(timeit_n_times))
    print("preallocate", Timer(lambda: preallocate(R, X)).timeit(timeit_n_times))
    print("with_einsum", Timer(lambda: with_einsum(R, X)).timeit(timeit_n_times))
    

    Some observations:

    • The calculation times of list_stack() and preallocate() are marginally different, but are always faster than original(), which is also what is suggested and implied in @chrslg's answer.
    • Whether or not with_einsum() is faster or slower than the others depends pretty much on the shape of the problem:
      • For small, but many L×L matrices (L==3 in the question), with_einsum() wins.
        Here are the results for L, M, timeit_n_times = 3, 500, 10_000:
        original 19.07941191700229
        list_stack 6.437358160997974
        preallocate 7.638774587001535
        with_einsum 3.6944152869982645
        
      • For large, but few L×L matrices, with_einsum() loses.
        Here are the results for L, M, timeit_n_times = 100, 5, 100_000:
        original 6.661783802999707
        list_stack 4.67112236200046
        preallocate 4.94899292899936
        with_einsum 28.233751856001618
        

    I did not check the influence of the size and variation of N_i.

    Update

    Based on the update to the question and @chrslg's thoughts, I tried whether zero-padding would also be a viable way to go. Bottom line is: probably not (at least not for the given example).

    Here is the new testing code:

    from timeit import Timer
    import numpy as np
    
    M, N, timeit_n_times = 30, 100, 10_000
    
    np.random.seed(0)
    p = np.random.randint(M, size=N)
    R = np.random.rand(M, 3, 3)
    X = np.random.rand(3, N)
    
    einsum = lambda: np.einsum('ijk,ki->ji', R[p], X)
    
    def direct():
        res = np.zeros((3, N))
        for i in range(N):
            res[:, i] = np.dot(R[p[i]], X[:, i])
        return res
    
    # Rearrange data for padding
    max_count = np.max(np.unique(p, return_counts=True)[1])
    counts = np.zeros(M, dtype=int)
    
    X_padded = np.zeros((M, max_count, 3))
    for i, idx in enumerate(p):
        X_padded[idx, counts[idx]] = X[:, i]
        counts[idx] += 1
    
    padded = lambda: np.einsum('ijk,ilk->ilj', R, X_padded)
        
    result_einsum = einsum()
    result_direct = direct()
    result_padded_raw = padded()
    
    # Extract padding result (reverse steps of getting from X to X_padded)
    result_padded = np.zeros((3, N))
    counts = np.zeros(M, dtype=int)
    for i, idx in enumerate(p):
        result_padded[:, i] = result_padded_raw[idx, counts[idx]]
        counts[idx] += 1
    
    assert np.allclose(result_einsum, result_direct)
    assert np.allclose(result_padded, result_direct)
    
    print("einsum", Timer(einsum).timeit(timeit_n_times))
    print("direct", Timer(direct).timeit(timeit_n_times))
    print("padded", Timer(padded).timeit(timeit_n_times))
    

    Here, we first rearrange the data into X_padded, which is M×max_count×3-shaped, where max_count is the maximum number of references to one of the indices in R from p. We iteratively fill up X_padded for each index, leaving unused space zero-filled. In the end, we can again use a version of einsum to calculate the result (padded = lambda: np.einsum('ijk,ilk->ilj', R, X_padded)). If we want to compare the result to the results of the other methods, then we need to rearrange it back again, basically inverting the steps of getting from X to X_padded.

    Observations:

    • Memory-wise, we have:

      • Saved on the side of R, as we don't need to replicate the individual 3×3 matrices, any more.
      • Paid on the side of X, as we need zero-padding.
    • Speed-wise, we have lost a bit, it seems;
      here are the results for M, N, timeit_n_times = 30, 100, 100_000:

      einsum 0.7587459350002064
      direct 13.305958173999898
      padded 1.502786388000004
      

      Note that this does not even include the times for rearranging the data. So probably padding won't help here – at least not in the way that I tried.