Tags: python, apache-spark, pyspark, rdd

Using PySpark to Count Number of Occurrences


I have a paired RDD that contains the document ID as the key and a list of the words in that document as the value. E.g.

DocID Words
001 ["quick","brown","fox","lazy","fox"]
002 ["banana","apple","apple","banana","fox"]

I managed to do a mapValues (sketched below) such that:

DocID Words
001 [("quick",1),("brown",1),("fox",1),("lazy",1),("fox",1)]
002 [("banana",1),("apple",1),("apple",1),("banana",1),("fox",1)]

Is there a way to do a reduceByKey() on just the words?

DocID Words
001 [("quick",1),("brown",1),("fox",2),("lazy",1)]
002 [("banana",2),("apple",2),("fox",1)]

I still need to maintain the structure, so that counts are applied within each document only.


Solution

  • You can use collections.Counter to count the occurrences of each word in each doc:

    from collections import Counter
    
    rdd = sc.parallelize([
        ("001", ["quick","brown","fox","lazy","fox"]),
        ("002", ["banana","apple","apple","banana","fox"])
    ])
    
    counted = rdd.mapValues(lambda x: list(Counter(x).items()))  # (word, count) pairs per document
    
    counted.collect()
    # [('001', [('quick', 1), ('brown', 1), ('fox', 2), ('lazy', 1)]),
    #  ('002', [('banana', 2), ('apple', 2), ('fox', 1)])]
    

    Another, RDD-only way:

    from operator import add
    
    result = (rdd.flatMapValues(lambda x: x)                   # (DocID, word)
                 .map(lambda x: (x, 1))                        # ((DocID, word), 1)
                 .reduceByKey(add)                             # ((DocID, word), count)
                 .map(lambda x: (x[0][0], [(x[0][1], x[1])]))  # (DocID, [(word, count)])
                 .reduceByKey(add))                            # concatenate the per-word lists per DocID
    
    result.collect()
    #[('002', [('banana', 2), ('apple', 2), ('fox', 1)]), 
    # ('001', [('brown', 1), ('fox', 2), ('lazy', 1), ('quick', 1)])]
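
    Both approaches yield the same per-document counts. The Counter version keeps all counting inside mapValues, which is a narrow transformation, so it adds no extra shuffle; the RDD-only pipeline shuffles once for each of its two reduceByKey calls. In either case, the ordering of the results returned by collect() is not guaranteed.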