I need to compress a numpy array by keeping only its most significant points — I mean the points where there is a change — but it has to be a specific number of points. For example, my array may have 100 values and I need the 40 most meaningful ones. I tried the RDP algorithm, but you can't specify the number of points and you can't predict the length of the result. So for now I am doing the following, but it gives evenly spaced points, not the most important ones:
n=40
indices = np.linspace(0, len(array) - 1, num=n)
Here is an implementation of the idea I outlined in my comments. What we do is this: We use the regular Ramer–Douglas–Peucker algorithm. But instead of recursively splitting the point lists until an epsilon is achieved, we always split the worst sub-list until we reach the target size in remaining points.
The basic idea seems to work. The base algorithm is lifted from Wikipedia for validation and comparison.
#!/usr/bin/env python3
import heapq
import math
import numpy as np
def perpendicular_distance(point, start, end):
    """Return the distance from ``point`` to the line through ``start`` and ``end``.

    Falls back to the plain point-to-point distance to ``start`` when the
    two line points coincide (degenerate line).
    """
    direction = end - start
    length = np.linalg.norm(direction)
    if length == 0.:
        return np.linalg.norm(point - start)
    # |cross| is the area of the parallelogram spanned by the segment and
    # the offset to the point; dividing by the base length gives the height.
    area = np.linalg.norm(np.cross(direction, start - point))
    return area / length
def douglas_peucker(pointlist, epsilon):
    """Classic recursive Ramer-Douglas-Peucker simplification.

    Returns a list of points such that every dropped point lies within
    ``epsilon`` of the simplified polyline.
    """
    if len(pointlist) < 3:
        return list(pointlist)
    first, last = pointlist[0], pointlist[-1]
    # Locate the point farthest from the chord first->last; ties resolve
    # to the highest index via tuple comparison.
    dmax, split_at = max(
        (perpendicular_distance(pt, first, last), i)
        for i, pt in enumerate(pointlist))
    if dmax < epsilon:
        # Everything in between is close enough to the chord.
        return [first, last]
    left = douglas_peucker(pointlist[:split_at + 1], epsilon)
    right = douglas_peucker(pointlist[split_at:], epsilon)
    # The split point appears in both halves; keep only the right copy.
    return left[:-1] + right
def perpendicular_distance_vectorized(pointlist, start, end):
    """Distances from every point in ``pointlist`` to the line start->end.

    Returns a 1-D array with one distance per point.  When ``start`` and
    ``end`` coincide (or the segment length is not finite), falls back to
    the point-to-point distance to ``start``.
    """
    offsets = start - pointlist
    dist = end - start
    absdist = np.linalg.norm(dist)
    # Test absdist directly instead of computing 1/absdist first: the old
    # np.divide(1., 0.) emitted a RuntimeWarning before the isfinite check.
    if absdist == 0. or not math.isfinite(absdist):
        return np.linalg.norm(offsets, axis=1)
    cross = np.cross(dist, offsets)
    # 2-D inputs yield scalar cross products per point; 3-D inputs yield
    # vectors whose norms we need.
    cross = np.linalg.norm(cross, axis=1) if cross.ndim > 1 else abs(cross)
    return cross / absdist
class DprList:
    """One contiguous slice of the point list, ranked by its worst point.

    On construction, precomputes which interior point lies farthest from
    the chord between the slice's endpoints.  Instances sort inversely by
    that distance so heapq behaves as a max-heap.
    """

    def __init__(self, pointlist, startindex):
        self.pointlist = pointlist
        # Offset of pointlist[0] within the original full point list.
        self.startindex = startindex
        if len(pointlist) < 3:
            # No interior points, nothing to split.
            self.index = 0
            self.dmax = 0.
            return
        distances = perpendicular_distance_vectorized(
            pointlist[1:-1], pointlist[0], pointlist[-1])
        worst = np.argmax(distances)
        self.dmax = distances[worst]
        # +1 because the distances array skips the first point.
        self.index = worst + 1

    @property
    def start(self):
        return self.pointlist[0]

    @property
    def end(self):
        return self.pointlist[-1]

    def __len__(self):
        return len(self.pointlist)

    def __lt__(self, other):
        """Invert the ordering so heapq pops the largest dmax first."""
        return self.dmax > other.dmax

    def split(self):
        """Split at the worst point, which is shared by both halves."""
        at = self.index
        front = DprList(self.pointlist[:at + 1], self.startindex)
        back = DprList(self.pointlist[at:], self.startindex + at)
        return front, back
