Search code examples
pythonnumpyvectorizationtrigonometry

Efficiently calculate angle between three points over triplets of rows in a numpy array


Suppose we have a numpy array A of size M x N which we interpret as M vectors of dimension N. For three vectors a,b,c we'd like to compute the cosine of the angle they form:

cos(angle(a,b,c)) = np.dot((a-b)/norm(a-b), (c-b)/norm(c-b))

We want to compute this quantity over triplets of A, of which there should be (M choose 2)*(M-2) unique triplets (by symmetry of a and c; please correct me if I'm wrong on this). We can of course accomplish this with a triply nested for loop, but I am hoping this can be done in a vectorized manner. I am pretty sure I can use some broadcasting tricks to compute an array that includes the desired outputs and more, but I am hoping someone can furnish a recipe that computes exactly the unique quantities, preferably without extra computation. Thanks.

edit. for completeness, naive impl. with loops:

angles = []
for i in range(len(A)):
    for j in range(len(A)):
        for k in range(i+1, len(A)):
            if j not in (i,k):
                d1 = A[i] - A[j]
                d2 = A[k] - A[j]
                ang = np.dot(d1/np.linalg.norm(d1), d2/np.linalg.norm(d2))
                angles.append(ang)

Solution

  • of which there should be (M choose 2)*(M-2) unique triplets (by symmetry of a and c; please correct me if I'm wrong on this)

    I think that's right. I counted M * ((M-1) choose 2), and that's equivalent.

    I am hoping someone can furnish a recipe that computes exactly the unique quantities, preferably without extra computation.

    Well, let's start with the easy thing - vectorizing your loop, assuming we have arrays of indices i, j, and k pre-generated.

    def cosang1(A, i, j, k):
        d1 = A[i] - A[j]
        d2 = A[k] - A[j]
        d1_hat = np.linalg.norm(d1, axis=1, keepdims=True)
        d2_hat = np.linalg.norm(d2, axis=1, keepdims=True)
        # someone will almost certainly suggest a better way to do this
        ang = np.einsum("ij, ij -> i", d1/d1_hat, d2/d2_hat)
        return ang
    

    This reduces the problem to computing the arrays of indices, assuming that indexing the arrays is a small enough fraction of the total computation time. I can't see a way to avoid the redundant calculations without doing this sort of thing.

    Then, if we're willing to allow redundant calculation, the simplest way to generate the indices is with np.meshgrid.

    def cosang2(A):
        i = np.arange(len(A))
        i, j, k = np.meshgrid(i, i, i)
        i, j, k = i.ravel(), j.ravel(), k.ravel()
        return cosang1(A, i, j, k)
    

    For A of shape (30, 3) on Colab, the Python loop method took 160ms, and this solution took 7ms.

    It's very easy to generate the unique sets of indices quickly if we're allowed to use Numba. This is basically just your code broken out into a function:

    from numba import jit
    # generate unique tuples of indices of vectors
    @jit(nopython=True)
    def get_ijks(M):
        ijks = []
        for i in range(M):
            for j in range(M):
                for k in range(i+1, M):
                    if j not in (i, k):
                        ijks.append((i, j, k))
        return ijks
    

    (Of course, we could also just use Numba on your whole loop.)

    This took a little less than half the time as the redundant vectorized solution.

    It might be possible to generate the indices efficiently using pure NumPy. Initially, I thought it was going to be as simple as:

    i = np.arange(M)
    j, k = np.triu_indices(M, 1)
    i, j, k = np.broadcast_arrays(i, j[:, None], k[:, None])
    i, j, k = i.ravel(), j.ravel(), k.ravel()
    

    That's not quite right, but it is possible to start with that and fix those indices with a loop over range(M) (better than triple nesting, anyway!). Something like:

    # generates the same set of indices as `get_ijks`,
    # but currently a bit slower.
    def get_ijks2(M):
        i = np.arange(M)
        j, k = np.triu_indices(M-1, 1)
        i, j, k = np.broadcast_arrays(i[:, None], j, k)
        i, j, k = i.ravel(), j.ravel(), k.ravel()
        for ii in range(M):
            # this can be improved by using slices
            # instead of masks where possible
            mask0 = i == ii
            mask1 = (j >= ii) & mask0
            mask2 = (k >= ii) & mask0
            j[mask1] += 1
            k[mask1 | mask2] += 1
        return j, i, k  # intentionally swapped due to the way I think about this
    

    ~~I think this can be sped up by using just slices and no masks, but I can't do that tonight.~~

    Update: as indicated in the comment, the last loop wasn't necessary!

    def get_ijks3(M):
        i = np.arange(M)
        j, k = np.triu_indices(M-1, 1)
        i, j, k = np.broadcast_arrays(i[:, None], j, k)
        i, j, k = i.ravel(), j.ravel(), k.ravel()
        mask1 = (j >= i)
        mask2 = (k >= i)
        j[mask1] += 1
        k[mask1 | mask2] += 1
        return j, i, k  # intentionally swapped
    

    And that's quite a bit faster than the Numba loop. I'm actually surprised that worked out!


    All the code together, in case you want to run it:

    from numba import jit
    import numpy as np
    rng = np.random.default_rng(23942342)
    
    M = 30
    N = 3
    A = rng.random((M, N))
    
    # generate unique tuples of indices of vectors
    @jit(nopython=True)
    def get_ijks(M):
        ijks = []
        for i in range(M):
            for j in range(M):
                for k in range(i+1, M):
                    if j not in (i, k):
                        ijks.append((i, j, k))
        return ijks
    
    # attempt to generate the same integers efficiently
    # without Numba
    def get_ijks2(M):
        i = np.arange(M)
        j, k = np.triu_indices(M-1, 1)
        i, j, k = np.broadcast_arrays(i[:, None], j, k)
        i, j, k = i.ravel(), j.ravel(), k.ravel()
        for ii in range(M):
            # this probably doesn't need masks
            mask0 = i == ii
            mask1 = (j >= ii) & mask0
            mask2 = (k >= ii) & mask0
            j[mask1] += 1
            k[mask1 | mask2] += 1
        return j, i, k  # intentionally swapped due to the way I think about this
    
    # proposed method 
    def cosang1(A, i, j, k):
        d1 = A[i] - A[j]
        d2 = A[k] - A[j]
        d1_hat = np.linalg.norm(d1, axis=1, keepdims=True)
        d2_hat = np.linalg.norm(d2, axis=1, keepdims=True)
        ang = np.einsum("ij, ij -> i", d1/d1_hat, d2/d2_hat)
        return ang
    
    # another naive implementation
    def cosang2(A):
        i = np.arange(len(A))
        i, j, k = np.meshgrid(i, i, i)
        i, j, k = i.ravel(), j.ravel(), k.ravel()
        return cosang1(A, i, j, k)
    
    # naive implementation provided by OP
    def cosang0(A):
        angles = []
        for i in range(len(A)):
            for j in range(len(A)):
                for k in range(i+1, len(A)):
                    if j not in (i,k):
                        d1 = A[i] - A[j]
                        d2 = A[k] - A[j]
                        ang = np.dot(d1/np.linalg.norm(d1), d2/np.linalg.norm(d2))
                        angles.append(ang)
        return angles
    
    %timeit cosang0(A)
    
    %timeit get_ijks(len(A))
    ijks = np.asarray(get_ijks(M)).T
    %timeit cosang1(A, *ijks)
    
    %timeit cosang2(A)
    
    # 180 ms ± 34.7 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)
    # 840 µs ± 68.3 µs per loop (mean ± std. dev. of 7 runs, 1 loop each)
    # 2.19 ms ± 126 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)
    # <ipython-input-1-d2a3835710f2>:26: RuntimeWarning: invalid value encountered in divide
    #   ang = np.einsum("ij, ij -> i", d1/d1_hat, d2/d2_hat)
    # 8.13 ms ± 1.78 ms per loop (mean ± std. dev. of 7 runs, 100 loops each)
    
    cosangs0 = cosang0(A)
    cosangs1 = cosang1(A, *ijks)
    cosangs2 = cosang2(A)
    np.testing.assert_allclose(cosangs1, cosangs0)  # passes
    
    %timeit get_ijks2(M)
    # 1.73 ms ± 242 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)
    i, j, k = get_ijks2(M)
    cosangs3 = cosang1(A, i, j, k)
    np.testing.assert_allclose(np.sort(cosangs3), np.sort(cosangs0))  # passes
    
    %timeit get_ijks3(M)
    # 184 µs ± 25.8 µs per loop (mean ± std. dev. of 7 runs, 10000 loops each)
    i, j, k = get_ijks3(M)
    cosangs4 = cosang1(A, i, j, k)
    np.testing.assert_allclose(np.sort(cosangs4), np.sort(cosangs0))  # passes