Tags: dataframe, apache-spark, pyspark, streaming, spark-streaming

How to filter and select columns and merge streaming DataFrames in Spark?


I have a streaming DataFrame and I am not sure of the best way to solve this problem. The input looks like this:

ID  latitude  longitude
A   28        30
B   40        52

Transform to:

A        B        Distance
(28,30)  (40,52)  calculate distance

I need to transform the data into this shape and add a distance column computed from the two coordinate pairs.

I am thinking about producing two data streams, one filtered to the A coordinates and one to the B coordinates. I would then call A.join(B).withColumn(distance) and stream the output. Is this the right way to solve the problem?

Is there a way to pivot the readStream data into the required format without an aggregation, which might be faster than creating two filtered streaming DataFrames and merging them?

Can I add an array column of coordinates in a streaming dataset?


Solution

  • I am not sure how performant this will be, but you can use pivot to turn the values of the ID column into new columns, and sum the latitude and longitude of each as a way to pass the value through unchanged (since there is no F.identity). Note that the sum only returns the value itself when there is exactly one row per ID. This will get you the following result:

    streaming_df.groupby().pivot('ID').agg(
        F.sum('latitude').alias('latitude'),
        F.sum('longitude').alias('longitude')
    )
    
    +----------+-----------+----------+-----------+
    |A_latitude|A_longitude|B_latitude|B_longitude|
    +----------+-----------+----------+-----------+
    |        28|         30|        40|         52|
    +----------+-----------+----------+-----------+
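
    To make the "sum as identity" trick concrete, here is a minimal pure-Python sketch of what the pivot computes. It is not Spark code: the pivot helper, the first helper, and the extra third row are hypothetical and only illustrate the reshaping. If several rows per ID can arrive, an aggregate such as F.first or F.last may be a closer fit than F.sum, although I have not tested those on a stream.

```python
from collections import defaultdict

# Hypothetical rows shaped like the question's data, plus a second reading
# for A, as a later micro-batch might deliver.
rows = [
    {"ID": "A", "latitude": 28.0, "longitude": 30.0},
    {"ID": "B", "latitude": 40.0, "longitude": 52.0},
    {"ID": "A", "latitude": 29.0, "longitude": 31.0},
]


def pivot(rows, agg):
    """Group values by ID, then flatten to one record with <ID>_<field> keys."""
    groups = defaultdict(lambda: defaultdict(list))
    for row in rows:
        for field in ("latitude", "longitude"):
            groups[row["ID"]][field].append(row[field])
    return {
        f"{id_}_{field}": agg(values)
        for id_, fields in sorted(groups.items())
        for field, values in fields.items()
    }


def first(values):
    """Keep the first value seen for a group."""
    return values[0]


single = pivot(rows[:2], sum)  # one row per ID: sum returns the value itself
summed = pivot(rows, sum)      # two rows for A: sum adds the two readings
picked = pivot(rows, first)    # first keeps an actual reading instead
```

    With one row per ID, single has the same four columns as the table above. Once A has two readings, summed["A_latitude"] is the sum of both latitudes, which is no longer a real coordinate.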
    

    Then you can use F.struct to create columns A and B using the latitude and longitude columns:

    streaming_df.groupby().pivot('ID').agg(
        F.sum('latitude').alias('latitude'),
        F.sum('longitude').alias('longitude')
    ).withColumn(
        'A', F.struct(F.col('A_latitude'), F.col('A_longitude'))
    ).withColumn(
        'B', F.struct(F.col('B_latitude'), F.col('B_longitude'))
    )
    
    +----------+-----------+----------+-----------+--------+--------+
    |A_latitude|A_longitude|B_latitude|B_longitude|       A|       B|
    +----------+-----------+----------+-----------+--------+--------+
    |        28|         30|        40|         52|{28, 30}|{40, 52}|
    +----------+-----------+----------+-----------+--------+--------+
    

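    The last step is to use a UDF to calculate the geographic distance, which has been answered here. Note that geopy expects each point in (latitude, longitude) order, whereas the call below passes (longitude, latitude); the distance shown reflects the order as written, so swap the array arguments for real coordinates. Putting this all together: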

    import pyspark.sql.functions as F
    from pyspark.sql.types import FloatType
    from geopy.distance import geodesic
    
    @F.udf(returnType=FloatType())
    def geodesic_udf(a, b):
        return geodesic(a, b).m
    
    streaming_df.groupby().pivot('ID').agg(
        F.sum('latitude').alias('latitude'),
        F.sum('longitude').alias('longitude')
    ).withColumn(
        'A', F.struct(F.col('A_latitude'), F.col('A_longitude'))
    ).withColumn(
        'B', F.struct(F.col('B_latitude'), F.col('B_longitude'))
    ).withColumn(
        'distance', geodesic_udf(F.array('B.B_longitude','B.B_latitude'), F.array('A.A_longitude','A.A_latitude'))
    ).select(
        'A','B','distance'
    )
    
    +--------+--------+---------+
    |       A|       B| distance|
    +--------+--------+---------+
    |{28, 30}|{40, 52}|2635478.5|
    +--------+--------+---------+
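
    As a dependency-free sanity check on the distance, here is a haversine sketch. It is a spherical approximation, not the ellipsoidal geodesic that geopy computes, so expect small differences; the mean Earth radius and the helper name haversine_m are my own assumptions. Points are (latitude, longitude) in degrees, which is also the order geopy expects.

```python
import math

# Mean Earth radius in metres; a spherical approximation (assumption).
EARTH_RADIUS_M = 6_371_008.8


def haversine_m(a, b):
    """Great-circle distance in metres between two (latitude, longitude) points in degrees."""
    lat1, lon1 = map(math.radians, a)
    lat2, lon2 = map(math.radians, b)
    h = (
        math.sin((lat2 - lat1) / 2) ** 2
        + math.cos(lat1) * math.cos(lat2) * math.sin((lon2 - lon1) / 2) ** 2
    )
    return 2 * EARTH_RADIUS_M * math.asin(math.sqrt(h))


A = (28.0, 30.0)  # (latitude, longitude), as in the question
B = (40.0, 52.0)

as_lat_lon = haversine_m(A, B)
# The same numbers read as (longitude, latitude), i.e. with each pair reversed.
as_lon_lat = haversine_m(A[::-1], B[::-1])
```

    On this sphere model the (latitude, longitude) reading comes out at roughly 2,417 km, while the reversed reading lands within a few kilometres of the 2635478.5 m shown above. That suggests the UDF call is interpreting the pairs as (longitude, latitude).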
    

    EDIT: When I answered your question, I let PySpark infer the datatype of each column. I have also tried to reproduce the schema of your streaming DataFrame more closely by specifying the column types:

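    from pyspark.sql.types import StructType, StructField, StringType, DoubleType

    # Assumes an existing SparkSession named `spark`.
    streaming_df = spark.createDataFrame(
        [
            ("A", 28., 30.),
            ("B", 40., 52.),
        ],
        StructType([
            StructField("ID", StringType(), True),
            StructField("latitude", DoubleType(), True),
            StructField("longitude", DoubleType(), True),
        ])
    )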
    
    streaming_df.printSchema()
    root
     |-- ID: string (nullable = true)
     |-- latitude: double (nullable = true)
     |-- longitude: double (nullable = true)
    

    The end result is still the same:

    +------------+------------+---------+
    |           A|           B| distance|
    +------------+------------+---------+
    |{28.0, 30.0}|{40.0, 52.0}|2635478.5|
    +------------+------------+---------+