def douglas_peucker_heap(pointlist, targetcount):
    """Simplify ``pointlist`` down to exactly ``targetcount`` points.

    Unlike classic RDP, which recurses until an epsilon is met, this
    repeatedly splits the sub-list containing the globally worst point
    until the target number of points survives.
    """
    if len(pointlist) <= targetcount:
        return list(pointlist)
    heap = [DprList(pointlist, 0)]
    done = []  # two-point sub-lists that can no longer be split
    # Each split keeps one extra point; the shared start/end are the other 2.
    for _ in range(targetcount - 2):
        for piece in heapq.heappop(heap).split():
            if len(piece) > 2:
                heapq.heappush(heap, piece)
            else:
                done.append(piece)
    # Stitch the surviving pieces back into document order.
    pieces = sorted(heap + done, key=lambda piece: piece.startindex)
    result = [pieces[0].start]
    result.extend(piece.end for piece in pieces)
    return result
def test():
    """Smoke test: print both epsilon-based and count-based results."""
    points = np.array([
        [0., 0.],
        [1., 2.],
        [3., -2.],
        [4., -2.],
        [5., -3.],
        [6., 5.],
        [7., 7.],
        [8., 0.],
    ])
    print(np.asarray(douglas_peucker(points, 5.)))
    print(np.asarray(douglas_peucker_heap(points, 4)))
def bench():
    """Crude benchmark of the two implementations on 1000 random points.

    Pass ``base`` as the first command-line argument to benchmark the
    recursive baseline; any other (or no) argument benchmarks the
    heap-based variant.
    """
    import sys
    points = np.empty((1000, 2))
    points[:, 0] = np.arange(len(points))
    points[:, 1] = np.random.default_rng([1]).random(len(points))
    # Default to the heap variant when no argument is given, instead of
    # crashing with IndexError on sys.argv[1].
    mode = sys.argv[1] if len(sys.argv) > 1 else "heap"
    if mode == "base":
        for _ in range(10):
            np.asarray(douglas_peucker(points, 0.87))
        print(len(np.asarray(douglas_peucker(points, 0.87))))
    else:
        for _ in range(1000):
            np.asarray(douglas_peucker_heap(points, 54))
        print(len(np.asarray(douglas_peucker_heap(points, 54))))
if __name__ == '__main__':
    # Runs the benchmark by default; uncomment test() for the smoke test.
    # test()
    bench()
For completeness, and to make the benchmark fairer, here is a version of the standard algorithm that is vectorized and avoids Python's recursion limit. It effectively has the same speed as the heap-based version. For very large point lists I expect it to be slightly faster, since it has better data locality when iterating over the sub-lists.
def douglas_peucker_optimized(pointlist, epsilon):
    """Iterative, vectorized Ramer-Douglas-Peucker simplification.

    Equivalent to douglas_peucker() but drives the splitting with an
    explicit stack, so it is immune to Python's recursion limit.  The
    ``not epsilon > 0.`` guard also rejects NaN epsilons.
    """
    if len(pointlist) < 3 or not epsilon > 0.:
        return list(pointlist)
    result = [pointlist[0]]
    pending = [DprList(pointlist, 0)]
    while pending:
        segment = pending.pop()
        if segment.dmax < epsilon:
            result.append(segment.end)
        else:
            front, back = segment.split()
            # Push back first so front is processed next (document order).
            pending.append(back)
            pending.append(front)
    return result