PySpark window functions: preceding and following event


I have the following dataframe in pyspark:

+--------------------+-------------------+---------+-----------------------+-----------+
|device_id           |order_creation_time|order_id |status_check_time      |status_code|
+--------------------+-------------------+---------+-----------------------+-----------+
|67a-05df-4ca5-af6ajn|2022-11-26 23:54:41|105785113|2022-11-26 23:55:33.858|200        |
|67a-05df-4ca5-af6ajn|2022-11-26 23:54:41|105785113|2022-11-26 23:55:13.1  |200        |
|67a-05df-4ca5-af6ajn|2022-11-26 23:54:41|105785113|2022-11-26 23:54:57.682|200        |
|67a-05df-4ca5-af6ajn|2022-11-26 23:54:41|105785113|2022-11-26 23:54:36.676|200        |
|67a-05df-4ca5-af6ajn|2022-11-26 23:54:41|105785113|2022-11-26 23:54:21.293|200        |
+--------------------+-------------------+---------+-----------------------+-----------+

For each order, I need the status_check_time immediately before the order_creation_time and the one immediately after it.

The order_creation_time column is always constant within an order_id (each order_id has exactly one order_creation_time).

In this case, the output should be:

+--------------------+-------------------+---------+---------------------------+-----------------------+
|device_id           |order_creation_time|order_id |previous_status_check_time |next_status_check_time |
+--------------------+-------------------+---------+---------------------------+-----------------------+
|67a-05df-4ca5-af6ajn|2022-11-26 23:54:41|105785113|2022-11-26 23:54:36.676    |2022-11-26 23:54:57.682|
+--------------------+-------------------+---------+---------------------------+-----------------------+

I was trying to use lag and lead functions, but I'm not getting the desired output:

from pyspark.sql import SparkSession, Window
from pyspark.sql.functions import lag, lead

ss = (
    SparkSession.
    builder.
    appName("test").
    master("local[2]").
    getOrCreate()
)
data = [
    {"device_id": "67a-05df-4ca5-af6ajn", "order_creation_time": "2022-11-26 23:54:41", "order_id": "105785113", "status_check_time":"2022-11-26 23:55:33.858", "status_code": 200},
    {"device_id": "67a-05df-4ca5-af6ajn", "order_creation_time": "2022-11-26 23:54:41", "order_id": "105785113", "status_check_time":"2022-11-26 23:55:13.1"  , "status_code": 200},
    {"device_id": "67a-05df-4ca5-af6ajn", "order_creation_time": "2022-11-26 23:54:41", "order_id": "105785113", "status_check_time":"2022-11-26 23:54:57.682", "status_code": 200},
    {"device_id": "67a-05df-4ca5-af6ajn", "order_creation_time": "2022-11-26 23:54:41", "order_id": "105785113", "status_check_time":"2022-11-26 23:54:36.676", "status_code": 200},
    {"device_id": "67a-05df-4ca5-af6ajn", "order_creation_time": "2022-11-26 23:54:41", "order_id": "105785113", "status_check_time":"2022-11-26 23:54:21.293", "status_code": 200}  
]
     
df = ss.createDataFrame(data)

windowSpec  = Window.partitionBy("device_id").orderBy("status_check_time")

(
   df.withColumn(
    "previous_status_check_time", lag("status_check_time").over(windowSpec)
   ).withColumn(
    "next_status_check_time", lead("status_check_time").over(windowSpec)
   ).show(truncate=False) 
)

Any ideas on how to fix this?


Solution

  • We can compute the difference in seconds between status_check_time and order_creation_time, then keep the closest negative difference (the previous check) and the closest non-negative difference (the next check).

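    from pyspark.sql import functions as func

    # data_sdf is the input dataframe. Both time columns are assumed to be timestamps;
    # if they are strings, convert them with func.to_timestamp first, because
    # casting such a string directly to long does not produce epoch seconds.
    data_sdf. \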
        withColumn('ts_diff', func.col('status_check_time').cast('long') - func.col('order_creation_time').cast('long')). \
        groupBy([k for k in data_sdf.columns if k != 'status_check_time']). \
        agg(func.max(func.when(func.col('ts_diff') < 0, func.struct('ts_diff', 'status_check_time'))).status_check_time.alias('previous_status_check_time'),
            func.min(func.when(func.col('ts_diff') >= 0, func.struct('ts_diff', 'status_check_time'))).status_check_time.alias('next_status_check_time')
            ). \
        show(truncate=False)
    
    # +--------------------+-------------------+---------+-----------+--------------------------+-----------------------+
    # |device_id           |order_creation_time|order_id |status_code|previous_status_check_time|next_status_check_time |
    # +--------------------+-------------------+---------+-----------+--------------------------+-----------------------+
    # |67a-05df-4ca5-af6ajn|2022-11-26 23:54:41|105785113|200        |2022-11-26 23:54:36.676   |2022-11-26 23:54:57.682|
    # +--------------------+-------------------+---------+-----------+--------------------------+-----------------------+
    

    The intermediate ts_diff column (status_check_time minus order_creation_time, in whole seconds) looks like this:

    # +--------------------+-------------------+---------+-----------------------+-----------+-------+
    # |device_id           |order_creation_time|order_id |status_check_time      |status_code|ts_diff|
    # +--------------------+-------------------+---------+-----------------------+-----------+-------+
    # |67a-05df-4ca5-af6ajn|2022-11-26 23:54:41|105785113|2022-11-26 23:55:33.858|200        |52     |
    # |67a-05df-4ca5-af6ajn|2022-11-26 23:54:41|105785113|2022-11-26 23:55:13.1  |200        |32     |
    # |67a-05df-4ca5-af6ajn|2022-11-26 23:54:41|105785113|2022-11-26 23:54:57.682|200        |16     |
    # |67a-05df-4ca5-af6ajn|2022-11-26 23:54:41|105785113|2022-11-26 23:54:36.676|200        |-5     |
    # |67a-05df-4ca5-af6ajn|2022-11-26 23:54:41|105785113|2022-11-26 23:54:21.293|200        |-20    |
    # +--------------------+-------------------+---------+-----------------------+-----------+-------+
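
To sanity-check the selection logic outside Spark, here is a minimal plain-Python sketch run on the five sample rows from the question. The helper names `parse` and `nearest_checks` are made up for illustration and are not part of the answer's code. Unlike the Spark version, which compares whole-second differences, this sketch compares the full timestamps, so treat it as an approximation of the same idea rather than an exact equivalent.

```python
from datetime import datetime

FMT = "%Y-%m-%d %H:%M:%S.%f"


def parse(ts):
    # Hypothetical helper: accept timestamps with or without a fractional part.
    return datetime.strptime(ts if "." in ts else ts + ".0", FMT)


def nearest_checks(order_creation_time, status_check_times):
    """Return the (previous, next) status check around the order creation time."""
    created = parse(order_creation_time)
    checks = [(parse(t), t) for t in status_check_times]
    # Checks strictly before the order correspond to ts_diff < 0.
    before = [c for c in checks if c[0] < created]
    # Checks at or after the order correspond to ts_diff >= 0.
    after = [c for c in checks if c[0] >= created]
    previous = max(before)[1] if before else None
    following = min(after)[1] if after else None
    return previous, following


checks = [
    "2022-11-26 23:55:33.858",
    "2022-11-26 23:55:13.1",
    "2022-11-26 23:54:57.682",
    "2022-11-26 23:54:36.676",
    "2022-11-26 23:54:21.293",
]

previous, following = nearest_checks("2022-11-26 23:54:41", checks)
print(previous, following)
```

If an order has no check on one side, the sketch returns None for that side, which mirrors the null that the `when` expression without an `otherwise` branch would leave in the Spark aggregation.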