
PySpark: Split a duration into per-day durations


I have duration data like this:

+---------+-------------------+-------------------+---------------------+
|       id|         start_time|             ts_utc|stream_duration_total|
+---------+-------------------+-------------------+---------------------+
|       33|2022-07-03 00:07:20|2022-07-06 11:10:34|               298994|
+---------+-------------------+-------------------+---------------------+

What I need is one row per day for every day covered by the stream (from start_time until start_time + stream_duration_total), with stream_duration_total split across those days.

Example:

+---+-------------------+-------------------+----------+---------------------+
| id|         start_time|             ts_utc|        dt|stream_duration_total|
+---+-------------------+-------------------+----------+---------------------+
| 33|2022-07-03 00:07:20|2022-07-06 11:10:34|2022-07-03|                85959|
| 33|2022-07-03 00:07:20|2022-07-06 11:10:34|2022-07-04|                86400|
| 33|2022-07-03 00:07:20|2022-07-06 11:10:34|2022-07-05|                86400|
| 33|2022-07-03 00:07:20|2022-07-06 11:10:34|2022-07-06|                40235|
+---+-------------------+-------------------+----------+---------------------+
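
Note that the per-day values add back up to the original total: 85959 + 86400 + 86400 + 40235 = 298994.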

Solution

  • A solution in PySpark, using a UDF to expand the stream into one date per day:

    from datetime import timedelta
    from pyspark.sql.functions import udf, col, explode, when, lit
    from pyspark.sql.types import ArrayType, DateType

    # start of udf parse_date_range: returns every calendar date from
    # start_date through end_date (inclusive)
    @udf(returnType=ArrayType(DateType()))
    def parse_date_range(start_date, end_date):
        dates = []
        
        for i in range((end_date - start_date).days + 1):
            dates.append( start_date + timedelta(days=i) )
        
        return dates
    # end of udf parse_date_range
    
    df = spark.createDataFrame([(33, "2022-07-03 00:07:20","2022-07-06 11:10:34")], ['id', 'start_time', 'ts_utc'])\
            .withColumn("start_time", col("start_time").cast("timestamp"))\
            .withColumn("ts_utc", col("ts_utc").cast("timestamp"))
    df.printSchema()
    # root
    #  |-- id: long (nullable = true)
    #  |-- start_time: timestamp (nullable = true)
    #  |-- ts_utc: timestamp (nullable = true)
    
    df = df.withColumn("split_date", parse_date_range(col("start_time"), col("ts_utc")))\
           .withColumn("split_date", explode(col("split_date")))\
           .withColumn("stream_duration_total",
                       # first day: remainder of that day after start_time
                       when(col("start_time").cast("date") == col("split_date"), 86399 + col("split_date").cast("timestamp").cast("long") - col("start_time").cast("long"))
                       # last day: portion of the day up to ts_utc
                       .when(col("ts_utc").cast("date") == col("split_date"), 1 + col("ts_utc").cast("long") - col("split_date").cast("timestamp").cast("long"))
                       # days fully covered by the stream
                       .otherwise(lit(86400)))
    
    df.show()
    +---+-------------------+-------------------+----------+---------------------+
    | id|         start_time|             ts_utc|split_date|stream_duration_total|
    +---+-------------------+-------------------+----------+---------------------+
    | 33|2022-07-03 00:07:20|2022-07-06 11:10:34|2022-07-03|                85959|
    | 33|2022-07-03 00:07:20|2022-07-06 11:10:34|2022-07-04|                86400|
    | 33|2022-07-03 00:07:20|2022-07-06 11:10:34|2022-07-05|                86400|
    | 33|2022-07-03 00:07:20|2022-07-06 11:10:34|2022-07-06|                40235|
    +---+-------------------+-------------------+----------+---------------------+
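
  • A UDF-free alternative (a minimal sketch, not part of the original answer, assuming Spark >= 2.4): the built-in sequence() function can generate the per-day date array directly, keeping the expansion inside the JVM instead of round-tripping through a Python UDF. The sample DataFrame is rebuilt here so the snippet is self-contained; df2 is just an illustrative name.

    from pyspark.sql.functions import sequence, col, explode, when, lit

    df2 = spark.createDataFrame([(33, "2022-07-03 00:07:20", "2022-07-06 11:10:34")], ['id', 'start_time', 'ts_utc'])\
               .withColumn("start_time", col("start_time").cast("timestamp"))\
               .withColumn("ts_utc", col("ts_utc").cast("timestamp"))\
               .withColumn("split_date", explode(sequence(col("start_time").cast("date"),
                                                          col("ts_utc").cast("date"))))\
               .withColumn("stream_duration_total",
                           # same per-day arithmetic as in the UDF-based version above
                           when(col("start_time").cast("date") == col("split_date"), 86399 + col("split_date").cast("timestamp").cast("long") - col("start_time").cast("long"))
                           .when(col("ts_utc").cast("date") == col("split_date"), 1 + col("ts_utc").cast("long") - col("split_date").cast("timestamp").cast("long"))
                           .otherwise(lit(86400)))
    # df2.show() should produce the same four rows as df.show() above.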