I have a DataFrame with numerical values, similar to this:
df

+-----------+
|probability|
+-----------+
|          0|
|        0.2|
|        0.3|
|        0.4|
|        0.5|
+-----------+
I need to find the mean of each pair of successive probabilities, and I want the following:
expectedMeanDF

+----+
|mean|
+----+
|   0|
| 0.1|
|0.25|
|0.35|
|0.45|
|   1|
+----+
where 0.1 is the mean of 0 and 0.2, 0.25 the mean of 0.2 and 0.3, and so on; 0 and 1 are the range boundaries I need at either end.
I'm using window functions in the following way to do this:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, lead}

val window = Window.partitionBy("partition")
val mean = df.withColumn("partition", col("probability") * 0)
  .withColumn("mean", (col("probability") + lead("probability", 1).over(window)) / 2)
  .drop("partition").drop("probability")
I have two problems with this approach:

1. The dummy partition column forces all of the data (about 30 million rows in my case) into a single partition.
2. I still have to add the range boundaries 0 and 1 at either end myself.

Is there an alternate approach for this?
30 million rows is not a large number, so local processing could be fast enough, but the approach above is not correct: partitionBy causes a shuffle, and without orderBy the data can be put in a different order than you expect.
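If local processing is acceptable, here is a minimal sketch of that route (assuming a SparkSession in scope as spark, with spark.implicits._ imported): collect the sorted values to the driver and compute the pairwise means in plain Scala.

val probs = df.orderBy($"probability").as[Double].collect()  // sorted Array[Double] on the driver
val pairMeans = probs.sliding(2).map(w => w.sum / w.size).toArray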
The only scalable solution I know of is to convert to an RDD:
import spark.implicits._  // spark = SparkSession (available by default in spark-shell)

val df = Seq(0, 0.2, 0.3, 0.4, 0.5).toDF("probability")
val rdd = df.orderBy($"probability").as[Double].rdd  // sorted RDD[Double]
import RDDFunctions:
import org.apache.spark.mllib.rdd.RDDFunctions._
use the sliding method:

val slides = rdd.sliding(2).toDS  // each element is an Array of two consecutive values
find the mean of each pair:
val means = slides.map(slide => slide.sum / slide.size)
and convert back to a DataFrame:

means.toDF
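Put together, the whole pipeline is a short chain (again a sketch, assuming spark.implicits._ is in scope):

import org.apache.spark.mllib.rdd.RDDFunctions._

val means = df.orderBy($"probability").as[Double].rdd
  .sliding(2)                           // consecutive pairs
  .map(slide => slide.sum / slide.size) // mean of each pair
  .toDS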
It will cover all consecutive values:
+-----+
|value|
+-----+
| 0.1|
| 0.25|
| 0.35|
| 0.45|
+-----+
but you'll have to add the range boundaries (0 and 1) manually.
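One way to add them, as a sketch (value is the default column name produced by the Dataset[Double] conversion; withBoundaries is just an illustrative name):

val boundaries = Seq(0.0, 1.0).toDF("value")
val withBoundaries = means.toDF("value").union(boundaries).orderBy($"value")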