Search code examples
pythonarraysmatlabnumpysimilarity

Selecting close matches from one array based on another reference array


I have an array A and a reference array B. Size of A is at least as big as B. e.g.

A = [2,100,300,793,1300,1500,1810,2400]
B = [4,305,789,1234,1890]

B is in fact the position of peaks in a signal at a specified time, and A contains position of peaks at a later time. But some of the elements in A are actually not the peaks I want (might be due to noise, etc), and I want to find the 'real' one in A based on B. The 'real' elements in A should be close to those in B, and in the example given above, the 'real' ones in A should be A'=[2,300,793,1300,1810]. It should be obvious in this example that 100,1500,2400 are not the ones we want as they are quite far off from any of the elements in B. How can I code this in the most efficient/accurate way in python/matlab?


Solution

  • Approach #1: With NumPy broadcasting, we can look for absolute element-wise subtractions between the input arrays and use an appropriate threshold to filter out unwanted elements from A. It seems for the given sample inputs, a threshold of 90 works.

    Thus, we would have an implementation, like so -

    thresh = 90
    Aout = A[(np.abs(A[:,None] - B) < thresh).any(1)]
    

    Sample run -

    In [69]: A
    Out[69]: array([   2,  100,  300,  793, 1300, 1500, 1810, 2400])
    
    In [70]: B
    Out[70]: array([   4,  305,  789, 1234, 1890])
    
    In [71]: A[(np.abs(A[:,None] - B) < 90).any(1)]
    Out[71]: array([   2,  300,  793, 1300, 1810])
    

    Approach #2: Based on this post, here's a memory efficient approach using np.searchsorted, which could be crucial for large arrays -

    def searchsorted_filter(a, b, thresh):
        choices = np.sort(b) # if b is already sorted, skip it
        lidx = np.searchsorted(choices, a, 'left').clip(max=choices.size-1)
        ridx = (np.searchsorted(choices, a, 'right')-1).clip(min=0)
        cl = np.take(choices,lidx) # Or choices[lidx]
        cr = np.take(choices,ridx) # Or choices[ridx]
        return a[np.minimum(np.abs(a - cl), np.abs(a - cr)) < thresh]
    

    Sample run -

    In [95]: searchsorted_filter(A,B, thresh = 90)
    Out[95]: array([   2,  300,  793, 1300, 1810])
    

    Runtime test

    In [104]: A = np.sort(np.random.randint(0,100000,(1000)))
    
    In [105]: B = np.sort(np.random.randint(0,100000,(400)))
    
    In [106]: out1 = A[(np.abs(A[:,None] - B) < 10).any(1)]
    
    In [107]: out2 = searchsorted_filter(A,B, thresh = 10)
    
    In [108]: np.allclose(out1, out2)  # Verify results
    Out[108]: True
    
    In [109]: %timeit A[(np.abs(A[:,None] - B) < 10).any(1)]
    100 loops, best of 3: 2.74 ms per loop
    
    In [110]: %timeit searchsorted_filter(A,B, thresh = 10)
    10000 loops, best of 3: 85.3 µs per loop
    

    Jan 2018 Update with further performance boost

    We can avoid the second usage of np.searchsorted(..., 'right') by making use of the indices obtained from np.searchsorted(..., 'left') and also the absolute computations, like so -

    def searchsorted_filter_v2(a, b, thresh):
        N = len(b)
    
        choices = np.sort(b) # if b is already sorted, skip it
    
        l = np.searchsorted(choices, a, 'left')
        l_invalid_mask = l==N
        l[l_invalid_mask] = N-1
        left_offset = choices[l]-a
        left_offset[l_invalid_mask] *= -1    
    
        r = (l - (left_offset!=0))
        r_invalid_mask = r<0
        r[r_invalid_mask] = 0
        r += l_invalid_mask
        right_offset = a-choices[r]
        right_offset[r_invalid_mask] *= -1
    
        out = a[(left_offset < thresh) | (right_offset < thresh)]
        return out
    

    Updated timings to test the further speedup -

    In [388]: np.random.seed(0)
         ...: A = np.random.randint(0,1000000,(100000))
         ...: B = np.unique(np.random.randint(0,1000000,(40000)))
         ...: np.random.shuffle(B)
         ...: thresh = 10
         ...: 
         ...: out1 = searchsorted_filter(A, B, thresh)
         ...: out2 = searchsorted_filter_v2(A, B, thresh)
         ...: print np.allclose(out1, out2)
    True
    
    In [389]: %timeit searchsorted_filter(A, B, thresh)
    10 loops, best of 3: 24.2 ms per loop
    
    In [390]: %timeit searchsorted_filter_v2(A, B, thresh)
    100 loops, best of 3: 13.9 ms per loop
    

    Digging deeper -

    In [396]: a = A; b = B
    
    In [397]: N = len(b)
         ...: 
         ...: choices = np.sort(b) # if b is already sorted, skip it
         ...: 
         ...: l = np.searchsorted(choices, a, 'left')
    
    In [398]: %timeit np.sort(B)
    100 loops, best of 3: 2 ms per loop
    
    In [399]: %timeit np.searchsorted(choices, a, 'left')
    100 loops, best of 3: 10.3 ms per loop
    

    Seems like searchsorted and sort are taking almost all of the runtime and they seem essential to this method. So, doesn't seem like it could be improved any further staying with this sort-based approach.