Search code examples
pythonnumpyvectorization

How to vectorize a nested "for" loop with multiple "if" statements using Numpy?


I have a simple 2D ray-casting routine that gets terribly slow as soon as the number of obstacles increases.

This routine is made up of:

  • 2 for loops (outer loop iterates over each ray/direction, then inner loop iterates over each line obstacle)

  • multiple if statements (check if a value is > or < than another value or if an array is empty)

Question: How can I condense all these operations into 1 single block of vectorized instructions using Numpy ?

More specifically, I am facing 2 issues:

  • I have managed to vectorize the inner loop (intersection between a ray and each obstacle) but I am unable to run this operation for all rays at once.

  • The only workaround I found to deal with the if statements is to use masked arrays. Something tells me it is not the proper way to handle these statements in this case (it seems clumsy, cumbersome and unpythonic)

Original code:

from math import radians, cos, sin
import matplotlib.pyplot as plt
import numpy as np

N = 10  # dimensions of canvas (NxN)
sides = np.array([[0, N, 0, 0], [0, N, N, N], [0, 0, 0, N], [N, N, 0, N]])
edges = np.random.rand(5, 4) * N # coordinates of 5 random segments (x1, x2, y1, y2)
edges = np.concatenate((edges, sides))
center = np.array([N/2, N/2]) # coordinates of center point
directions = np.array([(cos(radians(a)), sin(radians(a))) for a in range(0, 360, 10)]) # vectors pointing in all directions
intersections = []

# for each direction
for d in directions:
    
    min_dist = float('inf')
    
    # for each edge
    for e in edges:
        p1x, p1y = e[0], e[2]
        p2x, p2y = e[1], e[3]
        p3x, p3y = center
        p4x, p4y = center + d
        
        # find intersection point
        den = (p1x - p2x) * (p3y - p4y) - (p1y - p2y) * (p3x - p4x)
        
        if den:
            t = ((p1x - p3x) * (p3y - p4y) - (p1y - p3y) * (p3x - p4x)) / den
            u = -((p1x - p2x) * (p1y - p3y) - (p1y - p2y) * (p1x - p3x)) / den
            
            # if any:
            if t > 0 and t < 1 and u > 0:
                sx = p1x + t * (p2x - p1x)
                sy = p1y + t * (p2y - p1y)
                
                isec = np.array([sx, sy])           
                dist = np.linalg.norm(isec-center) 
                
                # make sure to select the nearest one (from center)
                if dist < min_dist:
                    min_dist = dist
                    nearest = isec
    
    # store nearest interesection point for each ray
    intersections.append(nearest)

    
# Render
plt.axis('off')

for x, y in zip(edges[:,:2], edges[:,2:]):
    plt.plot(x, y)
    
for isec in np.array(intersections):
    plt.plot((center[0], isec[0]), (center[1], isec[1]), '--', color="#aaaaaa", linewidth=.8)

enter image description here

Vectorized version (attempt):

from math import radians, cos, sin
import matplotlib.pyplot as plt
from scipy import spatial
import numpy as np

N = 10  # dimensions of canvas (NxN)
sides = np.array([[0, N, 0, 0], [0, N, N, N], [0, 0, 0, N], [N, N, 0, N]])
edges = np.random.rand(5, 4) * N # coordinates of 5 random segments (x1, x2, y1, y2)
edges = np.concatenate((edges, sides))
center = np.array([N/2, N/2]) # coordinates of center point
directions = np.array([(cos(radians(a)), sin(radians(a))) for a in range(0, 360, 10)]) # vectors pointing in all directions
intersections = []

# Render edges
plt.axis('off')
for x, y in zip(edges[:,:2], edges[:,2:]):
    plt.plot(x, y)
    
    
# for each direction
for d in directions:

    p1x, p1y = edges[:,0], edges[:,2]
    p2x, p2y = edges[:,1], edges[:,3]
    p3x, p3y = center
    p4x, p4y = center + d

    # denominator
    den = (p1x - p2x) * (p3y - p4y) - (p1y - p2y) * (p3x - p4x)

    # first 'if' statement -> if den > 0
    mask = den > 0
    den = den[mask]
    p1x = p1x[mask]
    p1y = p1y[mask]
    p2x = p2x[mask]
    p2y = p2y[mask]

    t = ((p1x - p3x) * (p3y - p4y) - (p1y - p3y) * (p3x - p4x)) / den
    u = -((p1x - p2x) * (p1y - p3y) - (p1y - p2y) * (p1x - p3x)) / den

    # second 'if' statement -> if (t>0) & (t<1) & (u>0)
    mask2 = (t > 0) & (t < 1) & (u > 0)
    t = t[mask2]
    p1x = p1x[mask2]
    p1y = p1y[mask2]
    p2x = p2x[mask2]
    p2y = p2y[mask2]
    
    # x, y coordinates of all intersection points in the current direction
    sx = p1x + t * (p2x - p1x)
    sy = p1y + t * (p2y - p1y)
    pts = np.c_[sx, sy]
    
    # if any:
    if pts.size > 0:
        
        # find nearest intersection point
        tree = spatial.KDTree(pts)
        nearest = pts[tree.query(center)[1]]

        # Render
        plt.plot((center[0], nearest[0]), (center[1], nearest[1]), '--', color="#aaaaaa", linewidth=.8)

