
select with window function (dense_rank()) in SparkSQL


I have a table of customer purchase records, and I need to label each purchase with the number of the datetime window it falls into. One window is 8 days. So if I make a purchase today and another in 5 days, both purchases are in window 1; but if I make one today and the next in 8 days, the first purchase is in window 1 and the last purchase is in window 2.

create temporary table transactions
 (client_id int,
 transaction_ts datetime,
 store_id int);

 insert into transactions values
 (1,'2018-06-01 12:17:37', 1),
 (1,'2018-06-02 13:17:37', 2),
 (1,'2018-06-03 14:17:37', 3),
 (1,'2018-06-09 10:17:37', 2),
 (2,'2018-06-02 10:17:37', 1),
 (2,'2018-06-02 13:17:37', 2),
 (2,'2018-06-08 14:19:37', 3),
 (2,'2018-06-16 13:17:37', 2),
 (2,'2018-06-17 14:17:37', 3);

The window is 8 days. The problem is that I don't understand how to make dense_rank() OVER (PARTITION BY ...) look at the datetime and form 8-day windows. As a result I need something like this (the last column is the window number):

1,'2018-06-01 12:17:37', 1,1
1,'2018-06-02 13:17:37', 2,1
1,'2018-06-03 14:17:37', 3,1
1,'2018-06-09 10:17:37', 2,2
2,'2018-06-02 10:17:37', 1,1
2,'2018-06-02 13:17:37', 2,1
2,'2018-06-08 14:19:37', 3,2
2,'2018-06-16 13:17:37', 2,3
2,'2018-06-17 14:17:37', 3,3

Any idea how to get this? I can run it in MySQL or Spark SQL, but MySQL doesn't support PARTITION BY. I still cannot find a solution, so any help is appreciated.


Solution

  • You can most likely solve this in Spark SQL by combining a time window (the window function, which buckets timestamps into fixed 8-day intervals) with a window partitioned by client:

    val purchases = Seq((1,"2018-06-01 12:17:37", 1), (1,"2018-06-02 13:17:37", 2), (1,"2018-06-03 14:17:37", 3), (1,"2018-06-09 10:17:37", 2), (2,"2018-06-02 10:17:37", 1), (2,"2018-06-02 13:17:37", 2), (2,"2018-06-08 14:19:37", 3), (2,"2018-06-16 13:17:37", 2), (2,"2018-06-17 14:17:37", 3)).toDF("client_id", "transaction_ts", "store_id")
    
    purchases.show(false)
    +---------+-------------------+--------+
    |client_id|transaction_ts     |store_id|
    +---------+-------------------+--------+
    |1        |2018-06-01 12:17:37|1       |
    |1        |2018-06-02 13:17:37|2       |
    |1        |2018-06-03 14:17:37|3       |
    |1        |2018-06-09 10:17:37|2       |
    |2        |2018-06-02 10:17:37|1       |
    |2        |2018-06-02 13:17:37|2       |
    |2        |2018-06-08 14:19:37|3       |
    |2        |2018-06-16 13:17:37|2       |
    |2        |2018-06-17 14:17:37|3       |
    +---------+-------------------+--------+
    
    
    
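    import org.apache.spark.sql.expressions.Window
    import org.apache.spark.sql.functions.{collect_list, row_number, window}

    // Bucket each client's purchases into fixed 8-day tumbling windows.
    val groupedByTimeWindow = purchases.groupBy($"client_id", window($"transaction_ts", "8 days")).agg(collect_list("transaction_ts").as("transaction_tss"), collect_list("store_id").as("store_ids"))

    // windowByClient was not defined in the original snippet; this definition is an assumption:
    // number each client's time windows in chronological order.
    val windowByClient = Window.partitionBy($"client_id").orderBy($"window.start")
    val withWindowNumber = groupedByTimeWindow.withColumn("window_number", row_number().over(windowByClient))

    withWindowNumber.orderBy("client_id", "window.start").show(false)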
    
    +---------+---------------------------------------------+---------------------------------------------------------------+---------+-------------+
    |client_id|window                                       |transaction_tss                                                |store_ids|window_number|
    +---------+---------------------------------------------+---------------------------------------------------------------+---------+-------------+
    |1        |[2018-05-28 17:00:00.0,2018-06-05 17:00:00.0]|[2018-06-01 12:17:37, 2018-06-02 13:17:37, 2018-06-03 14:17:37]|[1, 2, 3]|1            |
    |1        |[2018-06-05 17:00:00.0,2018-06-13 17:00:00.0]|[2018-06-09 10:17:37]                                          |[2]      |2            |
    |2        |[2018-05-28 17:00:00.0,2018-06-05 17:00:00.0]|[2018-06-02 10:17:37, 2018-06-02 13:17:37]                     |[1, 2]   |1            |
    |2        |[2018-06-05 17:00:00.0,2018-06-13 17:00:00.0]|[2018-06-08 14:19:37]                                          |[3]      |2            |
    |2        |[2018-06-13 17:00:00.0,2018-06-21 17:00:00.0]|[2018-06-16 13:17:37, 2018-06-17 14:17:37]                     |[2, 3]   |3            |
    +---------+---------------------------------------------+---------------------------------------------------------------+---------+-------------+
    

    If you need one row per purchase again, you can explode the list elements from store_ids or transaction_tss.
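
    If the goal is one row per purchase with its window number, as in the question's expected output, another option is to skip the aggregation and rank the time buckets directly. The following is only a sketch under stated assumptions, not a tested production solution: it builds its own local SparkSession, sets the session time zone to UTC, and uses illustrative names (withBucket, byClient, perRow, bucket) that do not appear in the original answer. It relies on the same window function as above, so the 8-day buckets are fixed intervals rather than intervals starting at each client's first purchase.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, dense_rank, window}

// Assumption: a local session for illustration; in spark-shell `spark` already exists.
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("window-number-sketch")
  .config("spark.sql.session.timeZone", "UTC") // assumption: timestamps are read as UTC
  .getOrCreate()
import spark.implicits._

// Same sample data as in the question.
val purchases = Seq(
  (1, "2018-06-01 12:17:37", 1),
  (1, "2018-06-02 13:17:37", 2),
  (1, "2018-06-03 14:17:37", 3),
  (1, "2018-06-09 10:17:37", 2),
  (2, "2018-06-02 10:17:37", 1),
  (2, "2018-06-02 13:17:37", 2),
  (2, "2018-06-08 14:19:37", 3),
  (2, "2018-06-16 13:17:37", 2),
  (2, "2018-06-17 14:17:37", 3)
).toDF("client_id", "transaction_ts", "store_id")

// Attach the fixed 8-day bucket to every row; there is no groupBy, so no rows are collapsed.
val withBucket = purchases
  .withColumn("bucket", window(col("transaction_ts").cast("timestamp"), "8 days"))

// dense_rank gives rows in the same bucket the same number and
// does not skip numbers when moving to the next bucket.
val byClient = Window.partitionBy("client_id").orderBy(col("bucket.start"))
val perRow = withBucket
  .withColumn("window_number", dense_rank().over(byClient))
  .drop("bucket")

perRow.orderBy("client_id", "transaction_ts").show(false)
```

    Note that the numbering counts only buckets in which a client actually has purchases, so an empty 8-day bucket between two purchases does not leave a gap in window_number.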

    Hope it helps!