python, apache-spark, pyspark, rdd

How does pyspark RDD countByKey() count?


Before posting this question I searched the community and referred to the PySpark docs, but I am still not able to understand how it's counting.

sc.parallelize((('1',11),('1'),('11'),('11',1))).countByKey().items() 

output:

dict_items([('1', 3), ('11', 1)])

I am not able to interpret the output. Why is it counting '1' as 3 and '11' as 1?


Solution

  • When you call countByKey(), the key will be the first element of the container passed in (usually a tuple) and the value will be the rest.

    You can think of the execution as roughly equivalent to:

    from operator import add
    
    def myCountByKey(rdd):
        return rdd.map(lambda row: (row[0], 1)).reduceByKey(add)
    

    The function maps each row in your rdd to a pair of the row's first element (the key) and the number 1 (the value). Finally, we reduce by key, adding the values together for each key to get the count.

    Let's try this on your example:

    rdd = sc.parallelize((('1',11),('1'),('11'),('11',1)))
    myCountByKey(rdd).collect()
    #[('1', 3), ('11', 1)]
    

    The "extra" '1' is coming from the third element ('11'). Mapping this row to (row[0], 1) yields ('1', 1). In this case, row[0] is the first character in the string.

    You may have expected this to behave as if the third element were the tuple ('11',):

    rdd = sc.parallelize((('1',11),('1',),('11',),('11',1)))
    rdd.countByKey().items()
    #dict_items([('1', 2), ('11', 2)])
    

    The takeaway is that you must include the trailing comma if you want to specify a key with no value; without it, ('11') is just the string '11'.
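
    A quick check in a plain Python shell (no Spark needed) shows why the comma matters:

    type(('11'))   # <class 'str'>   - parentheses alone do not make a tuple
    type(('11',))  # <class 'tuple'> - the trailing comma does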