Tags: python, pyspark, row, explode, date-range

PySpark explode date range into rows


I need to explode a date range into multiple rows with new start and end dates, so that each exploded row covers a range of exactly one day. I also need a new unique userId per row, and I need to retain the original start and end times.

Input dataframe:

userId  Start_Date_Time      End_Date_Time
a       2022-12-10 08:00:00  2022-12-15 17:00:00
b       2022-12-06 05:00:00  2022-12-07 18:00:00
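
For anyone reproducing this locally, the input can be built with a short sketch like the one below (the SparkSession setup and the name df are assumptions chosen to match the answer's code):

    from pyspark.sql import SparkSession, functions as F

    spark = SparkSession.builder.getOrCreate()

    # two rows matching the table above; timestamps arrive as strings and
    # are cast to proper timestamp columns
    df = (
        spark.createDataFrame(
            [('a', '2022-12-10 08:00:00', '2022-12-15 17:00:00'),
             ('b', '2022-12-06 05:00:00', '2022-12-07 18:00:00')],
            ['userId', 'Start_Date_Time', 'End_Date_Time'])
        .withColumn('Start_Date_Time', F.col('Start_Date_Time').cast('timestamp'))
        .withColumn('End_Date_Time', F.col('End_Date_Time').cast('timestamp'))
    )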

Desired Output:

userId  userIdNew  Start_Date_Time      End_Date_Time        Start_Date_Time_New  End_Date_Time_New
a       a1         2022-12-10 08:00:00  2022-12-15 17:00:00  2022-12-10 08:00:00  2022-12-11 17:00:00
a       a2         2022-12-10 08:00:00  2022-12-15 17:00:00  2022-12-11 08:00:00  2022-12-12 17:00:00
a       a3         2022-12-10 08:00:00  2022-12-15 17:00:00  2022-12-12 08:00:00  2022-12-13 17:00:00
a       a4         2022-12-10 08:00:00  2022-12-15 17:00:00  2022-12-13 08:00:00  2022-12-14 17:00:00
a       a5         2022-12-10 08:00:00  2022-12-15 17:00:00  2022-12-14 08:00:00  2022-12-15 17:00:00
b       b1         2022-12-06 05:00:00  2022-12-07 18:00:00  2022-12-06 05:00:00  2022-12-07 18:00:00

Solution

  • The F.sequence function builds an array of values between two given columns. Because the range is inclusive of the last value ([1, 3] -> [1, 2, 3]), you need to reduce the end date by one day. After exploding the array you have your new start dates, and adding one day to each gives the new end dates. For the new user ID you can use row_number and concatenate it with the original ID. Note that this works on plain dates; a variant that also carries the original times of day into the new columns is sketched after the code.

    from pyspark.sql import Window as W
    from pyspark.sql import functions as F

    (
        df
        # work on plain dates; the original timestamps stay in their own columns
        .withColumn('startDate', F.col('Start_Date_Time').cast('date'))
        .withColumn('endDate', F.col('End_Date_Time').cast('date'))
        # sequence() includes both endpoints, so stop one day before endDate
        .withColumn(
            'timeseries',
            F.sequence(
                F.col('startDate'),
                F.date_add(F.col('endDate'), -1),
                F.expr('INTERVAL 1 DAY')))
        .select(
            F.col('userId'),
            F.col('Start_Date_Time'),
            F.col('End_Date_Time'),
            # one output row per day in the range
            F.explode('timeseries').alias('newStartDate'))
        .withColumn('newEndDate', F.date_add(F.col('newStartDate'), 1))
        # number the rows per user to build the new unique id (a1, a2, ...)
        .withColumn(
            'rowNumber',
            F.row_number().over(W.partitionBy('userId').orderBy('newStartDate')))
        .withColumn('userIdNew', F.concat('userId', F.col('rowNumber').cast('string')))
        .drop('rowNumber')
    ).show()
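
The approach above returns whole dates, while the desired output also keeps the original times of day in the new columns (08:00/17:00 for user a). As a minimal sketch of the same idea that preserves those times, you can explode a day offset instead of a date and shift the original timestamps with make_interval (available from Spark 3.0; result and dayOffset are names introduced here for illustration):

    from pyspark.sql import functions as F

    result = (
        df
        # one offset per one-day slice; datediff gives the number of slices
        .withColumn(
            'dayOffset',
            F.explode(F.sequence(
                F.lit(0),
                F.datediff('End_Date_Time', 'Start_Date_Time') - 1)))
        # shift the original start forward, keeping its time of day
        .withColumn(
            'Start_Date_Time_New',
            F.expr('Start_Date_Time + make_interval(0, 0, 0, dayOffset)'))
        # shift the original end backward so each slice spans one day and the
        # last slice ends exactly at End_Date_Time
        .withColumn(
            'End_Date_Time_New',
            F.expr('End_Date_Time - make_interval(0, 0, 0, '
                   'datediff(End_Date_Time, Start_Date_Time) - 1 - dayOffset)'))
        # build the new id from the offset (a1, a2, ...)
        .withColumn(
            'userIdNew',
            F.concat('userId', (F.col('dayOffset') + 1).cast('string')))
        .select(
            'userId', 'userIdNew', 'Start_Date_Time', 'End_Date_Time',
            'Start_Date_Time_New', 'End_Date_Time_New')
    )
    result.show(truncate=False)

Because the exploded offset is already a per-user counter starting at zero, it doubles as the suffix for userIdNew, so this variant needs no window function.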