Search code examples
pythonnumpysliding-window

How do I do calculations with a sliding window while being memory-efficient?


I am working with very large (several GB) 2-dimensional square NumPy arrays. Given an input array a, for each element, I would like to find the direction of its largest adjacent neighbor. I am using the provided sliding window view to try to avoid creating unnecessary copies:

# a is an L x L array of type np.float32
swv = sliding_window_view(a, (3, 3)) # (L-2) x (L-2) x 3 x 3
directions = swv.reshape(L-2, L-2, 9)[:,:,1::2].argmax(axis = 2).astype(np.uint8)

However, calling reshape here creates a (L-2) x (L-2) x 9 copy instead of a view, which consumes an undesirably large chunk of memory. Is there a way to do this operation in a vectorized fashion, but with a smaller memory footprint?

EDIT: Many of the responses are geared towards NumPy, which uses CPU (since that's what I initially asked, to simplify the problem). Would the optimal strategy be different for using CuPy, which is NumPy for GPU? As far as I know, it makes using Numba much less straightforward.


Solution

  • Since using sliding_window_view is not efficient for your use case, I will provide an alternative using Numba.

    First, to simplify the implementation, define the following argmax alternative.

    from numba import njit
    
    
    @njit
    def argmax(*values):
        """argmax alternative that can take an arbitrary number of arguments.
    
        Usage: argmax(0, 1, 3, 2)  # 2
        """
        max_arg = 0
        max_value = values[0]
        for i in range(1, len(values)):
            value = values[i]
            if value > max_value:
                max_value = value
                max_arg = i
        return max_arg
    

    This is a standard argmax function, except it takes multiple scalar arguments instead of a single numpy array.

    Using this argmax alternative, your operation can be easily re-implemented.

    @njit(cache=True)
    def neighbor_argmax(a):
        height, width = a.shape[0] - 2, a.shape[1] - 2
        out = np.empty((height, width), dtype=np.uint8)
        for y in range(height):
            for x in range(width):
                # window: a[y:y + 3, x:x + 3]
                # center: a[y + 1, x + 1]
                out[y, x] = argmax(
                    a[y, x + 1],  # up
                    a[y + 1, x],  # left
                    a[y + 1, x + 2],  # right
                    a[y + 2, x + 1],  # down
                )
        return out
    

    This function requires only a few variables to operate, excluding the input and output buffers. So we don't need to worry about memory footprint.

    Alternatively, you can use stencil, a sliding window utility for Numba. With stencil, you only need to define the kernel. Numba will take care of the rest.

    from numba import njit, stencil
    
    @stencil
    def kernel(window):
        # window: window[-1:2, -1:2]
        # center: window[0, 0]
        return np.uint8(  # Don't forget to cast to np.uint8.
            argmax(
                window[-1, 0],  # up
                window[0, -1],  # left
                window[0, 1],  # right
                window[1, 0],  # down
            )
        )
    
    
    @njit(cache=True)
    def neighbor_argmax_stencil(a):
        return kernel(a)[1:-1, 1:-1]  # Slicing is not mandatory.
    

    It can also be inlined, if you like.

    @njit(cache=True)
    def neighbor_argmax_stencil_inlined(a):
        f = stencil(lambda w: np.uint8(argmax(w[-1, 0], w[0, -1], w[0, 1], w[1, 0])))
        return f(a)[1:-1, 1:-1]  # Slicing is not mandatory.
    

    However, stencil is very limited in functionality and cannot completely replace sliding_window_view. One difference is that there is no option to skip the edges. It is always padded with a constant value (0 by default). That is, if you put (L, L) matrix, you will get (L, L) output, not (L-2, L-2).

    This is why I am slicing the output in the code above to match your implementation. However, this may not be the desired behavior, as it breaks memory contiguity. You can copy after slicing, but be aware that it will increase the peak memory usage.

    In addition, it should be noted that these functions can also be easily adapted for multi-threading. For details, please refer to the benchmark code below.

    Here is the benchmark.

    import math
    import timeit
    
    import numpy as np
    from numba import njit, prange, stencil
    from numpy.lib.stride_tricks import sliding_window_view
    
    
    def baseline(a):
        L = a.shape[0]
        swv = sliding_window_view(a, (3, 3))  # (L-2) x (L-2) x 3 x 3
        directions = swv.reshape(L - 2, L - 2, 9)[:, :, 1::2].argmax(axis=2).astype(np.uint8)
        return directions
    
    
    @njit
    def argmax(*values):
        """argmax alternative that can accept an arbitrary number of arguments.
    
        Usage: argmax(0, 1, 3, 2)  # 2
        """
        max_arg = 0
        max_value = values[0]
        for i in range(1, len(values)):
            value = values[i]
            if value > max_value:
                max_value = value
                max_arg = i
        return max_arg
    
    
    @njit(cache=True)
    def neighbor_argmax(a):
        height, width = a.shape[0] - 2, a.shape[1] - 2
        out = np.empty((height, width), dtype=np.uint8)
        for y in range(height):
            for x in range(width):
                # window: a[y:y + 3, x:x + 3]
                # center: a[y + 1, x + 1]
                out[y, x] = argmax(
                    a[y, x + 1],  # up
                    a[y + 1, x],  # left
                    a[y + 1, x + 2],  # right
                    a[y + 2, x + 1],  # down
                )
        return out
    
    
    @njit(cache=True, parallel=True)  # Add parallel=True.
    def neighbor_argmax_mt(a):
        height, width = a.shape[0] - 2, a.shape[1] - 2
        out = np.empty((height, width), dtype=np.uint8)
        for y in prange(height):  # Change this to prange.
            for x in range(width):
                # window: a[y:y + 3, x:x + 3]
                # center: a[y + 1, x + 1]
                out[y, x] = argmax(
                    a[y, x + 1],  # up
                    a[y + 1, x],  # left
                    a[y + 1, x + 2],  # right
                    a[y + 2, x + 1],  # down
                )
        return out
    
    
    @stencil
    def kernel(window):
        # window: window[-1:2, -1:2]
        # center: window[0, 0]
        return np.uint8(  # Don't forget to cast to np.uint8.
            argmax(
                window[-1, 0],  # up
                window[0, -1],  # left
                window[0, 1],  # right
                window[1, 0],  # down
            )
        )
    
    
    @njit(cache=True)
    def neighbor_argmax_stencil(a):
        return kernel(a)[1:-1, 1:-1]  # Slicing is not mandatory.
    
    
    @njit(cache=True)
    def neighbor_argmax_stencil_with_copy(a):
        return kernel(a)[1:-1, 1:-1].copy()  # Slicing is not mandatory.
    
    
    @njit(cache=True, parallel=True)
    def neighbor_argmax_stencil_mt(a):
        return kernel(a)[1:-1, 1:-1]  # Slicing is not mandatory.
    
    
    @njit(cache=True)
    def neighbor_argmax_stencil_inlined(a):
        f = stencil(lambda w: np.uint8(argmax(w[-1, 0], w[0, -1], w[0, 1], w[1, 0])))
        return f(a)[1:-1, 1:-1]  # Slicing is not mandatory.
    
    
    def benchmark():
        size = 2000  # Total nbytes (in MB) for a.
        n = math.ceil(math.sqrt(size * (10 ** 6) / 4))
        rng = np.random.default_rng(0)
        a = rng.random(size=(n, n), dtype=np.float32)
        print(f"{a.shape=}, {a.nbytes=:,}")
    
        expected = baseline(a)
        # expected = neighbor_argmax_mt(a)
        assert expected.shape == (n - 2, n - 2) and expected.dtype == np.uint8
    
        candidates = [
            baseline,
            neighbor_argmax,
            neighbor_argmax_mt,
            neighbor_argmax_stencil,
            neighbor_argmax_stencil_mt,
            neighbor_argmax_stencil_with_copy,
            neighbor_argmax_stencil_inlined,
        ]
        name_len = max(len(f.__name__) for f in candidates)
        for f in candidates:
            assert np.array_equal(expected, f(a)), f.__name__
            t = timeit.repeat(lambda: f(a), repeat=3, number=1)
            print(f"{f.__name__:{name_len}} : {min(t)}")
    
    
    if __name__ == "__main__":
        benchmark()
    

    Result:

    a.shape=(22361, 22361), a.nbytes=2,000,057,284
    baseline                          : 24.971996600041166
    neighbor_argmax                   : 0.1917789001017809
    neighbor_argmax_mt                : 0.11929619999136776
    neighbor_argmax_stencil           : 0.2940085999434814
    neighbor_argmax_stencil_mt        : 0.17756330000702292
    neighbor_argmax_stencil_with_copy : 0.46573049994185567
    neighbor_argmax_stencil_inlined   : 0.29338629997801036
    

    I think these results are enough to make you consider giving Numba a try :)


    The following section was added after this answer was accepted.

    Here is the CUDA version. (I'm using numba 0.60.0)

    from numba import cuda
    
    @cuda.jit(device=True)
    def argmax_cuda(values):  # cuda.jit cannot handle an arbitrary number of arguments.
        max_arg = 0
        max_value = values[0]
        for i in range(1, len(values)):
            value = values[i]
            if value > max_value:
                max_value = value
                max_arg = i
        return max_arg
    
    
    @cuda.jit
    def neighbor_argmax_cuda_impl(a, out):
        y, x = cuda.grid(2)
        if y < out.shape[0] and x < out.shape[1]:
            out[y, x] = argmax_cuda(
                # Make sure to use a tuple, not a list.
                (
                    a[y, x + 1],  # up
                    a[y + 1, x],  # left
                    a[y + 1, x + 2],  # right
                    a[y + 2, x + 1],  # down
                )
            )
    
    
    def neighbor_argmax_cuda(a, out):
        # If the input/output array is not on the GPU, you can transfer it like this.
        # However, note that this operation alone takes longer than neighbor_argmax_mt.
        # a = cuda.to_device(a)
        # out = cuda.to_device(out)
    
        # Block settings. I'm not sure if this is the optimal one.
        threadsperblock = (16, 16)
        blockspergrid_x = int(np.ceil(out.shape[1] / threadsperblock[1]))
        blockspergrid_y = int(np.ceil(out.shape[0] / threadsperblock[0]))
        blockspergrid = (blockspergrid_x, blockspergrid_y)
    
        neighbor_argmax_cuda_impl[blockspergrid, threadsperblock](a, out)
    
        # Back to CPU, if necessary.
        # out = out.copy_to_host()
        return out
    

    As Jérôme explained in detail, the time taken to transfer the input/output arrays from the host to the device cannot be ignored.

    a.shape=(22361, 22361), a.nbytes=2,000,057,284
    neighbor_argmax                         : 0.47917880106251687
    neighbor_argmax_mt                      : 0.08353979291860014
    neighbor_argmax_cuda (with transfer)    : 0.5072600540006533
    neighbor_argmax_cuda (without transfer) : 9.134004358202219e-05
    

    (I had to use another machine to use CUDA. For that reason, the results for the CPU are different from the ones I put above.)