Imagine a table:
| PersonID | Date       | HasDoneWorkout |
|----------|------------|----------------|
| A        | 31-01-2001 | 1              |
| A        | 01-02-2001 | 1              |
| A        | 02-02-2001 | 1              |
| A        | 03-02-2001 | 0              |
| A        | 04-02-2001 | 1              |
| B        | 02-02-2001 | 1              |
I would like to create a PySpark aggregation that counts how many days in a row a person has done a workout. If a person has more than one consecutive streak, pick the longest one.
Expected output:
| PersonID | HasDoneWorkout |
|----------|----------------|
| A        | 3              |
| B        | 1              |
Since I didn't find any solution using PySpark, I tried to adapt a pandas approach, but failed to translate it into PySpark.
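For reproducibility, the sample data from the question can be built like this. This is a minimal sketch; it assumes a SparkSession is available as spark and that the dates are strings in dd-MM-yyyy format, matching the table above.
from pyspark.sql import SparkSession

# assumption: create or reuse a session; skip this if `spark` already exists
spark = SparkSession.builder.getOrCreate()

df = spark.createDataFrame(
    [
        ('A', '31-01-2001', 1),
        ('A', '01-02-2001', 1),
        ('A', '02-02-2001', 1),
        ('A', '03-02-2001', 0),
        ('A', '04-02-2001', 1),
        ('B', '02-02-2001', 1),
    ],
    ['PersonID', 'Date', 'HasDoneWorkout'],
)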
Create a window specification that partitions the DataFrame by PersonID and orders the rows by Date, then use the to_date function to parse the date strings into date type.
from pyspark.sql import functions as F, Window

W = Window.partitionBy('PersonID').orderBy('Date')
df1 = df.withColumn('Date', F.to_date('Date', format='dd-MM-yyyy'))
# df1.show()
# +--------+----------+--------------+
# |PersonID| Date|HasDoneWorkout|
# +--------+----------+--------------+
# | A|2001-01-31| 1|
# | A|2001-02-01| 1|
# | A|2001-02-02| 1|
# | A|2001-02-03| 0|
# | A|2001-02-04| 1|
# | B|2001-02-02| 1|
# +--------+----------+--------------+
Calculate the difference between the current row's Date and the previous row's Date to flag the rows where the dates are consecutive.
diff = F.datediff('Date', F.lag('Date').over(W))
df1 = df1.withColumn('is_consecutive_day', F.coalesce(diff, F.lit(0)) == 1)
# df1.show()
# +--------+----------+--------------+------------------+
# |PersonID| Date|HasDoneWorkout|is_consecutive_day|
# +--------+----------+--------------+------------------+
# | A|2001-01-31| 1| false|
# | A|2001-02-01| 1| true|
# | A|2001-02-02| 1| true|
# | A|2001-02-03| 0| true|
# | A|2001-02-04| 1| true|
# | B|2001-02-02| 1| false|
# +--------+----------+--------------+------------------+
Create a boolean column that flags the rows which are both on a consecutive date and where the person has done the workout.
df1 = df1.withColumn('is_workout_on_consecutive_day', F.col('is_consecutive_day') & (F.col('HasDoneWorkout') == 1))
# df1.show()
# +--------+----------+--------------+------------------+-----------------------------+
# |PersonID| Date|HasDoneWorkout|is_consecutive_day|is_workout_on_consecutive_day|
# +--------+----------+--------------+------------------+-----------------------------+
# | A|2001-01-31| 1| false| false|
# | A|2001-02-01| 1| true| true|
# | A|2001-02-02| 1| true| true|
# | A|2001-02-03| 0| true| false|
# | A|2001-02-04| 1| true| true|
# | B|2001-02-02| 1| false| false|
# +--------+----------+--------------+------------------+-----------------------------+
Take a cumulative sum over the inverted is_workout_on_consecutive_day condition to distinguish between the different groups of rows where the person has worked out consecutively.
df1 = df1.withColumn('groups', F.sum((~F.col('is_workout_on_consecutive_day')).cast('int')).over(W))
# df1.show()
# +--------+----------+--------------+------------------+-----------------------------+------+
# |PersonID| Date|HasDoneWorkout|is_consecutive_day|is_workout_on_consecutive_day|groups|
# +--------+----------+--------------+------------------+-----------------------------+------+
# | A|2001-01-31| 1| false| false| 1|
# | A|2001-02-01| 1| true| true| 1|
# | A|2001-02-02| 1| true| true| 1|
# | A|2001-02-03| 0| true| false| 2|
# | A|2001-02-04| 1| true| true| 2|
# | B|2001-02-02| 1| false| false| 1|
# +--------+----------+--------------+------------------+-----------------------------+------+
Group the DataFrame by PersonID and groups, and aggregate HasDoneWorkout with sum to get the lengths of all consecutive streaks.
df1 = df1.groupBy('PersonID', 'groups').agg(F.sum('HasDoneWorkout').alias('streaks'))
# df1.show()
# +--------+------+-------+
# |PersonID|groups|streaks|
# +--------+------+-------+
# | A| 1| 3|
# | A| 2| 1|
# | B| 1| 1|
# +--------+------+-------+
Again group the DataFrame by PersonID and aggregate to find the maximum continuous streak.
df1 = df1.groupBy('PersonID').agg(F.max('streaks').alias('streaks'))
# df1.show()
# +--------+-------+
# |PersonID|streaks|
# +--------+-------+
# | A| 3|
# | B| 1|
# +--------+-------+
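For convenience, the individual steps above can also be chained into a single expression. This is just a sketch that repeats the same logic with the same column names; result is an illustrative variable name.
from pyspark.sql import functions as F, Window

W = Window.partitionBy('PersonID').orderBy('Date')

result = (
    df.withColumn('Date', F.to_date('Date', format='dd-MM-yyyy'))
      # flag rows whose date is exactly one day after the previous row's date
      .withColumn('is_consecutive_day',
                  F.coalesce(F.datediff('Date', F.lag('Date').over(W)), F.lit(0)) == 1)
      # ... and where a workout was also done on that day
      .withColumn('is_workout_on_consecutive_day',
                  F.col('is_consecutive_day') & (F.col('HasDoneWorkout') == 1))
      # running sum of the inverted flag starts a new group at every break in the streak
      .withColumn('groups',
                  F.sum((~F.col('is_workout_on_consecutive_day')).cast('int')).over(W))
      # length of every streak, then the longest streak per person
      .groupBy('PersonID', 'groups')
      .agg(F.sum('HasDoneWorkout').alias('streaks'))
      .groupBy('PersonID')
      .agg(F.max('streaks').alias('streaks'))
)
# result.show() should reproduce the final table above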