Search code examples
pythonarrayscompression

get more significant points from array


I need to compress a numpy array by getting the more significant points , I mean where there is a change , but it has to be a specific number of points , for example my array may be 100 values i will need the 40 most meaningful ones. I tried the rdp algorithme but you cant specify the number of points , you cant expect the lenght of the results so for now i doing this but it is just random points not the most important

n=40   
indices = np.linspace(0, len(array) - 1, num=n)

Solution

  • Here is an implementation of the idea I outlined in my comments. What we do is this: We use the regular Ramer–Douglas–Peucker algorithm. But instead of recursively splitting the point lists until an epsilon is achieved, we always split the worst sub-list until we reach the target size in remaining points.

    The basic idea seems to work. The base algorithm is lifted from Wikipedia for validation and comparison.

    #!/usr/bin/env python3
    
    
    import heapq
    import math
    
    import numpy as np
    
    
    def perpendicular_distance(point, start, end):
        """Distance from point to the line through start and end"""
        dist = end - start
        absdist = np.linalg.norm(dist)
        if absdist == 0.:
            return np.linalg.norm(point - start)
        return np.linalg.norm(np.cross(dist, start - point)) / absdist
    
    
    def douglas_peucker(pointlist, epsilon):
        if len(pointlist) < 3:
            return list(pointlist)
        # Find the point with the maximum distance
        start, end = pointlist[0], pointlist[-1]
        dmax, index = max((perpendicular_distance(point, start, end), index)
                          for index, point in enumerate(pointlist))
        if dmax < epsilon:
            return [start, end]
        rtrn = douglas_peucker(pointlist[:index + 1], epsilon)
        # careful: The point at index is in both lists
        rtrn[-1:] = douglas_peucker(pointlist[index:], epsilon)
        return rtrn
    
    
    def perpendicular_distance_vectorized(pointlist, start, end):
        offsets = start - pointlist
        dist = end - start
        absdist = np.linalg.norm(dist)
        scale = np.divide(1., absdist)
        if not math.isfinite(scale):
            return np.linalg.norm(offsets, axis=1)
        cross = np.cross(dist, offsets)
        cross = np.linalg.norm(cross, axis=1) if cross.ndim > 1 else abs(cross)
        return cross * scale
    
    
    class DprList:
        def __init__(self, pointlist, startindex):
            self.pointlist = pointlist
            self.startindex = startindex
            if len(pointlist) < 3:
                self.index = 0
                self.dmax = 0.
                return
            start, end = pointlist[0], pointlist[-1]
            distances = perpendicular_distance_vectorized(
                    pointlist[1:-1], start, end)
            index = np.argmax(distances)
            self.dmax = distances[index]
            self.index = index + 1
    
        @property
        def start(self):
            return self.pointlist[0]
    
        @property
        def end(self):
            return self.pointlist[-1]
    
        def __len__(self):
            return len(self.pointlist)
    
        def __lt__(self, other):
            """Higher dmax results in lower sort order
    
            Required for python's heapq module
            """
            return self.dmax > other.dmax
    
        def split(self):
            pointlist = self.pointlist
            index = self.index
            startindex = self.startindex
            front = DprList(pointlist[:index + 1], startindex)
            back = DprList(pointlist[index:], startindex + index)
            return front, back
    
    
    def douglas_peucker_heap(pointlist, targetcount):
        if len(pointlist) <= targetcount:
            return list(pointlist)
        heap = [DprList(pointlist, 0)]
        finished = [] # sublists with length 2
        for _ in range(targetcount - 2):
            worst = heapq.heappop(heap)
            for sublist in worst.split():
                if len(sublist) < 3:
                    finished.append(sublist)
                else:
                    heapq.heappush(heap, sublist)
        heap += finished
        heap.sort(key=lambda sublist: sublist.startindex)
        rtrn = [heap[0].start]
        rtrn.extend(sublist.end for sublist in heap)
        return rtrn
    
    
    def test():
        points = np.array(
            ((0., 0.),
             (1., 2.),
             (3., -2.),
             (4., -2.),
             (5., -3.),
             (6., 5.),
             (7., 7.),
             (8., 0.)))
        print(np.asarray(douglas_peucker(points, 5.)))
        print(np.asarray(douglas_peucker_heap(points, 4)))
    
    
    def bench():
        import sys
        points = np.empty((1000, 2))
        points[:, 0] = np.arange(len(points))
        points[:, 1] = np.random.default_rng([1]).random(len(points))
        if sys.argv[1] == "base":
            for _ in range(10):
                np.asarray(douglas_peucker(points, 0.87))
            print(len(np.asarray(douglas_peucker(points, 0.87))))
        else:
            for _ in range(1000):
                np.asarray(douglas_peucker_heap(points, 54))
            print(len(np.asarray(douglas_peucker_heap(points, 54))))
    
    
    if __name__ == '__main__':
        # test()
        bench()
    

    Bonus

    For completeness and to make the benchmark more even, here is a version of the standard algorithm that is vectorized and avoids Python's recursion limit. It effectively has the same speed as the heap based version. For very large point lists I expect it to be slightly faster since it has better locality of data when iterating over the sub-lists.

    def douglas_peucker_optimized(pointlist, epsilon):
        if len(pointlist) < 3 or not epsilon > 0.:
            return list(pointlist)
        rtrn = [pointlist[0]]
        stack = [DprList(pointlist, 0)]
        while stack:
            sublist = stack.pop()
            if sublist.dmax < epsilon:
                rtrn.append(sublist.end)
                continue
            front, back = sublist.split()
            stack += [back, front]
        return rtrn