Search code examples
arraysalgorithmdivide-and-conquer

How to translate a solution into divide-and-conquer (finding a sub array with the largest, smallest value)


I am trying to get better at divide an conquer algorithms and am using this one below as an example. Given an array _in and some length l it finds the start point of a sub array _in[_min_start,_min_start+l] such that the lowest value in that sub array is the highest it could possible be. I have come up with a none divide and conquer solution and am wondering how I could go about translating this into one which divides the array up into smaller parts (divide-and-conquer).

def main(_in, l):
    _min_start = 0
    min_trough = None

    for i in range(len(_in)+1-l):
        if min_trough is None:
            min_trough = min(_in[i:i+l])
        
        if min(_in[i:i+l]) > min_trough:
            _min_start = i
            min_trough = min(_in[i:i+l])
    
    return _min_start, _in[_min_start:_min_start+l]

e.g. For the array [5, 1, -1, 2, 5, -4, 3, 9, 8, -2, 0, 6] and a sub array of lenght 3 it would return start position 6 (resulting in the array [3,9,8]).


Solution

  • Three O(n) solutions and a benchmark

    Note I'm renaming _in and l to clearer-looking names A and k.

    Solution 1: Divide and conquer

    Split the array in half. Solve left half and right half recursively. The subarrays not yet considered cross the middle, i.e., they're a suffix of the left part plus a prefix of the right part. Compute k-1 suffix-minima of the left half and k-1 prefix-minima of the right half. That allows you to compute the minimum for each middle-crossing subarray of length k in O(1) time each. The best subarray for the whole array is the best of left-best, right-best and crossing-best.

    Runtime is O(n), I believe. As Ellis pointed out, in the recursion the subarray can become smaller than k. Such cases take O(1) time to return the equivalent of "there aren't any k-length subarrays in here". So the time is:

    T(n) = { 2 * T(n/2) + O(k)    if n >= k
           { O(1)                 otherwise
    

    For any 0 <= k <= n we have k=nc with 0 <= c <= 1. Then the number of calls is Θ(n1-c) and each call's own work takes Θ(nc) time, for a total of Θ(n) time.

    Posted a question about the complexity to be sure.

    Python implementation:

    def solve_divide_and_conquer(A, k):
        def solve(start, stop):
            if stop - start < k:
                return -inf,
            mid = (start + stop) // 2
            left = solve(start, mid)
            right = solve(mid, stop)
            i0 = mid - k + 1
            prefixes = accumulate(A[mid:mid+k-1], min)
            if i0 < 0:
                prefixes = [*prefixes][-i0:]
                i0 = 0
            suffixes = list(accumulate(A[i0:mid][::-1], min))[::-1]
            crossing = max(zip(map(min, suffixes, prefixes), count(i0)))
            return max(left, right, crossing)
        return solve(0, len(A))[1]
    

    Solution 2: k-Blocks

    As commented by @benrg, the above dividing-and-conquering is needlessly complicated. We can simply work on blocks of length k. Compute the suffix minima of the first block and the prefix minima of the second block. That allows finding the minimum of each k-length subarray within these two blocks in O(1) time. Do the same with the second and third block, the third and fourth block, etc. Time is O(n) as well.

    Python implementation:

    def solve_blocks(A, k):
        return max(max(zip(map(min, prefixes, suffixes), count(mid-k)))
                   for mid in range(k, len(A)+1, k)
                   for prefixes in [accumulate(A[mid:mid+k], min, initial=inf)]
                   for suffixes in [list(accumulate(A[mid-k:mid][::-1], min, initial=inf))[::-1]]
               )[1]
    

    Solution 3: Monoqueue

    Not divide & conquer, but first one I came up with (and knew was O(n)).

    Sliding window, represent the window with a deque of (sorted) indexes of strictly increasing array values in the window. When sliding the window to include a new value A[i]:

    1. Remove the first index from the deque if the sliding makes it fall out of the window.
    2. Remove indexes whose array values are larger than A[i]. (They can never be the minimum of the window anymore.)
    3. Include the new index i.
    4. The first index still in the deque is the index of the current window's minimum value. Use that to update overall result.

    Python implementation:

    from collections import deque
    
    A = [5, 1, -1, 2, 5, -4, 3, 9, 8, -2, 0, 6]
    k = 3
    
    I = deque()
    for i in range(len(A)):
        if I and I[0] == i - k:
            I.popleft()
        while I and A[I[-1]] >= A[i]:
            I.pop()
        I.append(i)
        curr_min = A[I[0]]
        if i == k-1 or i > k-1 and curr_min > max_min:
            result = i - k + 1
            max_min = curr_min
    
    print(result)
    

    Benchmark

    With 4000 numbers from the range 0 to 9999, and k=2000:

     80.4 ms   81.4 ms   81.8 ms  solve_brute_force
     80.2 ms   80.5 ms   80.7 ms  solve_original
      2.4 ms    2.4 ms    2.4 ms  solve_monoqueue
      2.4 ms    2.4 ms    2.4 ms  solve_divide_and_conquer
      1.3 ms    1.4 ms    1.4 ms  solve_blocks
    

    Benchmark code (Try it online!):

    from timeit import repeat
    from random import choices
    from itertools import accumulate
    from math import inf
    from itertools import count
    from collections import deque
    
    def solve_monoqueue(A, k):
        I = deque()
        for i in range(len(A)):
            if I and I[0] == i - k:
                I.popleft()
            while I and A[I[-1]] >= A[i]:
                I.pop()
            I.append(i)
            curr_min = A[I[0]]
            if i == k-1 or i > k-1 and curr_min > max_min:
                result = i - k + 1
                max_min = curr_min
        return result
    
    def solve_divide_and_conquer(A, k):
        def solve(start, stop):
            if stop - start < k:
                return -inf,
            mid = (start + stop) // 2
            left = solve(start, mid)
            right = solve(mid, stop)
            i0 = mid - k + 1
            prefixes = accumulate(A[mid:mid+k-1], min)
            if i0 < 0:
                prefixes = [*prefixes][-i0:]
                i0 = 0
            suffixes = list(accumulate(A[i0:mid][::-1], min))[::-1]
            crossing = max(zip(map(min, suffixes, prefixes), count(i0)))
            return max(left, right, crossing)
        return solve(0, len(A))[1]
    
    def solve_blocks(A, k):
        return max(max(zip(map(min, prefixes, suffixes), count(mid-k)))
                   for mid in range(k, len(A)+1, k)
                   for prefixes in [accumulate(A[mid:mid+k], min, initial=inf)]
                   for suffixes in [list(accumulate(A[mid-k:mid][::-1], min, initial=inf))[::-1]]
               )[1]
            
    def solve_brute_force(A, k):
        return max(range(len(A)+1-k),
                   key=lambda start: min(A[start : start+k]))
    
    def solve_original(_in, l):
        _min_start = 0
        min_trough = None
    
        for i in range(len(_in)+1-l):
            if min_trough is None:
                min_trough = min(_in[i:i+l])
            
            if min(_in[i:i+l]) > min_trough:
                _min_start = i
                min_trough = min(_in[i:i+l])
        
        return _min_start  # , _in[_min_start:_min_start+l]
    
    solutions = [
        solve_brute_force,
        solve_original,
        solve_monoqueue,
        solve_divide_and_conquer,
        solve_blocks,
    ]
    
    for _ in range(3):
        A = choices(range(10000), k=4000)
        k = 2000
    
        # Check correctness
        expect = None
        for solution in solutions:
            index = solution(A.copy(), k)
            assert 0 <= index and index + k-1 < len(A)
            min_there = min(A[index : index+k])
            if expect is None:
                expect = min_there
                print(expect)
            else:
                print(min_there == expect, solution.__name__)
        print()
    
        # Speed
        for solution in solutions:
            copy = A.copy()
            ts = sorted(repeat(lambda: solution(copy, k), number=1))[:3]
            print(*('%5.1f ms ' % (t * 1e3) for t in ts), solution.__name__)
        print()