Assuming the existence of an RDD of tuples similar to the following:
(key1, 1)
(key3, 9)
(key2, 3)
(key1, 4)
(key1, 5)
(key3, 2)
(key2, 7)
...
What is the most efficient (and, ideally, distributed) way to compute statistics corresponding to each key? (At the moment, I am looking to calculate standard deviation / variance in particular.) As I understand it, my options amount to:

1. colStats in MLlib: This approach has the advantage of being easily adapted to use other mllib.stat functions later, if other statistical computations are deemed necessary. However, it operates on an RDD of Vector containing the data for each column, so as I understand it, this approach would require that the full set of values for each key be collected on a single node, which would seem non-ideal for large data sets. Does a Spark Vector always imply that the data in the Vector is resident locally, on a single node?

2. groupByKey, then stats: Likely shuffle-heavy, as a result of the groupByKey operation.

3. aggregateByKey, initializing a new StatCounter, and using StatCounter::merge as the sequence and combiner functions: This is the approach recommended by this StackOverflow answer, and avoids the groupByKey from option 2. However, I haven't been able to find good documentation for StatCounter in PySpark.

I like option 1 because it makes the code more extensible, in that it could easily accommodate more complicated calculations using other MLlib functions with similar contracts, but if the Vector inputs inherently require that the data sets be collected locally, then it limits the data sizes on which the code can effectively operate. Between the other two, option 3 looks more efficient because it avoids the groupByKey, but I was hoping to confirm that that is the case.
Are there any other options I haven't considered? (I am currently using Python + PySpark, but I'm open to solutions in Java/Scala as well, if there is a language difference.)
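For reference, my understanding of the combine logic behind option 3, sketched in plain Python (no Spark required). Here count/mean/M2 are the running statistics a StatCounter-style summary carries; the PySpark call shown in the comment is how I believe the pattern would be written, but I have not verified it:

```python
# Plain-Python sketch of the merge pattern behind option 3 (no Spark needed).
# Each partial summary is a (count, mean, M2) triple; M2 is the sum of
# squared deviations from the mean, so variance = M2 / count.
# In PySpark the same idea would presumably be (untested):
#   from pyspark.statcounter import StatCounter
#   rdd.aggregateByKey(StatCounter(), StatCounter.merge, StatCounter.mergeStats)

def merge_value(stats, x):
    """Fold one raw value into a summary (the 'sequence' step)."""
    n, mean, m2 = stats
    n += 1
    delta = x - mean
    mean += delta / n
    m2 += delta * (x - mean)
    return (n, mean, m2)

def merge_stats(a, b):
    """Combine two partial summaries (the 'combiner' step)."""
    n1, mean1, m2a = a
    n2, mean2, m2b = b
    if n1 == 0:
        return b
    if n2 == 0:
        return a
    n = n1 + n2
    delta = mean2 - mean1
    mean = mean1 + delta * n2 / n
    m2 = m2a + m2b + delta * delta * n1 * n2 / n
    return (n, mean, m2)

data = [("key1", 1.0), ("key3", 9.0), ("key2", 3.0),
        ("key1", 4.0), ("key1", 5.0), ("key3", 2.0), ("key2", 7.0)]

# Emulate two partitions being summarized independently, then merged --
# only the small (count, mean, M2) triples cross partition boundaries,
# never the raw values, which is why no per-key collection is needed.
zero = (0, 0.0, 0.0)
partials = []
for part in (data[:4], data[4:]):
    summary = {}
    for k, v in part:
        summary[k] = merge_value(summary.get(k, zero), v)
    partials.append(summary)

merged = {}
for summary in partials:
    for k, st in summary.items():
        merged[k] = merge_stats(merged.get(k, zero), st)

variances = {k: m2 / n for k, (n, mean, m2) in merged.items()}
print(variances)  # population variance per key
```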
You can try reduceByKey. It's pretty straightforward if we only want to compute the min():
rdd.reduceByKey(lambda x,y: min(x,y)).collect()
#Out[84]: [('key3', 2.0), ('key2', 3.0), ('key1', 1.0)]
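To illustrate what reduceByKey is doing with that lambda, here is the equivalent per-key fold in plain Python (no Spark; the grouping dictionary stands in for the key-partitioned RDD):

```python
# Plain-Python illustration of reduceByKey: fold the binary function over
# all values sharing a key. Order within a key doesn't matter because the
# function must be associative and commutative (min is both).
from functools import reduce
from collections import defaultdict

data = [("key1", 1.0), ("key3", 9.0), ("key2", 3.0),
        ("key1", 4.0), ("key1", 5.0), ("key3", 2.0), ("key2", 7.0)]

groups = defaultdict(list)
for k, v in data:
    groups[k].append(v)

mins = {k: reduce(lambda x, y: min(x, y), vs) for k, vs in groups.items()}
print(mins)  # {'key1': 1.0, 'key3': 2.0, 'key2': 3.0}
```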
To calculate the mean, you'll first need to create (value, 1) tuples, which we use to calculate both the sum and the count in the reduceByKey operation. Lastly we divide them by each other to arrive at the mean:
meanRDD = (rdd
.mapValues(lambda x: (x, 1))
.reduceByKey(lambda x, y: (x[0]+y[0], x[1]+y[1]))
.mapValues(lambda x: x[0]/x[1]))
meanRDD.collect()
#Out[85]: [('key3', 5.5), ('key2', 5.0), ('key1', 3.3333333333333335)]
For the variance, you can use the formula (sumOfSquares/count) - (sum/count)^2 (note that this is the population variance, and that for values far from zero this formulation can lose precision to catastrophic cancellation; StatCounter's merge-based computation is numerically safer), which we translate in the following way:
varRDD = (rdd
.mapValues(lambda x: (1, x, x*x))
.reduceByKey(lambda x,y: (x[0]+y[0], x[1]+y[1], x[2]+y[2]))
.mapValues(lambda x: (x[2]/x[0] - (x[1]/x[0])**2)))
varRDD.collect()
#Out[106]: [('key3', 12.25), ('key2', 4.0), ('key1', 2.8888888888888875)]
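As a quick sanity check, the same per-key sums can be accumulated in plain Python (no Spark required) and plugged into the formula above; the results match the varRDD output:

```python
# Plain-Python check of the per-key formula (sumOfSquares/count) - (sum/count)^2
# on the same sample data used by the RDD above.
from collections import defaultdict

data = [("key1", 1.0), ("key3", 9.0), ("key2", 3.0),
        ("key1", 4.0), ("key1", 5.0), ("key3", 2.0), ("key2", 7.0)]

acc = defaultdict(lambda: (0, 0.0, 0.0))  # (count, sum, sumOfSquares)
for k, x in data:
    n, s, ss = acc[k]
    acc[k] = (n + 1, s + x, ss + x * x)

variance = {k: ss / n - (s / n) ** 2 for k, (n, s, ss) in acc.items()}
print(variance)  # population variance per key, matching varRDD above
```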
I used float values instead of int in the dummy data to accurately illustrate computing the average and variance:
rdd = sc.parallelize([("key1", 1.0),
("key3", 9.0),
("key2", 3.0),
("key1", 4.0),
("key1", 5.0),
("key3", 2.0),
("key2", 7.0)])