Solution

  • Reformulation of the problem – Finding the intersection between a line segment and a line ray

    Let q and q2 be the endpoints of a segment (obstacle). For convenience let's define a class to represent points and vectors in the plane. In addition to the usual operations, a vector multiplication is defined by u × v = u.x * v.y - u.y * v.x.

    Caution: here Coord(2, 1) * 3 returns Coord(6, 3) while Coord(2, 1) * Coord(-1, 4) outputs 9. To avoid this confusion it might have been possible to restrict * to the scalar multiplication and use ^ via __xor__ for the vector multiplication.

    class Coord:
        def __init__(self, x, y):
            self.x = x
            self.y = y
    
        @property
        def radius(self):
            return np.sqrt(self.x ** 2 + self.y ** 2)
    
        def _cross_product(self, other):
            assert isinstance(other, Coord)
            return self.x * other.y - self.y * other.x
    
        def __mul__(self, other):
            if isinstance(other, Coord):
                # 2D "cross"-product
                return self._cross_product(other)
            elif isinstance(other, int) or isinstance(other, float):
                # scalar multiplication
                return Coord(self.x * other, self.y * other)
        
        def __rmul__(self, other):
            return self * other
    
        def __sub__(self, other):
            return Coord(self.x - other.x, self.y - other.y)
    
        def __add__(self, other):
            return Coord(self.x + other.x, self.y + other.y)
    
        def __repr__(self):
            return f"Coord({self.x}, {self.y})"
    

    Now, I find it easier to handle a ray in polar coordinates: For a given angle theta (direction) the goal is to determine if it intersects the segment, and if so determine the corresponding radius. Here is a function to find that. See here for an explanation of why and how. I tried to use the same variable names as in the previous link.

    def find_intersect_btw_ray_and_sgmt(q, q2, theta):
        """
        Args:
            q (Coord): first endpoint of the segment
            q2 (Coord): second endpoint of the segment
            theta (float): angle of the ray
    
        Returns:
            (float): np.inf if the ray does not intersect the segment,
                     the distance from the origin of the intersection otherwise
       """
        assert isinstance(q, Coord) and isinstance(q2, Coord)
        s = q2 - q
        r = Coord(np.cos(theta), np.sin(theta))
        cross = r * s  # 2d cross-product
        t_num = q * s
        u_num = q * r
        ## the intersection point is roughly at a distance t_num / cross
        ## from the origin. But some cases must be checked beforehand.
    
        ## (1) the segment [PQ2] is aligned with the ray
        if np.isclose(cross, 0) and np.isclose(u_num, 0):
            return min(q.radius, q2.radius)
        ## (2) the segment [PQ2] is parallel with the ray
        elif np.isclose(cross, 0):
            return np.inf
       
        t, u = t_num / cross, u_num / cross
        ## There is actually an intersection point
        if t >= 0 and 0 <= u <= 1:
            return t
       
        ## (3) No intersection point
        return np.inf
    

    For instance find_intersect_btw_ray_and_sgmt(Coord(1, 2), Coord(-1, 2), np.pi / 2) should returns 2.

    Note that here for simplicity, I only considered the case where the origin of the rays is at Coord(0, 0). This can be easily extended to the general case by setting t_num = (q - origin) * s and u_num = (q - origin) * r.

    Let's vectorize it!

    What is very interesting here is that the operations defined in the Coord class also apply to cases where x and y are numpy arrays! Hence applying any defined operation on Coord(np.array([1, 2, 0]), np.array([2, -1, 3])) amounts applying it elementwise to the points (1, 2), (2, -1) and (0, 3). The operations of Coord are therefore already vectorized. The constructor can be modified into:

        def __init__(self, x, y):
            x, y = np.array(x), np.array(y)
            assert x.shape == y.shape
            self.x, self.y = x, y
            self.shape = x.shape
    

    Now, we would like the function find_intersect_btw_ray_and_sgmt to be able to handle the case where the parameters q and q2contains sequences of endpoints. Before the sanity checks, all the operations are working properly since, as we have mentioned, they are already vectorized. As you mentionned the conditional statements can be "vectorized" using masks. Here is what I propose:

    def find_intersect_btw_ray_and_sgmts(q, q2, theta):
        assert isinstance(q, Coord) and isinstance(q2, Coord)
        assert q.shape == q2.shape
        EPS = 1e-14
        s = q2 - q
        r = Coord(np.cos(theta), np.sin(theta))
        cross = r * s
       
        cross_sign = np.sign(cross)
        cross = cross * cross_sign
        t_num = (q * s) * cross_sign
        u_num = (q * r) * cross_sign
    
        radii = np.zeros_like(t_num)
        mask = ~np.isclose(cross, 0) & (t_num >= -EPS) & (-EPS <= u_num) & (u_num <= cross + EPS)
    
        radii[~mask] = np.inf  # no intersection
        radii[mask] = t_num[mask] / cross[mask]  # intersection
        return radii
    

    Note that cross, t_num and u_num are multiplied by the sign of cross to ensure that the division by cross keeps the sign of the dividends. Hence conditions of the form ((t_num >= 0) & (cross >= 0)) | ((t_num <= 0) & (cross <= 0)) can be replaced by (t_num >= 0).

    For simplicity, we omitted the case (1) where the radius and the segment were aligned ((cross == 0) & (u_num == 0)). This could be incorporated by carefully adding a second mask.

    For a given value of theta, we are able to determine if the corresponing ray intersects with several segments at once.

    ## Some useful functions
    def polar_to_cartesian(r, theta):
        return Coord(r * np.cos(theta), r * np.sin(theta))
    
    def plot_segments(p, q, *args, **kwargs):
        plt.plot([p.x, q.x], [p.y, q.y], *args, **kwargs)
    
    def plot_rays(radii, thetas, *args, **kwargs):
        endpoints = polar_to_cartesian(radii, thetas)
        n = endpoints.shape
        origin = Coord(np.zeros(n), np.zeros(n))
        plot_segments(origin, endpoints, *args, **kwargs)
    
    ## Data generation
    M = 5  # size of the canvas
    N = 10  # number of segments
    K = 16  # number of rays
    
    q = Coord(*np.random.uniform(-M/2, M/2, size=(2, N)))
    p = q + Coord(*np.random.uniform(-M/2, M/2, size=(2, N)))
    thetas = np.linspace(0, 2 * np.pi, K, endpoint=False)
    
    ## For each ray, find the minimal distance of intersection
    ## with all segments
    plt.figure(figsize=(5, 5))
    plot_segments(p, q, "royalblue", marker=".")
    
    for theta in thetas:
        radii = find_intersect_btw_ray_and_sgmts(p, q, theta)
        radius = np.min(radii)
        if not np.isinf(radius):
            plot_rays(radius, theta, color="orange")
        else:
            plot_rays(2*M, theta, ':', c='orange')
    
    plt.plot(0, 0, 'kx')
    plt.xlim(-M, M)
    plt.ylim(-M, M)
    

    And that's not all! Thanks to the broadcasting of python, it is possible to avoid iteration on theta values. For example, recall that np.array([1, 2, 3]) * np.array([[1], [2], [3], [4]]) produces a matrix of size 4 × 3 of the pairwise products. In the same way Coord([[5],[7]], [[5],[1]]) * Coord([2, 4, 6], [-2, 4, 0]) outputs a 2 × 3 matrix containing all the pairwise cross product between vectors (5, 5), (7, 1) and (2, -2), (4, 4), (6, 0).

    Finally, the intersections can be determined in the following way:

    radii_all = find_intersect_btw_ray_and_sgmts(p, q, np.vstack(thetas))
    # p and q have a shape of (N,) and np.vstack(thetas) of (K, 1)
    # this radii_all have a shape of (K, N)
    # radii_all[k, n] contains the distance from the origin of the intersection
    # between k-th ray and n-th segment (or np.inf if there is no intersection point)
    radii = np.min(radii_all, axis=1)
    # radii[k] contains the distance from the origin of the closest intersection
    # between k-th ray and all segments
    
    
    do_intersect = ~np.isinf(radii)
    plot_rays(radii[do_intersect], thetas[do_intersect], color="orange")
    plot_rays(2*M, thetas[~do_intersect], ":", color="orange")
    

    results