Tags: python, apache-spark, pyspark, window-functions, sliding-window

Pyspark count on any sliding window for any ID


I have a data frame of customer digital visits over time, in the form:

|cust_id|datetime|
|-------|--------|
|1|2020-08-15 15:20|
|1|2020-08-15 16:20|
|1|2020-08-17 12:20|
|1|2020-08-19 14:20|
|1|2020-08-23 09:20|
|2|2020-08-24 08:00|

I'd like to pick out strong signals, as in: customers who visit at least 3 times within 5 days.

My initial thought is that we have to compute ALL the sliding windows for each customer.

In this example, let's take cust1:

  • 5-day window starting 2020-08-15, ending 2020-08-19: total visits = 4

  • 5-day window starting 2020-08-16, ending 2020-08-20: total visits = 2

  • 5-day window starting 2020-08-17, ending 2020-08-21: total visits = 2

etc.

The maximum count over all sliding windows is 4; therefore cust1 fits the criterion "having visited at least 3 times in 5 days".
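The brute-force count described above can be sketched in plain Python. One useful observation: only windows starting on an actual visit day need checking, since any window containing a given set of visits can be shifted so it starts on its earliest visit. (The sample data and helper below are illustrative, not part of the question.)

```python
from datetime import date, timedelta

# cust1's visit dates from the question (times of day don't matter
# for whole-day windows)
visits = [date(2020, 8, 15), date(2020, 8, 15), date(2020, 8, 17),
          date(2020, 8, 19), date(2020, 8, 23)]

def window_counts(days, span=5):
    """Count visits falling in the 5-calendar-day window starting at each visit."""
    counts = []
    for start in days:
        end = start + timedelta(days=span - 1)  # inclusive end of the window
        counts.append(sum(start <= d <= end for d in days))
    return counts

print(max(window_counts(visits)))  # 4
```

The maximum over these anchored windows is 4, matching the hand-computed result for cust1.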

This seems to be a costly operation.

How would you implement this efficiently? Any other ideas are welcome.


Solution

  • You can convert the datetime column to a long (Unix seconds) and pass the window length in seconds to the rangeBetween() function. Note that both rangeBetween() bounds are inclusive, so a 5-calendar-day window anchored on the current row's date extends 4 × 86400 seconds ahead, not 5.

    from pyspark.sql import functions as F
    from pyspark.sql.window import Window
    
    # Truncate the timestamp string to its date part and convert to Unix seconds
    df = df.withColumn("date_long",
                       F.unix_timestamp(F.substring("datetime", 1, 10), "yyyy-MM-dd"))
    
    days = lambda i: i * 86400
    # [current day, current day + 4 days] covers 5 calendar days inclusive
    w = Window.partitionBy("cust_id").orderBy("date_long").rangeBetween(0, days(4))
    
    df.withColumn("5_day_visit", F.count("*").over(w)).drop("date_long").show()
    +-------+----------------+-----------+                                          
    |cust_id|        datetime|5_day_visit|
    +-------+----------------+-----------+
    |      1|2020-08-15 15:20|          4|
    |      1|2020-08-15 16:20|          4|
    |      1|2020-08-17 12:20|          2|
    |      1|2020-08-19 14:20|          2|
    |      1|2020-08-23 09:20|          1|
    |      2|2020-08-24 08:00|          1|
    +-------+----------------+-----------+
    

    To get the maximum number of 5-day visits for each customer, you can do:

    df.withColumn('5_day_visit', F.count("*").over(w)).drop('date_long')\
        .groupBy('cust_id').agg(F.max('5_day_visit').alias('max_5_day_visits')).show()
    +-------+----------------+                                                      
    |cust_id|max_5_day_visits|
    +-------+----------------+
    |      1|               4|
    |      2|               1|
    +-------+----------------+
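To actually select the strong-signal customers, one more step filters the aggregated result, e.g. `.filter(F.col('max_5_day_visits') >= 3)` in PySpark. The end-to-end logic can be sanity-checked against the question's sample data with a plain-Python sketch (the helper name is illustrative):

```python
from datetime import date, timedelta

# Sample data from the question, grouped by customer id
visits = {
    1: [date(2020, 8, 15), date(2020, 8, 15), date(2020, 8, 17),
        date(2020, 8, 19), date(2020, 8, 23)],
    2: [date(2020, 8, 24)],
}

def max_window_visits(days, span=5):
    # Maximum visit count over 5-calendar-day windows anchored at each
    # visit; anchoring at visits is enough, since a best window can
    # always be shifted to start on a visit day.
    return max(sum(d <= other <= d + timedelta(days=span - 1) for other in days)
               for d in days)

strong = [c for c, d in visits.items() if max_window_visits(d) >= 3]
print(strong)  # [1]
```

Only cust1 (max 4 visits in a 5-day window) passes the "at least 3 in 5 days" threshold, matching the table above.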