apache-spark, dataframe, apache-spark-sql, window-functions

How to find mean of successive entries in Spark


I have a dataframe with numerical values similar to this:

df

probability
0
0.2
0.3
0.4
0.5

I need to find the mean of each pair of successive probabilities and want the following:

expectedMeanDF

mean
0
0.1
0.25
0.35
0.45
1

where 0.1 is the mean of 0 and 0.2, 0.25 the mean of 0.2 and 0.3, and so on...

I'm using window functions in the following way to do this:

df.withColumn("partition", dp.col("probability")*0)

val window = Window.partitionBy("partition")
val mean = distinctProbability.withColumn("mean", (newdp.col("probability") + lead("probability", 1).over(window)) / 2).drop("partition").drop("probability")

So I have two problems with this approach:

  1. It's not able to add 0 and 1 at the first and last positions of the dataframe, respectively.
  2. It's not very efficient. The number of rows in my df might go up to 30 million, so that is a challenge.

Any alternate approach for this?


Solution

  • 30 million rows is not a large number, so local processing could be fast enough, but the approach shown is not correct: partitionBy causes a shuffle, and without orderBy the data can end up in a different order than you expect.
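
    If you stick with the window approach, the window spec at least needs an orderBy for lead to be well defined (a minimal sketch; it still leaves the 0 and 1 boundary rows to add by hand):

    import org.apache.spark.sql.expressions.Window

    // Ordering the window makes lead() return the next probability rather than an arbitrary row
    val orderedWindow = Window.partitionBy("partition").orderBy("probability")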

    The only scalable solution I know is to convert to RDD:

    import spark.implicits._  // assumes spark is your SparkSession; needed for toDF, toDS and as[Double]

    val df = Seq(0.0, 0.2, 0.3, 0.4, 0.5).toDF("probability")
    val rdd = df.orderBy($"probability").as[Double].rdd
    

    import RDDFunctions:

    import org.apache.spark.mllib.rdd.RDDFunctions._
    

    use the sliding method:

    val slides = rdd.sliding(2).toDS
    

    find mean:

    val means = slides.map(slide => slide.sum / slide.size)
    

    and convert back to DataFrame:

    means.toDF
    

    It will cover all consecutive values:

    +-----+
    |value|
    +-----+
    |  0.1|
    | 0.25|
    | 0.35|
    | 0.45|
    +-----+
    

    but you'll have to add range boundaries manually.
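
    For completeness, a minimal sketch of adding those boundaries, assuming the bounds are 0 and 1 as in the question and that the column is named value as in the output above (the final orderBy relies on all of the means lying between the two bounds):

    // Append the 0 and 1 boundary rows around the pairwise means
    val withBoundaries = means.toDF("value")
      .union(Seq(0.0).toDF("value"))   // lower boundary
      .union(Seq(1.0).toDF("value"))   // upper boundary
      .orderBy("value")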