
Dot product calculation for angle between vectors in PySpark


I am trying to calculate the angle between two vectors using PySpark in Databricks. The vectors start at a common point (x2, y2) and end at (x1, y1) and (x3, y3). This is fairly trivial in plain Python, but I can't find an efficient way to do it in PySpark, and the main blocking point is calculating the dot product.

Since I have been unable to calculate the dot product in PySpark at all, I implemented the method as a UDF using numpy functions, but it is not as fast as I would like. I'd appreciate any input on how to do this with PySpark's own functions rather than relying on numpy.
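For reference, the UDF below computes the angle $\theta$ at point $b = (x_2, y_2)$ between the vectors $\vec{BA} = a - b$ and $\vec{BC} = c - b$, where $a = (x_1, y_1)$ and $c = (x_3, y_3)$. In two dimensions the dot product is just a sum of two products, so every term is ordinary arithmetic on the coordinates:

$$
\cos\theta = \frac{\vec{BA}\cdot\vec{BC}}{\lVert\vec{BA}\rVert\,\lVert\vec{BC}\rVert}
= \frac{(x_1 - x_2)(x_3 - x_2) + (y_1 - y_2)(y_3 - y_2)}{\sqrt{(x_1 - x_2)^2 + (y_1 - y_2)^2}\;\sqrt{(x_3 - x_2)^2 + (y_3 - y_2)^2}},
\qquad \theta = \arccos(\cos\theta)
$$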

import pandas as pd
import numpy as np
from pyspark.sql.functions import udf

@udf("float")
def calculateAngle(x1, y1, x2, y2, x3, y3):
    a = np.array([x1,y1])
    b = np.array([x2,y2])
    c = np.array([x3,y3])

    ba = a - b
    bc = c - b

    cosine_angle = np.dot(ba, bc) / (np.linalg.norm(ba) * np.linalg.norm(bc))
    returnValue = np.degrees(np.arccos(cosine_angle))
    return returnValue.item()

data = {'x1': 1.23, 'y1': 3.23, 'x2': 1.25, 'y2': 3.2, 'x3': 1.3, 'y3': 2.8}
df = pd.DataFrame(data, index=[0])
df = spark.createDataFrame(df)  # `spark` is the SparkSession that Databricks provides
df = df.withColumn("angle", calculateAngle('x1', 'y1', 'x2', 'y2', 'x3', 'y3'))
df = df.toPandas()
df
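A common middle ground between a row-at-a-time UDF and pure Spark expressions (not something from the original post) is to vectorize the numpy logic so it works on whole arrays at once instead of calling `np.dot` per row. The sketch below shows only that vectorized core, under the hypothetical name `angle_degrees_vectorized`; its inputs are assumed to be numpy arrays (or pandas Series) of equal length. Wrapping a function like this with `pyspark.sql.functions.pandas_udf` is one option in Spark 3.0+, but that wrapping is not shown here and has not been benchmarked.

```python
import numpy as np


def angle_degrees_vectorized(x1, y1, x2, y2, x3, y3):
    """Hypothetical helper: angle at (x2, y2) between the vectors to
    (x1, y1) and (x3, y3), in degrees, computed element-wise over arrays."""
    # Components of BA = a - b and BC = c - b for every row at once
    ba_x, ba_y = x1 - x2, y1 - y2
    bc_x, bc_y = x3 - x2, y3 - y2
    # 2-D dot product written out term by term, so no per-row np.dot call
    dot = ba_x * bc_x + ba_y * bc_y
    # np.hypot(u, v) == sqrt(u**2 + v**2), element-wise
    norms = np.hypot(ba_x, ba_y) * np.hypot(bc_x, bc_y)
    # Clip guards against rounding pushing the cosine just outside [-1, 1]
    cosine = np.clip(dot / norms, -1.0, 1.0)
    return np.degrees(np.arccos(cosine))


# Two rows at once: the question's points and a = (0, 1), b = (0, 0), c = (1, 1)
angles = angle_degrees_vectorized(
    np.array([1.23, 0.0]), np.array([3.23, 1.0]),  # x1, y1
    np.array([1.25, 0.0]), np.array([3.2, 0.0]),   # x2, y2
    np.array([1.3, 1.0]), np.array([2.8, 1.0]),    # x3, y3
)
print(angles)
```

The point of the design is that numpy does the loop in compiled code over the whole batch, which is where the per-row UDF loses most of its time.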

Solution

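  • Python UDFs are slow (see the Stack Overflow question "Spark functions vs UDF performance?"): every row is serialized from the JVM to a Python worker and back, and Spark's optimizer cannot look inside the UDF. The angle computation between 2-dim vectors can instead be expressed entirely with built-in Spark column functions: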

    from pyspark.sql import functions as F
    
    # Toy example: points a = (a1, a2), b = (b1, b2), c = (c1, c2); the angle is taken at b
    data = {'a1': 0, 'a2': 1, 'b1': 0, 'b2': 0, 'c1': 1, 'c2': 1}
    _vals = [tuple(data.values())]
    _schema = list(data.keys())
    df = spark.createDataFrame(_vals, _schema)
    
    # Components of BA = a - b and BC = c - b, built as Column expressions
    ba1 = F.col('a1') - F.col('b1')
    ba2 = F.col('a2') - F.col('b2')
    bc1 = F.col('c1') - F.col('b1')
    bc2 = F.col('c2') - F.col('b2')
    # 2-D dot product written out term by term
    dot_product = ba1 * bc1 + ba2 * bc2
    
    # Euclidean lengths of BA and BC
    ba_length = F.sqrt((ba1 ** 2) + (ba2 ** 2))
    bc_length = F.sqrt((bc1 ** 2) + (bc2 ** 2))
    
    # acos returns the angle in radians, in the range [0, pi]
    angle = F.acos(dot_product / (ba_length * bc_length))
    
    # DataFrame.withColumns requires Spark 3.3 or later
    df = df.withColumns({
        'angle_radians': angle,
        'angle_degrees': F.degrees(angle),
    })
    df.show()
    
    # +---+---+---+---+---+---+------------------+-----------------+
    # | a1| a2| b1| b2| c1| c2|     angle_radians|    angle_degrees|
    # +---+---+---+---+---+---+------------------+-----------------+
    # |  0|  1|  0|  0|  1|  1|0.7853981633974484|45.00000000000001|
    # +---+---+---+---+---+---+------------------+-----------------+
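One caveat with the `acos` formulation: when the two vectors are (nearly) parallel, floating-point rounding can push the cosine slightly outside [-1, 1], and `acos` then returns NaN. A swapped-in alternative that avoids both the square roots and the division is θ = atan2(|cross|, dot), where cross = ba1 * bc2 - ba2 * bc1 is the 2-D cross product. Spark has `F.atan2` and `F.abs`, so the same column-expression approach should carry over (something like `F.atan2(F.abs(cross), dot_product)`), though that variant is untested here. The sketch below only checks the formula in plain Python, using a hypothetical helper `angle_atan2` with the same argument order as the Spark example's columns.

```python
import math


def angle_atan2(a1, a2, b1, b2, c1, c2):
    """Hypothetical helper: unsigned angle at b between BA and BC, in radians.

    atan2(|cross|, dot) lies in [0, pi], the same range as acos, but needs no
    clamping because atan2 accepts any pair of finite inputs.
    """
    # Components of BA = a - b and BC = c - b
    ba1, ba2 = a1 - b1, a2 - b2
    bc1, bc2 = c1 - b1, c2 - b2
    dot = ba1 * bc1 + ba2 * bc2
    cross = ba1 * bc2 - ba2 * bc1  # z-component of the 3-D cross product
    return math.atan2(abs(cross), dot)


# Same points as the Spark example: a = (0, 1), b = (0, 0), c = (1, 1)
rad = angle_atan2(0, 1, 0, 0, 1, 1)
print(rad, math.degrees(rad))
```

Exactly collinear points come out as exactly 0 or pi with this form, instead of depending on how the division rounds.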