Tags: scala, apache-spark, udf, apache-spark-dataset, iterated-function

Spark Iterated Function CUSUM


I'm still fairly new to Spark and I'm struggling to implement an iterated function. I'm hoping someone can help me out.

In particular, I'm trying to implement the CUSUM control statistic:

$ S_i = \max(0, S_{i-1} + x_i - Target - w) $ with $ S_0 = 0 $, where $ w $ and $ Target $ are fixed parameters.

The challenge is that the CUSUM statistic is defined as an iterated function which requires ordered data and the previous function value.

The following data frame shows the desired output for $ Target = 1 $ and $ w = 0.1 $ :

i    x    S
--------------
1    1.3  0.2
2    1.8  0.9
3    0.5  0.3
4    0.6  0
5    1.2  0.1
6    1.8  0.8
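
For reference, here's a plain-Scala sketch of the recurrence (no Spark, just a scanLeft over the sample values above; it's only meant to reproduce the table, up to floating-point rounding):

    val xs = Seq(1.3, 1.8, 0.5, 0.6, 1.2, 1.8)
    val target = 1.0
    val w = 0.1
    
    // scanLeft threads S_{i-1} through the sequence; drop the initial S_0 = 0
    val s = xs.scanLeft(0.0)((prev, x) => math.max(0.0, prev + x - target - w)).tail
    // s ~= Seq(0.2, 0.9, 0.3, 0.0, 0.1, 0.8)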

On a different note: I guess it's not possible to compute the CUSUM statistic itself in a distributed fashion? My data set is fairly large, but it contains multiple groups, so I hope I can still get some concurrency out of it. I guess I have to repartition my data so that each group ends up in a single partition, and then run the CUSUM algorithm per group concurrently?

I hope this makes sense, and any pointers are highly appreciated! Ideally I'm looking for a solution in Scala on Spark 2.1.

Thanks a lot!


Solution

  • After a lot of searching I found a solution to the problem using mapPartitions:

    import spark.implicits._   // assumes a SparkSession named `spark` in scope, as in spark-shell
    
    val dataset = Seq(1.3, 1.8, 0.5, 0.6, 1.2, 1.8).toDS
    
    // Collapse the data into a single partition so that one iterator sees the
    // whole ordered series, then carry the running statistic S through it.
    dataset.repartition(1).mapPartitions(iterator => {
        var s = 0.0
        val target = 1.0
        val w = 0.1
        iterator.map(x => {
            s = Math.max(0.0, s + x - target - w)
            Math.round(10.0 * s) / 10.0   // round to one decimal place for display
        })
    }).show()
    
    +-----+
    |value|
    +-----+
    |  0.2|
    |  0.9|
    |  0.3|
    |  0.0|
    |  0.1|
    |  0.8|
    +-----+
    

    I hope this will save someone some time in the future.
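
  • To address the multiple-groups question above, something along these lines should work (a sketch only; the group/index column names and the case classes are assumptions for illustration). Repartitioning by the group key co-locates each group in one partition, sortWithinPartitions restores the order inside each group, and the running statistic is reset at every group boundary:

    import org.apache.spark.sql.functions.col
    import spark.implicits._
    
    case class Point(group: String, i: Int, x: Double)
    case class Result(group: String, i: Int, x: Double, s: Double)
    
    val points = Seq(
        Point("a", 1, 1.3), Point("a", 2, 1.8), Point("a", 3, 0.5),
        Point("b", 1, 0.6), Point("b", 2, 1.2), Point("b", 3, 1.8)
    ).toDS
    
    val target = 1.0
    val w = 0.1
    
    points
        .repartition(col("group"))                        // a group is never split across partitions
        .sortWithinPartitions(col("group"), col("i"))     // ordered data within each group
        .mapPartitions(iterator => {
            var currentGroup: String = null
            var s = 0.0
            iterator.map(p => {
                // reset the running statistic whenever a new group starts
                if (p.group != currentGroup) { currentGroup = p.group; s = 0.0 }
                s = Math.max(0.0, s + p.x - target - w)
                Result(p.group, p.i, p.x, s)
            })
        })
        .show()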