I have the following dataframe in pyspark:
+--------------------+-------------------+---------+-----------------------+-----------+
|device_id           |order_creation_time|order_id |status_check_time      |status_code|
+--------------------+-------------------+---------+-----------------------+-----------+
|67a-05df-4ca5-af6ajn|2022-11-26 23:54:41|105785113|2022-11-26 23:55:33.858|200        |
|67a-05df-4ca5-af6ajn|2022-11-26 23:54:41|105785113|2022-11-26 23:55:13.1  |200        |
|67a-05df-4ca5-af6ajn|2022-11-26 23:54:41|105785113|2022-11-26 23:54:57.682|200        |
|67a-05df-4ca5-af6ajn|2022-11-26 23:54:41|105785113|2022-11-26 23:54:36.676|200        |
|67a-05df-4ca5-af6ajn|2022-11-26 23:54:41|105785113|2022-11-26 23:54:21.293|200        |
+--------------------+-------------------+---------+-----------------------+-----------+
I need to get the status_check_time values immediately preceding and immediately following the order_creation_time.
The order_creation_time column is always constant within the same order_id (so each order_id has only one order_creation_time).
In this case, the output should be:
+--------------------+-------------------+---------+--------------------------+-----------------------+
|device_id           |order_creation_time|order_id |previous_status_check_time|next_status_check_time |
+--------------------+-------------------+---------+--------------------------+-----------------------+
|67a-05df-4ca5-af6ajn|2022-11-26 23:54:41|105785113|2022-11-26 23:54:36.676   |2022-11-26 23:54:57.682|
+--------------------+-------------------+---------+--------------------------+-----------------------+
I was trying to use the lag and lead functions, but I'm not getting the desired output:
from pyspark.sql import SparkSession, Window
from pyspark.sql.functions import lag, lead

ss = (
    SparkSession.
    builder.
    appName("test").
    master("local[2]").
    getOrCreate()
)
data = [
    {"device_id": "67a-05df-4ca5-af6ajn", "order_creation_time": "2022-11-26 23:54:41", "order_id": "105785113", "status_check_time": "2022-11-26 23:55:33.858", "status_code": 200},
    {"device_id": "67a-05df-4ca5-af6ajn", "order_creation_time": "2022-11-26 23:54:41", "order_id": "105785113", "status_check_time": "2022-11-26 23:55:13.1", "status_code": 200},
    {"device_id": "67a-05df-4ca5-af6ajn", "order_creation_time": "2022-11-26 23:54:41", "order_id": "105785113", "status_check_time": "2022-11-26 23:54:57.682", "status_code": 200},
    {"device_id": "67a-05df-4ca5-af6ajn", "order_creation_time": "2022-11-26 23:54:41", "order_id": "105785113", "status_check_time": "2022-11-26 23:54:36.676", "status_code": 200},
    {"device_id": "67a-05df-4ca5-af6ajn", "order_creation_time": "2022-11-26 23:54:41", "order_id": "105785113", "status_check_time": "2022-11-26 23:54:21.293", "status_code": 200}
]
df = ss.createDataFrame(data)
windowSpec = Window.partitionBy("device_id").orderBy("status_check_time")
(
    df.withColumn(
        "previous_status_check_time", lag("status_check_time").over(windowSpec)
    ).withColumn(
        "next_status_check_time", lead("status_check_time").over(windowSpec)
    ).show(truncate=False)
)
Any ideas on how to fix this?
We can calculate the difference between the two timestamps in seconds (status_check_time minus order_creation_time) and keep the closest negative difference as the previous check and the closest non-negative difference as the next check.
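from pyspark.sql import functions as func

# assumption: data_sdf is the question's df with both time columns converted to
# timestamps first; a plain string cast to long does not yield epoch seconds
data_sdf = df. \
    withColumn('order_creation_time', func.to_timestamp('order_creation_time')). \
    withColumn('status_check_time', func.to_timestamp('status_check_time'))
data_sdf. \
    withColumn('ts_diff', func.col('status_check_time').cast('long') - func.col('order_creation_time').cast('long')). \
    groupBy([k for k in data_sdf.columns if k != 'status_check_time']). \
    agg(func.max(func.when(func.col('ts_diff') < 0, func.struct('ts_diff', 'status_check_time'))).status_check_time.alias('previous_status_check_time'),
        func.min(func.when(func.col('ts_diff') >= 0, func.struct('ts_diff', 'status_check_time'))).status_check_time.alias('next_status_check_time')
        ). \
    show(truncate=False)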
# +--------------------+-------------------+---------+-----------+--------------------------+-----------------------+
# |device_id           |order_creation_time|order_id |status_code|previous_status_check_time|next_status_check_time |
# +--------------------+-------------------+---------+-----------+--------------------------+-----------------------+
# |67a-05df-4ca5-af6ajn|2022-11-26 23:54:41|105785113|200        |2022-11-26 23:54:36.676   |2022-11-26 23:54:57.682|
# +--------------------+-------------------+---------+-----------+--------------------------+-----------------------+
The timestamp difference (status_check_time minus order_creation_time, in whole seconds) results in the following:
# +--------------------+-------------------+---------+-----------------------+-----------+-------+
# |device_id           |order_creation_time|order_id |status_check_time      |status_code|ts_diff|
# +--------------------+-------------------+---------+-----------------------+-----------+-------+
# |67a-05df-4ca5-af6ajn|2022-11-26 23:54:41|105785113|2022-11-26 23:55:33.858|200        |52     |
# |67a-05df-4ca5-af6ajn|2022-11-26 23:54:41|105785113|2022-11-26 23:55:13.1  |200        |32     |
# |67a-05df-4ca5-af6ajn|2022-11-26 23:54:41|105785113|2022-11-26 23:54:57.682|200        |16     |
# |67a-05df-4ca5-af6ajn|2022-11-26 23:54:41|105785113|2022-11-26 23:54:36.676|200        |-5     |
# |67a-05df-4ca5-af6ajn|2022-11-26 23:54:41|105785113|2022-11-26 23:54:21.293|200        |-20    |
# +--------------------+-------------------+---------+-----------------------+-----------+-------+
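To sanity-check the selection logic outside Spark, here is a minimal plain-Python sketch using the sample timestamps from the question. It pairs each check time with its signed offset from the order creation time, then takes the largest negative offset and the smallest non-negative one. The helper names `parse` and `closest_checks` are made up for this illustration, and unlike the Spark version it compares exact fractional seconds rather than whole seconds.

```python
from datetime import datetime

FMT = "%Y-%m-%d %H:%M:%S"


def parse(ts):
    # Accept timestamps with or without a fractional-second part.
    return datetime.strptime(ts, FMT + ".%f" if "." in ts else FMT)


def closest_checks(order_time, check_times):
    """Return (previous, next) check times around order_time (hypothetical helper)."""
    order = parse(order_time)
    # Pair each check with its signed offset in seconds from the order creation.
    diffs = [((parse(c) - order).total_seconds(), c) for c in check_times]
    before = [d for d in diffs if d[0] < 0]
    after = [d for d in diffs if d[0] >= 0]
    # Closest negative offset is the largest one; closest non-negative is the smallest.
    previous = max(before)[1] if before else None
    nxt = min(after)[1] if after else None
    return previous, nxt


checks = [
    "2022-11-26 23:55:33.858",
    "2022-11-26 23:55:13.1",
    "2022-11-26 23:54:57.682",
    "2022-11-26 23:54:36.676",
    "2022-11-26 23:54:21.293",
]
print(closest_checks("2022-11-26 23:54:41", checks))
# ('2022-11-26 23:54:36.676', '2022-11-26 23:54:57.682')
```

If no check falls on one side of the order creation time, the corresponding value is None, which matches the null that the Spark aggregation would return in that case.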