I'm working on the following exercise using Scala and Spark.
Given a file containing two columns, a time in seconds and a value.
Example:
|---------------------|------------------|
| seconds | value |
|---------------------|------------------|
| 225 | 1,5 |
| 245 | 0,5 |
| 300 | 2,4 |
| 319 | 1,2 |
| 320 | 4,6 |
|---------------------|------------------|
and given a value V to be used as the size of the rolling window (in seconds), the following output should be produced.
Example with V=20:
|--------------|---------|--------------------|----------------------|
| seconds | value | num_row_in_window |sum_values_in_windows |
|--------------|---------|--------------------|----------------------|
| 225 | 1,5 | 1 | 1,5 |
| 245 | 0,5 | 2 | 2 |
| 300 | 2,4 | 1 | 2,4 |
| 319 | 1,2 | 2 | 3,6 |
| 320 | 4,6 | 3 | 8,2 |
|--------------|---------|--------------------|----------------------|
num_row_in_window is the number of rows contained in the current window, and sum_values_in_windows is the sum of the values contained in the current window.
I've tried the sliding function and the SQL API, but as a Spark/Scala novice I'm unsure which approach is best suited to this problem.
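This is a perfect application for window functions. By using rangeBetween you can set your sliding window to 20s: the frame then covers every row whose seconds value lies between the current row's value minus 20 and the current row's value, inclusive. Note that the example below specifies no partitioning (no partitionBy). Without partitioning, Spark moves all rows to a single partition to evaluate the window, so this code will not scale: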
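import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{lit, sum}
import ss.implicits._ // ss is assumed to be the active SparkSession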
val df = Seq(
(225, 1.5),
(245, 0.5),
(300, 2.4),
(319, 1.2),
(320, 4.6)
).toDF("seconds", "value")
val window = Window.orderBy($"seconds").rangeBetween(-20L, 0L) // add partitioning here
df
.withColumn("num_row_in_window", sum(lit(1)).over(window))
.withColumn("sum_values_in_window", sum($"value").over(window))
.show()
+-------+-----+-----------------+--------------------+
|seconds|value|num_row_in_window|sum_values_in_window|
+-------+-----+-----------------+--------------------+
| 225| 1.5| 1| 1.5|
| 245| 0.5| 2| 2.0|
| 300| 2.4| 1| 2.4|
| 319| 1.2| 2| 3.6|
| 320| 4.6| 3| 8.2|
+-------+-----+-----------------+--------------------+
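If your real data has a column you can group by, the partitioning mentioned above could look roughly like the sketch below. The sensor_id column, the sample rows, and the names RollingWindowSketch and withRollingStats are all made up for illustration; nothing in the question says such a key exists. The window is then evaluated independently per key, so rows from different keys never share a window.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, count, lit, sum}

object RollingWindowSketch {

  // Adds the rolling row count and rolling sum over the last `v` seconds,
  // computed separately for each value of `keyCol`.
  // `keyCol` is a hypothetical grouping column; the question's data has none.
  def withRollingStats(df: DataFrame, keyCol: String, v: Long): DataFrame = {
    val window = Window
      .partitionBy(col(keyCol))             // one independent window per key
      .orderBy(col("seconds"))              // range frames need a numeric ordering column
      .rangeBetween(-v, Window.currentRow)  // [seconds - v, seconds], both ends inclusive

    df
      .withColumn("num_row_in_window", count(lit(1)).over(window))
      .withColumn("sum_values_in_window", sum(col("value")).over(window))
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("rolling-window-sketch")
      .getOrCreate()
    import spark.implicits._

    // Illustrative rows only: two hypothetical sensors with their own timelines.
    val df = Seq(
      ("a", 225, 1.5),
      ("a", 245, 0.5),
      ("b", 240, 2.4),
      ("b", 250, 1.2)
    ).toDF("sensor_id", "seconds", "value")

    withRollingStats(df, "sensor_id", 20L)
      .orderBy("sensor_id", "seconds")
      .show()

    spark.stop()
  }
}
```

With this frame, row ("b", 240) does not see row ("a", 225) even though the two are only 15 seconds apart, because they fall into different partitions. Whether that is the behaviour you want depends on what the exercise means by "window"; if the window must span the whole file, the unpartitioned version is the one that matches the expected output.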