I need to explode a date range into multiple rows with new start and end dates, so that each exploded row covers a range of one day only. I also need a new unique userId for each row, and I need to retain the original start times and end times.
Input DataFrame:

userId | Start_Date_Time | End_Date_Time |
---|---|---|
a | 2022-12-10 08:00:00 | 2022-12-15 17:00:00 |
b | 2022-12-06 05:00:00 | 2022-12-07 18:00:00 |

Desired output:

userId | userIdNew | Start_Date_Time | End_Date_Time | Start_Date_Time_New | End_Date_Time_New |
---|---|---|---|---|---|
a | a1 | 2022-12-10 08:00:00 | 2022-12-15 17:00:00 | 2022-12-10 08:00:00 | 2022-12-11 17:00:00 |
a | a2 | 2022-12-10 08:00:00 | 2022-12-15 17:00:00 | 2022-12-11 08:00:00 | 2022-12-12 17:00:00 |
a | a3 | 2022-12-10 08:00:00 | 2022-12-15 17:00:00 | 2022-12-12 08:00:00 | 2022-12-13 17:00:00 |
a | a4 | 2022-12-10 08:00:00 | 2022-12-15 17:00:00 | 2022-12-13 08:00:00 | 2022-12-14 17:00:00 |
a | a5 | 2022-12-10 08:00:00 | 2022-12-15 17:00:00 | 2022-12-14 08:00:00 | 2022-12-15 17:00:00 |
b | b1 | 2022-12-06 05:00:00 | 2022-12-07 18:00:00 | 2022-12-06 05:00:00 | 2022-12-07 18:00:00 |

The `F.sequence` function builds an array of values between two given columns. Because it includes the last value too (`[1, 3]` -> `[1, 2, 3]`), you need to reduce the end date by 1 day. After exploding the array you have your new start dates, and adding 1 day to each of them gives you the new end dates. For the new user id, you can use `row_number` and concatenate it with the original id.
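```python
from pyspark.sql import Window as W
from pyspark.sql import functions as F

# Assumes df has the question's columns (userId, Start_Date_Time, End_Date_Time)
# and that every range ends on a later calendar date than it starts.

# Original times of day, re-attached to every one-day slice below
start_time = F.date_format('Start_Date_Time', 'HH:mm:ss')
end_time = F.date_format('End_Date_Time', 'HH:mm:ss')

(
    df
    # Calendar dates from the start date to (end date - 1 day);
    # F.sequence includes its upper bound, hence the -1
    .withColumn(
        'days',
        F.sequence(
            F.to_date('Start_Date_Time'),
            F.date_add(F.to_date('End_Date_Time'), -1),
            F.expr('INTERVAL 1 DAY')))
    # One row per date in the array
    .withColumn('day', F.explode('days'))
    # New start = that date + original start time
    .withColumn('Start_Date_Time_New', F.to_timestamp(
        F.concat_ws(' ', F.date_format('day', 'yyyy-MM-dd'), start_time)))
    # New end = the following date + original end time
    .withColumn('End_Date_Time_New', F.to_timestamp(
        F.concat_ws(' ', F.date_format(F.date_add('day', 1), 'yyyy-MM-dd'), end_time)))
    # Number the slices per user in date order and append the number to the id
    .withColumn('rowNumber', F.row_number().over(W.partitionBy('userId').orderBy('day')))
    .withColumn('userIdNew', F.concat('userId', F.col('rowNumber').cast('string')))
    .select('userId', 'userIdNew', 'Start_Date_Time', 'End_Date_Time',
            'Start_Date_Time_New', 'End_Date_Time_New')
).show()
```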
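To sanity-check the splitting rule without a Spark session, here is a plain-Python sketch of the same per-row logic using only `datetime`. It is an illustration, not part of the PySpark answer. The helper name `split_into_days` is hypothetical, and, like the Spark version, it assumes each range ends on a later calendar date than it starts.

```python
from datetime import datetime, timedelta


def split_into_days(user_id, start, end):
    """Hypothetical helper: split one (start, end) range into one-day slices.

    Mirrors the Spark pipeline for a single input row and returns a list of
    (userIdNew, Start_Date_Time_New, End_Date_Time_New) tuples.
    """
    first_day = start.date()
    # Same count as a sequence from the start date to (end date - 1 day), inclusive
    n_days = (end.date() - first_day).days
    rows = []
    for i in range(n_days):
        day = first_day + timedelta(days=i)
        new_start = datetime.combine(day, start.time())  # keep the original start time
        new_end = datetime.combine(day + timedelta(days=1), end.time())  # keep the original end time
        rows.append((f"{user_id}{i + 1}", new_start, new_end))
    return rows


fmt = "%Y-%m-%d %H:%M:%S"
rows_a = split_into_days("a", datetime.strptime("2022-12-10 08:00:00", fmt),
                         datetime.strptime("2022-12-15 17:00:00", fmt))
rows_b = split_into_days("b", datetime.strptime("2022-12-06 05:00:00", fmt),
                         datetime.strptime("2022-12-07 18:00:00", fmt))

for user_id_new, new_start, new_end in rows_a + rows_b:
    print(user_id_new, new_start, new_end)
# a1 2022-12-10 08:00:00 2022-12-11 17:00:00
# a2 2022-12-11 08:00:00 2022-12-12 17:00:00
# a3 2022-12-12 08:00:00 2022-12-13 17:00:00
# a4 2022-12-13 08:00:00 2022-12-14 17:00:00
# a5 2022-12-14 08:00:00 2022-12-15 17:00:00
# b1 2022-12-06 05:00:00 2022-12-07 18:00:00
```

Each printed line matches one row of the desired output table. A range that starts and ends on the same calendar date yields no rows in this sketch, so decide separately how such rows should be handled.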