
aggregateByKey in PySpark not giving expected output


I have an RDD with 2 partitions whose elements are key-value pairs:

rdd5.glom().collect()

[[(u'hive', 1), (u'python', 1), (u'spark', 1), (u'hive', 1), (u'spark', 1), (u'python', 1)], [(u'spark', 1), (u'java', 1), (u'java', 1), (u'spark', 1)]]

When I perform aggregateByKey

rdd6=rdd5.aggregateByKey((0,0), lambda acc,val: (acc[0]+1,acc[1]+val), lambda acc1,acc2 : (acc1[1]+acc2[1])/acc1[0]+acc2[0])

It does not give me the expected result:

Output:

[(u'python', (2, 2)), (u'spark', 1), (u'java', (2, 2)), (u'hive', (2, 2))]

Expected:

[(u'python', 1), (u'spark', 1), (u'java', 1), (u'hive', 1)]

I can see that the keys present in only one partition are the ones not giving me the expected output. What changes should I make to get the expected output?


Solution

  • OK, so below is the way to do this using both reduceByKey and aggregateByKey.

    The problem you had with aggregateByKey is that the last function is responsible for merging two accumulators. It has to return the same structure as the other functions, so that when it is merged with yet another accumulator (from another partition) it will work again. Keys that live in only one partition never go through that merge function at all, which is why they come back as (count, sum) tuples, while the one key that spans both partitions is mangled by the broken lambda.
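
    To see the shape problem in isolation, here is a small illustration that applies your original merge lambda and a fixed one to two (count, sum) accumulators (the accumulator values here are just made-up examples, not the exact ones Spark produces for your data):

    broken_merge = lambda acc1, acc2: (acc1[1] + acc2[1]) / acc1[0] + acc2[0]  # your last lambda
    fixed_merge = lambda acc1, acc2: (acc1[0] + acc2[0], acc1[1] + acc2[1])    # keeps the (count, sum) shape

    print(broken_merge((2, 2), (2, 2)))  # a plain number -- the (count, sum) structure is lost
    print(fixed_merge((2, 2), (2, 2)))   # (4, 4) -- still a (count, sum) tuple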

    It is very similar to combineByKey (see here); a rough combineByKey equivalent is also sketched after the outputs below.

    # sc is the SparkContext provided by the pyspark shell
    rdd = sc.parallelize([(u'hive', 1), (u'python', 1), (u'spark', 1),
                          (u'hive', 1), (u'spark', 1), (u'python', 1),
                          (u'spark', 1), (u'java', 1), (u'java', 1), (u'spark', 1)])
    
    # aggregateByKey: the first lambda builds a (count, sum) accumulator within a partition,
    # the second merges two accumulators across partitions and keeps the same shape
    print(rdd.aggregateByKey((0, 0),
                             lambda acc, val: (acc[0] + 1, acc[1] + val),
                             lambda acc1, acc2: (acc1[0] + acc2[0], acc1[1] + acc2[1])).collect())
    
    # reduceByKey: turn every value into a (count, sum) pair first, then add the pairs element-wise
    print(rdd.mapValues(lambda x: (1, x)).reduceByKey(lambda x, y: (x[0] + y[0], x[1] + y[1])).collect())
    

    [(u'spark', (4, 4)), (u'java', (2, 2)), (u'hive', (2, 2)), (u'python', (2, 2))]

    [(u'spark', (4, 4)), (u'java', (2, 2)), (u'hive', (2, 2)), (u'python', (2, 2))]
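
    For comparison, roughly the same (count, sum) aggregation could be written with combineByKey like this (a sketch reusing the rdd above):

    print(rdd.combineByKey(lambda v: (1, v),                          # createCombiner: first value -> (count, sum)
                           lambda acc, v: (acc[0] + 1, acc[1] + v),   # mergeValue: fold a value into an accumulator
                           lambda a, b: (a[0] + b[0], a[1] + b[1])    # mergeCombiners: same shape in, same shape out
                           ).collect())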

    If you are trying to average the values, you can add another mapValues at the end like so:

    print(rdd.aggregateByKey((0, 0),
                             lambda acc, val: (acc[0] + 1, acc[1] + val),
                             lambda acc1, acc2: (acc1[0] + acc2[0], acc1[1] + acc2[1]))
             .mapValues(lambda x: x[1] * 1.0 / x[0])  # average = sum / count per key
             .collect())
    

    [(u'spark', 1.0), (u'java', 1.0), (u'hive', 1.0), (u'python', 1.0)]
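
    As a quick sanity check against your original setup, you could rebuild a two-partition RDD with the same data and confirm that the fixed merge function gives the expected averages no matter how the keys are spread across partitions (again assuming the pyspark shell's sc):

    rdd5 = sc.parallelize([(u'hive', 1), (u'python', 1), (u'spark', 1), (u'hive', 1), (u'spark', 1), (u'python', 1),
                           (u'spark', 1), (u'java', 1), (u'java', 1), (u'spark', 1)], 2)  # force 2 partitions

    print(rdd5.aggregateByKey((0, 0),
                              lambda acc, val: (acc[0] + 1, acc[1] + val),
                              lambda acc1, acc2: (acc1[0] + acc2[0], acc1[1] + acc2[1]))
              .mapValues(lambda x: x[1] * 1.0 / x[0])
              .collect())  # every value is 1, so every key averages to 1.0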