
PySpark: How to apply a Python UDF to PySpark DataFrame columns?


I have a PySpark DataFrame with two sets of latitude, longitude coordinates. I am trying to calculate the Haversine distance between each set of coordinates for a given row. I am using the following haversine() that I found online. The problem is that it cannot be applied to columns, or at least I do not know the syntax to do so. Can someone share the syntax or point out a better solution?

from math import radians, cos, sin, asin, sqrt

def haversine(lat1, lon1, lat2, lon2):
    """
    Calculate the great circle distance between two points 
    on the earth (specified in decimal degrees)
    """
    # convert decimal degrees to radians 
    lon1, lat1, lon2, lat2 = map(radians, [lon1, lat1, lon2, lat2])
    # haversine formula 
    dlon = lon2 - lon1 
    dlat = lat2 - lat1 
    a = sin(dlat/2)**2 + cos(lat1) * cos(lat2) * sin(dlon/2)**2
    c = 2 * asin(sqrt(a)) 
    # Radius of earth in miles is 3,963; 5280 ft in 1 mile
    ft = 3963 * 5280 * c
    return ft

I know the haversine() function above works because I tested it using some lat/lon coordinates from my dataframe and got sensible results:

haversine(-85.8059, 38.250134, 
          -85.805122, 38.250098)
284.1302325439314

When I replace sample coordinates with column names corresponding to lat/lons in my PySpark dataframe, I get an error. I have tried the following code in an attempt to create a new column containing the calculated Haversine distance as measured in feet:

(df.select('id', 'p1_longitude', 'p1_latitude', 'p2_lon', 'p2_lat')
   .withColumn('haversine_dist',
               haversine(df['p1_latitude'],
                         df['p1_longitude'],
                         df['p2_lat'],
                         df['p2_lon']))
   .show())

but I get the error:

must be real number, not Column
Traceback (most recent call last):
  File "", line 8, in haversine
TypeError: must be real number, not Column

This indicates to me that I must somehow iteratively apply my haversine function to each row of my PySpark DataFrame, but I'm not sure if that guess is correct and even if so, I don't know how to do it. As an aside, my lat/lons are float types.


Solution

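  • Prefer Spark built-in functions over a UDF whenever you can; UDFs are generally less performant.

    The TypeError occurs because math.radians expects a plain Python number, whereas df['p1_latitude'] is a Column expression. The functions in pyspark.sql.functions operate on Columns directly. Here is a solution using only Spark SQL functions that computes the same result as your function: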

    from pyspark.sql.functions import col, radians, asin, sin, sqrt, cos
    
    df.withColumn("dlon", radians(col("p2_lon")) - radians(col("p1_longitude"))) \
      .withColumn("dlat", radians(col("p2_lat")) - radians(col("p1_latitude"))) \
      .withColumn("haversine_dist",
                  asin(sqrt(
                      sin(col("dlat") / 2) ** 2
                      + cos(radians(col("p1_latitude"))) * cos(radians(col("p2_lat")))
                      * sin(col("dlon") / 2) ** 2
                  )) * 2 * 3963 * 5280) \
      .drop("dlon", "dlat") \
      .show(truncate=False)
    

    Gives:

    +-----------+------------+----------+---------+------------------+
    |p1_latitude|p1_longitude|p2_lat    |p2_lon   |haversine_dist    |
    +-----------+------------+----------+---------+------------------+
    |-85.8059   |38.250134   |-85.805122|38.250098|284.13023254857814|
    +-----------+------------+----------+---------+------------------+
    

    The available Spark built-in functions are listed in the pyspark.sql.functions API documentation.
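
For comparison, the haversine() function from the question can also be applied through a Python UDF, which is what the title literally asks for. The following is only a minimal sketch, assuming the column names from the question (p1_latitude, p1_longitude, p2_lat, p2_lon); with_haversine_udf is a hypothetical helper name, and the None check is an added assumption so that null coordinates yield null rather than an exception. It will usually be slower than the built-in version, because each row is passed to a Python worker.

```python
from math import radians, cos, sin, asin, sqrt


def haversine(lat1, lon1, lat2, lon2):
    """Great-circle distance in feet between two points given in decimal degrees."""
    # Assumption: return None for null inputs so Spark stores a null
    # instead of failing inside the Python worker.
    if None in (lat1, lon1, lat2, lon2):
        return None
    lon1, lat1, lon2, lat2 = map(radians, [lon1, lat1, lon2, lat2])
    dlon = lon2 - lon1
    dlat = lat2 - lat1
    a = sin(dlat / 2) ** 2 + cos(lat1) * cos(lat2) * sin(dlon / 2) ** 2
    # Earth radius of 3963 miles, 5280 ft per mile (same constants as the question).
    return 3963 * 5280 * 2 * asin(sqrt(a))


def with_haversine_udf(df):
    """Hypothetical helper: add a 'haversine_dist' column computed row by row."""
    # Imported here so the plain-Python function above stays usable without Spark.
    from pyspark.sql.functions import col, udf
    from pyspark.sql.types import DoubleType

    # Wrapping the function tells Spark to call it once per row with Python floats.
    haversine_udf = udf(haversine, DoubleType())
    return df.withColumn(
        "haversine_dist",
        haversine_udf(col("p1_latitude"), col("p1_longitude"),
                      col("p2_lat"), col("p2_lon")),
    )
```

Calling with_haversine_udf(df).show() should then display the distance column; the plain haversine() function remains callable on ordinary numbers for quick checks.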