Search code examples
mysqlapache-sparkpysparkapache-spark-sql

SQL/Pyspark - to check condition


I have a table with customer Id, Date of purchase

I wanted to check if there is 3 purchase in the 5 days windows period.

Note : Doesn't need to be consecutive

Cust Id Date O Purchase
11 09/11/2023
11 13/11/2023
11 15/11/2023
11 21/11/2023
11 23/11/2023
11 24/11/2023
12 16/11/2023
12 21/11/2023
12 25/11/2023
12 01/12/2023
12 03/12/2023
12 05/12/2023

Considering the table as the input for checking any 3 days out of 5 days window. For cust id 11 ->

  1. DOP 9/11/2023 + 5 days window period is 14/11/2023
    • On checking it has only 2 days 9/11/2023 & 13/11/2023 so it fails
  2. DOP 13/11/2023 + 5 days window period is 18/11/2023 - Fails(only 2 record)
  3. DOP 15/11/2023 + 5 days window period is 20/11/2023 - Fails(only 1 record)
  4. DOP 21/11/2023 + 5 days window period is 26/11/2023
    • On checking it has 3 days 21/11/2023,23/11/2023 & 24/11/2023 so it passes

On the overall level - CustomerID 11 passes the condition of having any 3 days purchase in the 5 days windows period

Please let me know if the question or clear.


Solution

  • You can look at this problem differently to simplify the solution, for each 3 consecutive purchases check if one of them is in the range of 5 days, and to simplify it even more after ordering the data by "Date O Purchase" you only need to check current row and current row -2 in that order (this means the last 3 purchase) and check if the dates of these 2 rows are within 5 days, here's a solution in PySpark:

    w = Window.partitionBy("Cust_Id").orderBy("Date_Of_Purchase")
    df = df.withColumn("Date_Of_Purchase", to_date(col("Date_Of_Purchase"), "dd/MM/yyyy")) \
        .withColumn("Prev_Date_Of_Purchase", lag("Date_Of_Purchase", 2).over(w)) \
        .withColumn("days_between", datediff(col("Date_Of_Purchase"), col("Prev_Date_Of_Purchase")))
    df.show()
    
    df.filter(col("days_between") <= 5).select("Cust_Id").distinct().show()
    

    EDIT:

    You can generalize this by changing the 2 in lag("Date_Of_Purchase", 2) to the number of purchases - 1, and the 5 in .filter(col("days_between") <= 5) the to days interval le window.