
Joining rows from two dataframes with the closest point


Hi, I am kind of new to Spark and I am not sure how to approach this.

I have 2 tables (shown much smaller here for easier explanation):

A: Weather data

B: Travel data

I need to join these tables by finding the closest weather station to where the trip started, on the same date, and do the same for where the trip ended. In the end I want all the weather data from the closest station when the trip started and when it finished, with just one row per trip.
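The matching rule described above (same date, nearest station, once for the start point and once for the end point, one row per trip) can be sketched in plain Python on toy data before moving to Spark. The station ids, coordinates and tuple layouts below are made up for illustration, and the squared difference in degrees is only a rough ranking that works for nearby points:

```python
# Hypothetical toy data, not from the question:
# stations: (station_id, date, lat, lon)
stations = [
    ("S1", "2013-06-01", 40.75, -73.99),
    ("S2", "2013-06-01", 40.69, -73.99),
    ("S3", "2013-06-02", 40.74, -73.99),
]
# trips: (id, start_date, start_lat, start_lon, end_date, end_lat, end_lon)
trips = [
    (0, "2013-06-01", 40.742, -73.989, "2013-06-01", 40.695, -73.996),
]


def closest_station(date, lat, lon, stations):
    """Return the id of the station on `date` nearest to (lat, lon), or None."""
    same_day = [s for s in stations if s[1] == date]
    if not same_day:
        return None
    # squared difference in degrees: a rough ranking that is fine for nearby points
    return min(same_day, key=lambda s: (s[2] - lat) ** 2 + (s[3] - lon) ** 2)[0]


# one output row per trip: (id, start station, end station)
rows = [
    (tid, closest_station(sd, slat, slon, stations), closest_station(ed, elat, elon, stations))
    for tid, sd, slat, slon, ed, elat, elon in trips
]
print(rows)
```

Here S3 is geographically the nearest to the start point but is skipped because its date differs, so `rows` comes out as `[(0, 'S1', 'S2')]`.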

I have done something similar with geopandas and a UDF, but it was much easier because I was looking for an intersection, like this:

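from shapely.geometry import Point
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

# gdf_states is a geopandas GeoDataFrame of US states (NAME, geometry), loaded elsewhere
def find_state_gps(lat, long):
    # per row: the state's NAME if its polygon intersects the point, else None
    df = gdf_states.apply(lambda x: x["NAME"] if x["geometry"].intersects(Point(long, lat)) else None, axis=1)
    idx = df.first_valid_index()
    value = df.loc[idx] if idx is not None else "Not in USA territory"
    return value

state_gps = udf(find_state_gps, StringType())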

I am not sure how to handle the logic this time.

I also tried this query, with no luck:

query = "SELECT STATION,\
    NAME,\
    LATITUDE,\
    LONGITUDE,\
    AWND,\
    p.id_trip,\
    p.Latitude,\
    p.Longitude,\
    p.startDate,\
      Abs(p.latitude-LATITUDE)**2 + Abs(p.Longitude-LONGITUDE)**2\
      AS dd\
FROM df2\
CROSS JOIN (\
SELECT id AS id_trip,\
        station_id,\
        Latitude,\
        Longitude,\
        startDate\
 FROM df1\
) AS p ON 1=1\
 ORDER BY dd"

and got the following error: ParseException: mismatched input '2' expecting {<EOF>, ';'}(line 1, pos 189)
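The ParseException most likely comes from `**`: that is Python's power operator, and Spark SQL has no such operator (it offers the `POW`/`POWER` function instead). Ordering the whole cross join by `dd` also does not pick one station per trip. A possible corrected query, written as a Python string for `spark.sql`, is sketched below. It assumes `df1` (trips) and `df2` (weather) are registered as temp views and that `df2` has a `DATE` column in the same format as `startDate`; both are assumptions. Every column is qualified with a table alias, because Spark resolves column names case-insensitively by default, so a bare `LATITUDE` would be ambiguous with `p.Latitude`.

```python
# Sketch only: assumes df1 (trips) and df2 (weather) are registered as temp views,
# e.g. trips_df.createOrReplaceTempView("df1") and weather_df.createOrReplaceTempView("df2"),
# and that df2 has a DATE column in the same format as df1.startDate (assumption).
query = """
SELECT id_trip, STATION, NAME, AWND, startDate, dd
FROM (
    SELECT w.STATION,
           w.NAME,
           w.AWND,
           p.id AS id_trip,
           p.startDate,
           POW(p.Latitude - w.LATITUDE, 2) + POW(p.Longitude - w.LONGITUDE, 2) AS dd,
           ROW_NUMBER() OVER (
               PARTITION BY p.id
               ORDER BY POW(p.Latitude - w.LATITUDE, 2) + POW(p.Longitude - w.LONGITUDE, 2)
           ) AS rn
    FROM df2 AS w
    JOIN df1 AS p
      ON p.startDate = w.`DATE`  -- only stations reporting on the trip's start date
) ranked
WHERE rn = 1
"""

# closest_start = spark.sql(query)  # one row per trip: the nearest station on that date
```

The distance expression is repeated inside `ORDER BY` because a column alias cannot reliably be referenced within the same `SELECT`. Squared degree differences only approximate distance (a degree of longitude is shorter than a degree of latitude at New York's latitude), so the haversine distance is the more accurate choice when stations are far apart.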

In the end I want something like this, without repeated trips:

id | started_date | finish_date | finished | weather_station_start | weather_station_end | more columns about weather for the start and end locations
1  | bim          | baz         | bim      | baz                   | bim                 | bim
2  | bim          | baz         | bim      | baz                   | bim                 | bim

I really appreciate your help, guys.


Solution

  • I changed your sample data a bit because all stations have the same coordinates:

    travel_data  = spark.createDataFrame(
      [
    ('0','2013-06-01','00:00:01','-73.98915076','40.7423543','40.74317449','-74.00366443','2013-06-01')
    ,('1','2013-06-01','00:00:08','-73.98915076','40.7423543','40.74317449','-74.00366443','2013-06-01')
    ,('2','2013-06-01','00:00:44','-73.99595065','40.69512845','40.69512845','-73.99595065','2013-06-01')
    ,('3','2013-06-01','00:01:04','-73.98758561','40.73524276','40.6917823','-73.9737299','2013-06-01')
    ,('4','2013-06-01','00:01:22','-74.01677685','40.70569254','40.68926942','-73.98912867','2013-06-01')
      ], ['id','startDate','startTime','Longitude','Latitude','end station latitude','end station longitude','stopdate']
    )
    
    weather_data  = spark.createDataFrame(
      [
     ('USINYWC0003','WHITE PLAINS 3.1 NNW 3, NY US','41.0639','-73.7722','71','2013-06-01','','','','','')
    ,('USINYWC0002','WHITE PLAINS 3.1 NNW 2, NY US','41.0638','-73.7723','71','2013-06-02','','','','','')
    ,('USINYWC0001','WHITE PLAINS 3.1 NNW 1, NY US','41.0635','-73.7724','71','2013-06-03','','','','','')
      ], ['STATION','NAME','LATITUDE','LONGITUDE','ELEVATION','DATE','AWND','AWND ATTRIBUTES','DAPR','DAPR ATTRIBUTES','DASE']
    )
    
    +---+----------+---------+------------+-----------+--------------------+---------------------+----------+
    | id| startDate|startTime|   Longitude|   Latitude|end station latitude|end station longitude|  stopdate|
    +---+----------+---------+------------+-----------+--------------------+---------------------+----------+
    |  0|2013-06-01| 00:00:01|-73.98915076| 40.7423543|         40.74317449|         -74.00366443|2013-06-01|
    |  1|2013-06-01| 00:00:08|-73.98915076| 40.7423543|         40.74317449|         -74.00366443|2013-06-01|
    |  2|2013-06-01| 00:00:44|-73.99595065|40.69512845|         40.69512845|         -73.99595065|2013-06-01|
    |  3|2013-06-01| 00:01:04|-73.98758561|40.73524276|          40.6917823|          -73.9737299|2013-06-01|
    |  4|2013-06-01| 00:01:22|-74.01677685|40.70569254|         40.68926942|         -73.98912867|2013-06-01|
    +---+----------+---------+------------+-----------+--------------------+---------------------+----------+
    
    +-----------+--------------------+--------+---------+---------+----------+----+---------------+----+---------------+----+
    |    STATION|                NAME|LATITUDE|LONGITUDE|ELEVATION|      DATE|AWND|AWND ATTRIBUTES|DAPR|DAPR ATTRIBUTES|DASE|
    +-----------+--------------------+--------+---------+---------+----------+----+---------------+----+---------------+----+
    |USINYWC0003|WHITE PLAINS 3.1 ...| 41.0639| -73.7722|       71|2013-06-01|    |               |    |               |    |
    |USINYWC0002|WHITE PLAINS 3.1 ...| 41.0638| -73.7723|       71|2013-06-02|    |               |    |               |    |
    |USINYWC0001|WHITE PLAINS 3.1 ...| 41.0635| -73.7724|       71|2013-06-03|    |               |    |               |    |
    +-----------+--------------------+--------+---------+---------+----------+----+---------------+----+---------------+----+
    

    Then cross-join the two DataFrames to calculate the haversine distance between each trip's start/end point and every station. A cross join is not the most efficient solution, since it produces one row per trip-station pair, but depending on the size of your data it may be the simplest approach:

    
    from pyspark.sql.functions import col, radians, asin, sin, sqrt, cos, min
    from pyspark.sql import Window as W

    EARTH_RADIUS_FT = 3963 * 5280  # Earth radius: 3963 miles * 5280 ft/mile, so distances are in feet

    stations = weather_data.select('NAME', col('LATITUDE').alias('st_LAT'),
                                   col('LONGITUDE').alias('st_LON'), 'AWND')

    join_df = (
        travel_data.crossJoin(stations)  # one row per (trip, station) pair
        # haversine distance from the trip's start point to the station
        .withColumn("dlon_start", radians(col("st_LON")) - radians(col("Longitude")))
        .withColumn("dlat_start", radians(col("st_LAT")) - radians(col("Latitude")))
        .withColumn("haversine_dist_start", asin(sqrt(
            sin(col("dlat_start") / 2) ** 2
            + cos(radians(col("Latitude"))) * cos(radians(col("st_LAT"))) * sin(col("dlon_start") / 2) ** 2
        )) * 2 * EARTH_RADIUS_FT)
        # haversine distance from the trip's end point to the station;
        # the cosine term must use the end point's latitude, not the start latitude
        .withColumn("dlon_end", radians(col("st_LON")) - radians(col("end station longitude")))
        .withColumn("dlat_end", radians(col("st_LAT")) - radians(col("end station latitude")))
        .withColumn("haversine_dist_end", asin(sqrt(
            sin(col("dlat_end") / 2) ** 2
            + cos(radians(col("end station latitude"))) * cos(radians(col("st_LAT"))) * sin(col("dlon_end") / 2) ** 2
        )) * 2 * EARTH_RADIUS_FT)
        .drop('dlon_start', 'dlat_start', 'dlon_end', 'dlat_end')
    )
    

    Finally, use window functions to pick the closest station to the start point (result1) and the closest station to the end point (result2):

    by_trip = W.partitionBy("id")  # one window per trip

    # keep the station(s) at the minimum distance from each trip's start point
    result1 = join_df\
        .withColumn("min_dist_start", min('haversine_dist_start').over(by_trip))\
        .filter(col("min_dist_start") == col('haversine_dist_start'))\
        .select('id', col('startDate').alias('started_date'), col('stopdate').alias('finish_date'),
                col('NAME').alias('weather_station_start'),
                col('Latitude').alias('Latitude_start'), col('Longitude').alias('Longitude_start'))

    # keep the station(s) at the minimum distance from each trip's end point
    result2 = join_df\
        .withColumn("min_dist_end", min('haversine_dist_end').over(by_trip))\
        .filter(col("min_dist_end") == col('haversine_dist_end'))\
        .select('id', col('NAME').alias('weather_station_end'))

    # combine the start and end stations for each trip
    final = result1.join(result2, 'id', 'left')

    final.show(truncate=False)
    

    Not sure which columns you want in the output, but I hope this gives you some insight. Output:

    +---+------------+-----------+-----------------------------+--------------+---------------+-----------------------------+
    |id |started_date|finish_date|weather_station_start        |Latitude_start|Longitude_start|weather_station_end          |
    +---+------------+-----------+-----------------------------+--------------+---------------+-----------------------------+
    |0  |2013-06-01  |2013-06-01 |WHITE PLAINS 3.1 NNW 1, NY US|40.7423543    |-73.98915076   |WHITE PLAINS 3.1 NNW 1, NY US|
    |1  |2013-06-01  |2013-06-01 |WHITE PLAINS 3.1 NNW 1, NY US|40.7423543    |-73.98915076   |WHITE PLAINS 3.1 NNW 1, NY US|
    |2  |2013-06-01  |2013-06-01 |WHITE PLAINS 3.1 NNW 1, NY US|40.69512845   |-73.99595065   |WHITE PLAINS 3.1 NNW 1, NY US|
    |3  |2013-06-01  |2013-06-01 |WHITE PLAINS 3.1 NNW 1, NY US|40.73524276   |-73.98758561   |WHITE PLAINS 3.1 NNW 1, NY US|
    |4  |2013-06-01  |2013-06-01 |WHITE PLAINS 3.1 NNW 1, NY US|40.70569254   |-74.01677685   |WHITE PLAINS 3.1 NNW 1, NY US|
    +---+------------+-----------+-----------------------------+--------------+---------------+-----------------------------+
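One caveat with the `min(...) == dist` filter: if two stations are exactly equidistant from a point (for example, duplicate station rows), both survive the filter and the trip appears more than once, which the question wants to avoid. Ranking with a row number and keeping only the first row per trip guarantees a single match. The difference can be illustrated in plain Python on made-up rows (the helper names are hypothetical):

```python
# Hypothetical (id, station, distance) rows after the cross join;
# stations B and C tie for trip 1
rows = [
    (1, "A", 500.0), (1, "B", 120.0), (1, "C", 120.0),
    (2, "A", 80.0), (2, "B", 300.0),
]


def keep_min_filter(rows):
    """Mimics filter(min_dist == dist): keeps every row that ties for the minimum."""
    best = {}
    for tid, _, dist in rows:
        best[tid] = min(dist, best.get(tid, dist))
    return [r for r in rows if r[2] == best[r[0]]]


def keep_first_ranked(rows):
    """Mimics row_number() == 1: exactly one row per id, ties broken by station name."""
    chosen = {}
    for r in sorted(rows, key=lambda r: (r[0], r[2], r[1])):
        chosen.setdefault(r[0], r)  # first row seen per id is the nearest one
    return list(chosen.values())


print(keep_min_filter(rows))    # trip 1 appears twice
print(keep_first_ranked(rows))  # one row per trip
```

In PySpark, the second behaviour corresponds to something like `row_number().over(W.partitionBy("id").orderBy("haversine_dist_start"))` from `pyspark.sql.functions`, followed by a filter keeping rank 1; this is a suggested variant, not what the code above does.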