Tags: apache-spark, pyspark, apache-spark-sql, user-defined-functions, geopy

Calculate the geographical distance in pyspark dataframe


My dataframe:

DF = spark.createDataFrame(
    [[114.038696, 22.5315, 114.047302, 22.531799],
     [114.027901, 22.5228, 114.026299, 22.5238],
     [114.026299, 22.5238, 114.024597, 22.5271],
     [114.024597, 22.5271, 114.024696, 22.527201]],
    list('ABCD'))
DF.show()
+----------+-------+----------+---------+
|         A|      B|         C|        D|
+----------+-------+----------+---------+
|114.038696|22.5315|114.047302|22.531799|
|114.027901|22.5228|114.026299|  22.5238|
|114.026299|22.5238|114.024597|  22.5271|
|114.024597|22.5271|114.024696|22.527201|
+----------+-------+----------+---------+

(A, B) and (C, D) are the coordinates of two points: columns A and C are longitudes, and columns B and D are latitudes.

I want to calculate the geographical distance between the two points.

I tried:

from geopy.distance import geodesic
DF = DF.withColumn('Lengths/m', geodesic((['B'],['A']), (['D'],['C'])).m)

Then I get the error:

TypeError: float() argument must be a string or a number, not 'list'

What should I do differently to successfully calculate the geographical distance?


Solution

  • You need to wrap geodesic in a custom user-defined function (UDF) and pass it (latitude, longitude) pairs built with F.array:

    from geopy.distance import geodesic
    import pyspark.sql.functions as F
    from pyspark.sql.types import FloatType
    
    @F.udf(returnType=FloatType())
    def geodesic_udf(a, b):
        # a and b arrive as (latitude, longitude) pairs, which is the
        # order geodesic expects
        return geodesic(a, b).m
    
    
    DF = DF.withColumn('Lengths/m', geodesic_udf(F.array("B", "A"), F.array("D", "C")))
    
    DF.show()
    #+----------+-------+----------+---------+---------+
    #|A         |B      |C         |D        |Lengths/m|
    #+----------+-------+----------+---------+---------+
    #|114.038696|22.5315|114.047302|22.531799|885.94244|
    #|114.027901|22.5228|114.026299|22.5238  |198.55937|
    #|114.026299|22.5238|114.024597|22.5271  |405.21692|
    #|114.024597|22.5271|114.024696|22.527201|15.126849|
    #+----------+-------+----------+---------+---------+
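As a quick sanity check that doesn't need Spark or geopy: geopy's geodesic uses an ellipsoidal Earth model, while a plain-Python haversine assumes a sphere, so it lands within a couple of metres on distances this short. The sketch below (not part of geopy or the solution above) recomputes the first row:

```python
from math import radians, sin, cos, asin, sqrt

def haversine_m(lat1, lon1, lat2, lon2):
    """Great-circle distance in metres on a sphere of mean radius 6371 km."""
    R = 6371000.0
    phi1, phi2 = radians(lat1), radians(lat2)
    dphi = radians(lat2 - lat1)
    dlmb = radians(lon2 - lon1)
    a = sin(dphi / 2) ** 2 + cos(phi1) * cos(phi2) * sin(dlmb / 2) ** 2
    return 2 * R * asin(sqrt(a))

# first row: (B, A) -> (D, C), i.e. (lat, lon) pairs
print(haversine_m(22.5315, 114.038696, 22.531799, 114.047302))
```

The result is within a metre or two of the 885.94 m that geodesic returns for that row, which confirms the latitude/longitude column order in the UDF call.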