Tags: python, dataframe, pyspark, time-series, aws-glue

During aggregation count the longest date streak - using pyspark


Imagine a table:

PersonID  Date        HasDoneWorkout
A         31-01-2001  1
A         01-02-2001  1
A         02-02-2001  1
A         03-02-2001  0
A         04-02-2001  1
B         02-02-2001  1

I would like to create a PySpark aggregation that counts how many days in a row a person has worked out. If a person has more than one streak, pick the longest one.

Expected output:

PersonID  HasDoneWorkout
A         3
B         1

Since I didn't find any solution using PySpark, I tried to adapt a pandas approach, but failed to translate it into PySpark.
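For reference, the streak logic the asker was after can be sketched in pandas on the sample data; this is only an illustration of the cumsum-based run labelling (the column and variable names here are made up for the sketch):

```python
import pandas as pd

df = pd.DataFrame({
    'PersonID': ['A', 'A', 'A', 'A', 'A', 'B'],
    'Date': ['31-01-2001', '01-02-2001', '02-02-2001',
             '03-02-2001', '04-02-2001', '02-02-2001'],
    'HasDoneWorkout': [1, 1, 1, 0, 1, 1],
})
df['Date'] = pd.to_datetime(df['Date'], format='%d-%m-%Y')
df = df.sort_values(['PersonID', 'Date'])

# A row continues a streak when its date is exactly one day after the
# previous row for the same person and the workout was done.
prev = df.groupby('PersonID')['Date'].shift()
is_streak_row = (df['Date'] - prev).dt.days.eq(1) & df['HasDoneWorkout'].eq(1)

# Cumulative sum over the inverted flag labels each streak with an id.
df['streak_id'] = (~is_streak_row).cumsum()

longest = (df.groupby(['PersonID', 'streak_id'])['HasDoneWorkout'].sum()
             .groupby('PersonID').max())
# longest streak per person: A -> 3, B -> 1
```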


Solution

  • Step by step solution

    Create a window specification to partition the DataFrame by PersonID ordered by Date, and parse the date strings into date type with the to_date function (input format dd-MM-yyyy).

    import pyspark.sql.functions as F
    from pyspark.sql import Window

    W = Window.partitionBy('PersonID').orderBy('Date')
    df1 = df.withColumn('Date', F.to_date('Date', format='dd-MM-yyyy'))
    
    # df1.show()
    # +--------+----------+--------------+
    # |PersonID|      Date|HasDoneWorkout|
    # +--------+----------+--------------+
    # |       A|2001-01-31|             1|
    # |       A|2001-02-01|             1|
    # |       A|2001-02-02|             1|
    # |       A|2001-02-03|             0|
    # |       A|2001-02-04|             1|
    # |       B|2001-02-02|             1|
    # +--------+----------+--------------+
    

    Calculate the difference between the Date in the current row and the previous row to flag the rows whose dates are consecutive.

    diff = F.datediff('Date', F.lag('Date').over(W))
    df1 = df1.withColumn('is_consecutive_day', F.coalesce(diff, F.lit(0)) == 1)
    
    # df1.show()
    # +--------+----------+--------------+------------------+
    # |PersonID|      Date|HasDoneWorkout|is_consecutive_day|
    # +--------+----------+--------------+------------------+
    # |       A|2001-01-31|             1|             false|
    # |       A|2001-02-01|             1|              true|
    # |       A|2001-02-02|             1|              true|
    # |       A|2001-02-03|             0|              true|
    # |       A|2001-02-04|             1|              true|
    # |       B|2001-02-02|             1|             false|
    # +--------+----------+--------------+------------------+
    

    Create a boolean column that flags the rows that are both on a consecutive date and where the person has done the workout.

    df1 = df1.withColumn('is_workout_on_consecutive_day', F.col('is_consecutive_day') & (F.col('HasDoneWorkout') == 1))
    
    # df1.show()
    # +--------+----------+--------------+------------------+-----------------------------+
    # |PersonID|      Date|HasDoneWorkout|is_consecutive_day|is_workout_on_consecutive_day|
    # +--------+----------+--------------+------------------+-----------------------------+
    # |       A|2001-01-31|             1|             false|                        false|
    # |       A|2001-02-01|             1|              true|                         true|
    # |       A|2001-02-02|             1|              true|                         true|
    # |       A|2001-02-03|             0|              true|                        false|
    # |       A|2001-02-04|             1|              true|                         true|
    # |       B|2001-02-02|             1|             false|                        false|
    # +--------+----------+--------------+------------------+-----------------------------+
    

    Take a cumulative sum over the inverted is_workout_on_consecutive_day flag to separate the different groups of rows where the person has worked out consecutively: every row that breaks a streak increments the group id, and rows inside a streak inherit the current id.

    df1 = df1.withColumn('groups', F.sum((~F.col('is_workout_on_consecutive_day')).cast('int')).over(W))
    
    
    # df1.show()
    # +--------+----------+--------------+------------------+-----------------------------+------+
    # |PersonID|      Date|HasDoneWorkout|is_consecutive_day|is_workout_on_consecutive_day|groups|
    # +--------+----------+--------------+------------------+-----------------------------+------+
    # |       A|2001-01-31|             1|             false|                        false|     1|
    # |       A|2001-02-01|             1|              true|                         true|     1|
    # |       A|2001-02-02|             1|              true|                         true|     1|
    # |       A|2001-02-03|             0|              true|                        false|     2|
    # |       A|2001-02-04|             1|              true|                         true|     2|
    # |       B|2001-02-02|             1|             false|                        false|     1|
    # +--------+----------+--------------+------------------+-----------------------------+------+
    
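The cumulative sum over the negated flag is the standard run-labelling trick, and it can be seen in isolation with plain Python (the flags list below mirrors person A's rows and is only an illustration, not part of the Spark job):

```python
from itertools import accumulate

# One boolean per row: True where the row continues a workout streak
# (i.e. is_workout_on_consecutive_day), mirroring person A above.
is_streak_row = [False, True, True, False, True]

# Cumulative sum of the inverted flags: each False starts a new group.
groups = list(accumulate(0 if f else 1 for f in is_streak_row))
print(groups)  # [1, 1, 1, 2, 2] — matches the 'groups' column for A
```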

    Group the dataframe by PersonID and groups, and sum HasDoneWorkout to get the length of every streak.

    df1 = df1.groupBy('PersonID', 'groups').agg(F.sum('HasDoneWorkout').alias('streaks'))
    
    # df1.show()
    # +--------+------+-------+
    # |PersonID|groups|streaks|
    # +--------+------+-------+
    # |       A|     1|      3|
    # |       A|     2|      1|
    # |       B|     1|      1|
    # +--------+------+-------+
    

    Finally, group the dataframe by PersonID again and take the max to find the longest streak.

    df1 = df1.groupBy('PersonID').agg(F.max('streaks').alias('streaks'))
    
    # df1.show()
    # +--------+-------+
    # |PersonID|streaks|
    # +--------+-------+
    # |       A|      3|
    # |       B|      1|
    # +--------+-------+
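As a quick sanity check, the same longest-streak logic can be reproduced on the sample data with plain Python; this is only a reference implementation for verifying the expected output, not part of the Spark job, and the helper name is made up:

```python
from datetime import date, timedelta

rows = [
    ('A', date(2001, 1, 31), 1),
    ('A', date(2001, 2, 1), 1),
    ('A', date(2001, 2, 2), 1),
    ('A', date(2001, 2, 3), 0),
    ('A', date(2001, 2, 4), 1),
    ('B', date(2001, 2, 2), 1),
]

def longest_streaks(rows):
    best, current, prev = {}, {}, {}
    # Tuples sort by person, then date, so each person's rows
    # are processed in chronological order.
    for person, day, done in sorted(rows):
        # A streak continues only on the day right after a workout day.
        if done and prev.get(person) == day - timedelta(days=1):
            current[person] += 1
        else:
            current[person] = done
        if done:
            prev[person] = day
        best[person] = max(best.get(person, 0), current[person])
    return best

print(longest_streaks(rows))  # {'A': 3, 'B': 1}
```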