python, apache-spark, pyspark, apache-spark-sql, geopy

Filtering a Spark SQL DataFrame by distance


I have a Spark SQL DataFrame with latitude and longitude columns, and I am trying to filter rows whose distance to an input point falls below a threshold. I am using geopy (great_circle) to calculate the distance between lat/long pairs. My current code looks like this:

from geopy.distance import great_circle

point = (10, 20)
threshold = 10
filtered_df = df.filter(great_circle(point, (df.lat, df.lon)) < threshold)

When I run this code, I get the following error:

ValueError: Cannot convert column into bool: please use '&' for 'and', '|' for 'or', '~' for 'not' when building DataFrame boolean expressions. 

I am confused about which part of the filter expression is wrong.


Solution

  • You cannot apply plain Python functions like great_circle directly to a DataFrame: the expression is evaluated eagerly on the driver, so it ends up using Column objects in a boolean test, which is what raises the ValueError above. You have to use a udf:

    from pyspark.sql.functions import udf
    
    @udf("float")
    def great_circle_udf(x, y):
        return great_circle(x, y).kilometers
    

    and apply it to the columns:

    from pyspark.sql.functions import lit, struct
    
    point = struct(lit(10), lit(20))
    df.filter(great_circle_udf(point, struct(df.lat, df.lon)) < threshold)
    

    The decorator syntax works since Spark 2.2; for earlier versions you'll need the standard udf call:

    from pyspark.sql.types import FloatType
    
    great_circle_udf = udf(lambda x, y: great_circle(x, y).kilometers, FloatType())
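
    For reference, here is a minimal end-to-end sketch of the same approach (assuming Spark 2.2+ for the decorator syntax; the sample DataFrame, the lat/lon column names, and the coordinates are made up for illustration):

    from geopy.distance import great_circle
    from pyspark.sql import SparkSession
    from pyspark.sql.functions import udf, lit, struct
    
    spark = SparkSession.builder.getOrCreate()
    
    @udf("float")
    def great_circle_udf(x, y):
        # x and y arrive as Rows (tuples of latitude, longitude), which geopy accepts
        return great_circle(x, y).kilometers
    
    # Toy data: only the first point lies within ~10 km of (10, 20)
    df = spark.createDataFrame([(10.02, 20.05), (45.0, 7.6)], ["lat", "lon"])
    
    point = struct(lit(10.0), lit(20.0))
    threshold = 10
    
    df.filter(great_circle_udf(point, struct(df.lat, df.lon)) < threshold).show()

    Note that the udf is called once per row in Python, so for very large DataFrames it will be noticeably slower than a pure Column expression.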