Tags: scala, apache-spark, time-series, window-functions

Time series with Scala and Spark: rolling window


I'm trying to work through the following exercise using Scala and Spark.

Given a file containing two columns: a time in seconds and a value.

Example:

|---------------------|------------------|
|     seconds         |     value        |
|---------------------|------------------|
|          225        |         1.5      |
|          245        |         0.5      |
|          300        |         2.4      |
|          319        |         1.2      |
|          320        |         4.6      |
|---------------------|------------------|

and given a value V to be used for the rolling window this output should be created:

Example with V=20

|--------------|---------|--------------------|----------------------|
|     seconds  |  value  |  num_row_in_window | sum_values_in_window |
|--------------|---------|--------------------|----------------------|
|       225    |    1.5  |          1         |          1.5         |
|       245    |    0.5  |          2         |          2.0         |
|       300    |    2.4  |          1         |          2.4         |
|       319    |    1.2  |          2         |          3.6         |
|       320    |    4.6  |          3         |          8.2         |
|--------------|---------|--------------------|----------------------|

num_row_in_window is the number of rows contained in the current window, and sum_values_in_window is the sum of the values of those rows.
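To pin down the definition: for each row, the window contains every row whose time lies within the last V seconds, including the current row. A minimal plain-Scala sketch of that definition on the sample data (no Spark needed, just to make the expected numbers concrete):

```scala
// Plain-Scala sketch of the rolling-window definition:
// for each row, collect all rows whose time is in [t - V, t].
object RollingWindowSketch {
  def main(args: Array[String]): Unit = {
    val v = 20
    val rows = Seq((225, 1.5), (245, 0.5), (300, 2.4), (319, 1.2), (320, 4.6))

    rows.foreach { case (t, _) =>
      val inWindow = rows.filter { case (s, _) => s >= t - v && s <= t }
      println(f"$t%d  ${inWindow.size}%d  ${inWindow.map(_._2).sum}%.1f")
    }
    // Prints:
    // 225  1  1.5
    // 245  2  2.0
    // 300  1  2.4
    // 319  2  3.6
    // 320  3  8.2
  }
}
```

Note this quadratic loop is only a reference for the semantics; the Spark solution below the table is what should actually be used.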

I've been trying the sliding function and the SQL API, but it's unclear to me which is the better approach to this problem, considering that I'm a Spark/Scala novice.


Solution

  • This is a perfect application for window functions. Using rangeBetween you can set the sliding window to cover the preceding 20 seconds, inclusive of the current row. Note that in the example below no partitioning is specified (no partitionBy); without partitioning, all rows are shuffled into a single partition, so this code will not scale:

    import org.apache.spark.sql.expressions.Window
    import org.apache.spark.sql.functions._
    import ss.implicits._ // ss is the active SparkSession
    
    val df = Seq(
      (225, 1.5),
      (245, 0.5),
      (300, 2.4),
      (319, 1.2),
      (320, 4.6)
    ).toDF("seconds", "value")
    
    // range frame: all rows whose seconds value lies in [current - 20, current]
    val window = Window.orderBy($"seconds").rangeBetween(-20L, 0L) // add partitioning here
    
    df
      .withColumn("num_row_in_window", count(lit(1)).over(window))
      .withColumn("sum_values_in_window", sum($"value").over(window))
      .show()
    
    +-------+-----+-----------------+--------------------+
    |seconds|value|num_row_in_window|sum_values_in_window|
    +-------+-----+-----------------+--------------------+
    |    225|  1.5|                1|                 1.5|
    |    245|  0.5|                2|                 2.0|
    |    300|  2.4|                1|                 2.4|
    |    319|  1.2|                2|                 3.6|
    |    320|  4.6|                3|                 8.2|
    +-------+-----+-----------------+--------------------+