Tags: scala, apache-spark, apache-spark-sql, window-functions

Spark SQL window function look ahead and complex function


I have the following data:

+-----+----+-----+
|event|t   |type |
+-----+----+-----+
| A   |20  | 1   |
| A   |40  | 1   |
| B   |10  | 1   |
| B   |20  | 1   |
| B   |120 | 1   |
| B   |140 | 1   |
| B   |320 | 1   |
| B   |340 | 1   |
| B   |360 | 7   |
| B   |380 | 1   |
+-----+----+-----+

And what I want is something like this:

+-----+----+----+
|event|t   |grp |
+-----+----+----+
| A   |20  |1   |
| A   |40  |1   |
| B   |10  |2   |
| B   |20  |2   |
| B   |120 |3   |
| B   |140 |3   |
| B   |320 |4   |
| B   |340 |4   |
| B   |380 |5   |
+-----+----+----+

Rules:

  1. Within the same event, group rows whose t values are at most 50 ms apart; a gap of more than 50 ms to the previous row starts a new group.
  2. When a row of type 7 appears, start a new group there as well and remove that row (see the row at t = 360, which is missing from the desired output).

The first rule I can achieve with the answer from this thread:

Code:

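import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{coalesce, lag, lit, sum}

val windowSpec = Window.partitionBy("event").orderBy("t")

// 1 if the gap to the previous row exceeds 50, otherwise 0
val newSession = (coalesce(
  ($"t" - lag($"t", 1).over(windowSpec)),
  lit(0)
) > 50).cast("bigint")

// running sum of the flags within each event
val sessionized = df.withColumn("session", sum(newSession).over(windowSpec))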

I have to admit I can't figure out how this works, and I don't know how to modify it so that rule 2 also applies. I hope someone can give me some useful hints.
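
For intuition, the snippet above seems to boil down to two steps: flag every row whose gap to the previous row exceeds 50, then take a running sum of those flags so that the counter increases at each flagged row. A minimal sketch of that idea on plain Scala collections (no Spark involved; `SessionSketch`, `sessionize` and `maxGap` are made-up names for illustration, and the input is assumed to be sorted already):

```scala
object SessionSketch {
  // Hypothetical helper: ts must already be sorted, like rows ordered by t within one event.
  def sessionize(ts: Seq[Int], maxGap: Int): Seq[Int] = {
    // Pair every value with its predecessor (the "lag"). The first value is paired
    // with itself, so its gap is 0, which mirrors coalesce(..., lit(0)).
    val withPrev = ts.zip(ts.take(1) ++ ts)

    // Flag = 1 where a new session starts, 0 otherwise (the cast("bigint") part).
    val flags = withPrev.map { case (cur, prev) => if (cur - prev > maxGap) 1 else 0 }

    // Running sum of the flags (the sum(...).over(window) part):
    // the counter goes up by one at every flagged row.
    flags.scanLeft(0)(_ + _).tail
  }

  def main(args: Array[String]): Unit = {
    val eventB = Seq(10, 20, 120, 140, 320, 340, 360, 380)
    println(sessionize(eventB, 50)) // List(0, 0, 1, 1, 2, 2, 2, 2)
  }
}
```

With only rule 1 applied, the rows at t = 360 and t = 380 stay in the same session as t = 320 and t = 340, which is why rule 2 needs an extra condition.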

What I tried:

val newSession =  (coalesce(
  ($"t" - lag($"t", 1).over(windowSpec)),
  lit(0)
) > 50 || lead($"type",1).over(windowSpec) =!= 7 ).cast("bigint")

But this only produced an error: "Must follow method; cannot follow org.apache.spark.sql.Column val grp = (coalesce(


Solution

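  • This should do the trick: treat a type-7 row as the start of a new group, then filter those rows out after the running sum has been computed: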

    // same window as windowSpec in the question
    val win = Window.partitionBy("event").orderBy("t")

    val newSession = (coalesce(
      ($"t" - lag($"t", 1).over(win)),
      lit(0)
    ) > 50
      or $"type" === 7) // also start a new group in this case
      .cast("bigint")

    df.withColumn("session", sum(newSession).over(win))
      .where($"type" =!= 7) // remove these rows
      .orderBy($"event", $"t")
      .show
    

    gives:

    +-----+---+----+-------+
    |event|  t|type|session|
    +-----+---+----+-------+
    |    A| 20|   1|      0|
    |    A| 40|   1|      0|
    |    B| 10|   1|      0|
    |    B| 20|   1|      0|
    |    B|120|   1|      1|
    |    B|140|   1|      1|
    |    B|320|   1|      2|
    |    B|340|   1|      2|
    |    B|380|   1|      3|
    +-----+---+----+-------+
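
The session column above restarts at 0 for every event, whereas the question asks for a grp column that keeps counting across events. One possible way to get there, sketched under assumptions, is to rank the distinct (event, session) pairs with dense_rank. The names `GroupIdSketch`, `sample` and `withGlobalGroup` are hypothetical, and the un-partitioned window pulls all rows into a single partition, so this is probably only reasonable for small data:

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{coalesce, col, dense_rank, lag, lit, sum}

object GroupIdSketch {
  // Hypothetical helper: rebuilds the sample data from the question.
  def sample(spark: SparkSession): DataFrame = {
    import spark.implicits._
    Seq(
      ("A", 20, 1), ("A", 40, 1),
      ("B", 10, 1), ("B", 20, 1), ("B", 120, 1), ("B", 140, 1),
      ("B", 320, 1), ("B", 340, 1), ("B", 360, 7), ("B", 380, 1)
    ).toDF("event", "t", "type")
  }

  // Hypothetical helper: applies both rules, then numbers the groups globally.
  def withGlobalGroup(df: DataFrame): DataFrame = {
    val win = Window.partitionBy("event").orderBy("t")

    // 1 when the gap to the previous row exceeds 50 or the row is of type 7, else 0.
    val gap = coalesce(col("t") - lag(col("t"), 1).over(win), lit(0))
    val newSession = ((gap > 50) or (col("type") === 7)).cast("bigint")

    df.withColumn("session", sum(newSession).over(win)) // running sum per event
      .where(col("type") =!= 7)                         // drop the type-7 rows
      // No partitionBy here: all rows are ranked together, in (event, session) order.
      .withColumn("grp", dense_rank().over(Window.orderBy(col("event"), col("session"))))
      .select("event", "t", "grp")
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").appName("grp-sketch").getOrCreate()
    withGlobalGroup(sample(spark)).orderBy("event", "t").show()
    spark.stop()
  }
}
```

For the sample data this should number the groups 1 through 5, matching the desired table in the question.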