Search code examples
pythonlinedetectionpointshapely

Finding all possible lines inside a set of points


I want to detect all possible "straight" line combinations (as long as criteria are met (condition inside extend_line() function: if x_sign == new_x_sign and y_sign == new_y_sign and np.sign(slope) == np.sign(new_slope) and abs(slope - new_slope) <= 2)) out of my sample set of points. Therefore I tried a approach with nesting my line extention function extend_line(), so the function runs as many times as it needs in order to combine all line segements meeting the criteria. To achive this I calculate the nearby points for every single point and afterwards check with line combinations work out.

The basic idea does work for the first "iteration" and the first "straigt forward" lineament meeting the criteria is found:

longest straight line starting point 0

But as soon as the latest execution of nested extend_line() is done, I am having the issue that I am not able to get rid of all line segments, which belong to the former line but do not belong to the current line (the brown and purple line segments should be removed:

multilinestring with wrongly kept line segments

I tried manipulating different varibales (like helper_list) but this did not work and as soon as the nested function was executed and the variables were "reset" to the variables from the original extend_line() function. I also tried truncating helper_list, where I store the line segment so it matches the amount of visited points. This seemed to be pretty promising, but the way/idea I had, did not work as desired.

import geopandas as gpd
from shapely.geometry import Point, LineString, MultiLineString
from shapely.ops import linemerge
import numpy as np
import matplotlib.pyplot as plt
from scipy.spatial.distance import cdist

def calc_slope(line):
    x1,y1 = line.coords[0]
    x2,y2 = line.coords[1]
    try:
        s = (y2-y1)/(x2-x1)
    except:
        s = 0
    return s

def extend_line(start_line, current_line, current_point, x_sign, y_sign, slope, points, visited_points, nearby_points_dict, helper_list=[]):
    # Add current line to helper_list if first "iteration"
    if len(visited_points) == 1:
        helper_list.append(current_line)
    # Add the current point to visited points
    visited_points.append(tuple(current_point))
    
    # Find nearby points that have not been visited yet
    key = np.where(points == current_point)
    nearby_points = [p for p in nearby_points_dict[key[0][0]] if tuple(p) not in visited_points]
    
    # Iterate over nearby points to extend the line
    for next_point in nearby_points:
        # # Truncate list to match amount of visited points
        # if len(visited_points) < len(helper_list):
        #     helper_list = helper_list[:len(visited_points)]
        # Create a new line from current point to next point
        new_line = LineString([current_point, next_point])
        
        # Check for signs in order to extent only points "in front"
        new_dx = current_point[0] - next_point[0]
        new_x_sign = np.sign(new_dx)
        new_dy = current_point[1] - next_point[1]
        new_y_sign = np.sign(new_dy)
        
        # Calculate slope of new line segment
        new_slope = calc_slope(new_line)
        
        # Check if conditions are met (same sign -> point in front, same lope sign -> same tilted line segment, slope witin threshold -> keeps "straight" line)
        if x_sign == new_x_sign and y_sign == new_y_sign and np.sign(slope) == np.sign(new_slope) and abs(slope - new_slope) <= 2:
            # Append valid line to the list of valid lines
            helper_list.append(new_line)
            
            # Recursively extend the line from the next point
            extend_line(start_line, new_line, next_point, x_sign, y_sign, new_slope, points, visited_points.copy(), nearby_points_dict, helper_list)
            
            if len(helper_list) > 1:
                mls = MultiLineString(helper_list)
                ls = linemerge(mls)
                valid_lines.append(ls)
                fig, ax = plt.subplots(figsize=(8, 8))
                gdf.plot(ax=ax, marker='o', color='gray', markersize=50, label='Original Points')
                try:
                    ax.plot(*ls.xy)
                except:
                    for geom in mls.geoms:
                        ax.plot(*geom.xy)
                plt.show()

# Sample data for GeoDataFrame
data = {
    'id': [1, 2, 3, 4, 5, 6, 7, 8, 9, 10,
           11, 12, 13, 14, 15, 16, 17, 18, 19, 20,
           21, 22, 23, 24, 25, 26, 27, 28, 29, 30],
    'x': [0.0, 1.0, 3.0, 2.0, 5.0, 4.0, 7.0, 6.0, 9.0, 8.0,
          1.5, 2.5, 4.5, 3.5, 6.5, 5.5, 8.5, 7.5, 10.0, 9.0,
          3.0, 4.0, 6.0, 5.0, 8.0, 7.0, 10.0, 9.0, 11.0, 10.0],
    'y': [0.0, 2.0, 1.0, 3.0, 0.5, 4.0, 2.5, 5.0, 1.5, 6.0,
          3.5, 5.0, 2.0, 6.0, 4.5, 8.0, 3.0, 7.0, 5.5, 9.0,
          7.0, 8.0, 6.0, 9.0, 8.5, 10.0, 7.5, 11.0, 9.0, 12.0]
}

# Create GeoDataFrame
geometry = [Point(xy) for xy in zip(data['x'], data['y'])]
gdf = gpd.GeoDataFrame(data, geometry=geometry)
gdf["id"] = gdf["id"] - 1

points = gdf[['x', 'y']].values

# Calculate distance matrix
pairwise_distances = cdist(points, points)

# Find indices of nearby points based on the threshold distance
nearby_indices = np.where((pairwise_distances > 1) & (pairwise_distances < 3))

# Exclude self-distances (where distance is zero)
nearby_indices = np.array([i for i in zip(*nearby_indices) if i[0] != i[1]])

# Create a list to store nearby points for each point
nearby_points_dict = {}

# Loop over each point to collect nearby points
for i in range(len(points)):
    nearby_points = points[nearby_indices[nearby_indices[:,0]==i,1]]
    nearby_points_dict[i] = nearby_points
all_lines = []
for i, nearby_points in nearby_points_dict.items():
    start_point = points[i]
    
    # Plot sample points and area to be considered nearby
    fig, ax = plt.subplots(figsize=(8, 8))
    gdf.plot(ax=ax, marker='o', color='gray', markersize=50, label='Original Points')
    for k, v in gdf.iterrows():
        ax.annotate(f'{v["id"]}\n({v["x"]},{v["y"]})',(v["x"],v["y"]), ha='center')
    ax.plot(points[i,0],points[i,1], marker='o', color='blue', label='Current Point')
    circle = plt.Circle(points[i], 3, color="green", alpha=0.25)
    ax.add_patch(circle)
    
    # Iterate over all nearby_points of start_point
    for point in nearby_points:
        end_point = point
        
        # Check signs of points in order to only extend points "in front" of start_point
        dx = start_point[0] - end_point[0]
        x_sign = np.sign(dx)
        dy = start_point[1] - end_point[1]
        y_sign = np.sign(dy)
        
        line = LineString([start_point, end_point])
        ax.plot(point[0],point[1], marker='o', color='red', linestyle='', label='Nearby Points')
        
        # Calculate slope of line in order to only extend with line segments within slope threshold
        slope = calc_slope(line)
        
        visited_points = [tuple(start_point)]
        valid_lines = []
        extend_line(line, line, end_point, x_sign, y_sign, slope, points, visited_points, nearby_points_dict)
        
    all_lines.append(valid_lines)
    break

Edit: Also wondering why I need to access keyinside extend_line() function by using key[0][0] if every point in points is there only once? Why does key printed out return (array([ 1, 1, 12], dtype=int64), array([0, 1, 1], dtype=int64)) for the first exectution of extent_line() and not only eg (array([1,], dtype=int64)?


Solution

  • If I understood your constraints correctly, this seems to do the trick with a recursive function:

    import math
    from functools import lru_cache
    
    import matplotlib.pyplot as plt
    import numpy as np
    
    points = {
        1: (0.0, 0.0),
        2: (1.0, 2.0),
        3: (3.0, 1.0),
        4: (2.0, 3.0),
        5: (5.0, 0.5),
        6: (4.0, 4.0),
        7: (7.0, 2.5),
        8: (6.0, 5.0),
        9: (9.0, 1.5),
        10: (8.0, 6.0),
        11: (1.5, 3.5),
        12: (2.5, 5.0),
        13: (4.5, 2.0),
        14: (3.5, 6.0),
        15: (6.5, 4.5),
        16: (5.5, 8.0),
        17: (8.5, 3.0),
        18: (7.5, 7.0),
        19: (10.0, 5.5),
        20: (9.0, 9.0),
        21: (3.0, 7.0),
        22: (4.0, 8.0),
        23: (6.0, 6.0),
        24: (5.0, 9.0),
        25: (8.0, 8.5),
        26: (7.0, 10.0),
        27: (10.0, 7.5),
        28: (9.0, 11.0),
        29: (11.0, 9.0),
        30: (10.0, 12.0),
    }
    
    
    @lru_cache(maxsize=128)
    def calc_slope(x1, y1, x2, y2):
        try:
            return (y2 - y1) / (x2 - x1)
        except ZeroDivisionError:
            return 0
    
    
    def get_candidate_point_ids(line: list[int]):
        skip_point_ids = set(line)
        last_x, last_y = points[line[-1]]
        if len(line) > 1:  # Not the first segment?
            last_slope = calc_slope(*points[line[-2]], last_x, last_y)
        else:
            last_slope = None
        for point_id, (x, y) in points.items():
            if point_id in skip_point_ids:
                continue
            dx = last_x - x
            dy = last_y - y
            if dx < 0 or dy < 0:  # Only consider points in front
                continue
            if 1 < np.hypot(dx, dy) < 3:  # Only consider points within 1..3 units
                continue
            if last_slope is not None:
                # Verify things against the previous segment if needed
                slope = calc_slope(last_x, last_y, x, y)
                if np.sign(slope) != np.sign(last_slope):
                    continue
                if abs(slope - last_slope) > 2:
                    continue
            yield point_id
    
    
    def get_lines(line):
        for next_point_id in get_candidate_point_ids(line):
            new_line = [*line, next_point_id]
            yield new_line
            yield from get_lines(new_line)
    
    
    def main():
        for start_point_id in points:
            for line in get_lines([start_point_id]):
                x, y = zip(*(points[point_id] for point_id in line))
                plt.plot(x, y, marker="o", label=f"{line}")
    
        plt.show()
    
    
    main()
    

    The output here is a bit messy since it's overlaying every possible polyline from all start points.

    enter image description here