I have a pair RDD whose key is the document ID and whose value is the list of words in that document, e.g.:
DocID | Words |
---|---|
001 | ["quick","brown","fox","lazy","fox"] |
002 | ["banana","apple","apple","banana","fox"] |
I managed to do a mapValues such that:
DocID | Words |
---|---|
001 | [("quick",1),("brown",1),("fox",1),("lazy",1),("fox",1)] |
002 | [("banana",1),("apple",1),("apple",1),("banana",1),("fox",1)] |
Is there a way to do a reduceByKey() on just the Words, so that the result looks like this?
DocID | Words |
---|---|
001 | [("quick",1),("brown",1),("fox",2),("lazy",1)] |
002 | [("banana",2),("apple",2),("fox",1)] |
I still need to keep this structure, so that the counts are computed within each document only.
You can use collections.Counter to count how often each word occurs in each document:
from collections import Counter
rdd = sc.parallelize([
("001", ["quick","brown","fox","lazy","fox"]),
("002", ["banana","apple","apple","banana","fox"])
])
counted = rdd.mapValues(lambda x: list(Counter(x).items()))
counted.collect()
# [('001', [('quick', 1), ('brown', 1), ('fox', 2), ('lazy', 1)]),
# ('002', [('banana', 2), ('apple', 2), ('fox', 1)])]
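If you want to start from the RDD you already have, where each value is a list of (word, 1) pairs, the same idea can be applied to the pairs directly. This is only a minimal sketch: `sum_pairs` is a hypothetical helper name, and the function is plain Python, so it can be checked locally without a Spark context.

```python
from collections import Counter


def sum_pairs(pairs):
    """Sum the counts of (word, count) pairs belonging to one document.

    `sum_pairs` is a hypothetical helper, not part of Spark's API.
    """
    totals = Counter()
    for word, n in pairs:
        totals[word] += n
    # Counter keeps first-seen order (Python 3.7+), so the output
    # follows the order in which words first appear in the document.
    return list(totals.items())


doc_001 = [("quick", 1), ("brown", 1), ("fox", 1), ("lazy", 1), ("fox", 1)]
print(sum_pairs(doc_001))
# [('quick', 1), ('brown', 1), ('fox', 2), ('lazy', 1)]
```

Assuming your mapped RDD is called `mapped` (a name chosen here for illustration), it would be used as `mapped.mapValues(sum_pairs)`, which keeps the DocID keys untouched.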
Another way, using only RDD operations:
from operator import add
result = rdd.flatMapValues(lambda x: x) \
    .map(lambda x: (x, 1)) \
    .reduceByKey(add) \
    .map(lambda x: (x[0][0], [(x[0][1], x[1])])) \
    .reduceByKey(add)
result.collect()
#[('002', [('banana', 2), ('apple', 2), ('fox', 1)]),
# ('001', [('brown', 1), ('fox', 2), ('lazy', 1), ('quick', 1)])]
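To see what each stage of that pipeline produces, here is a rough local trace in plain Python. It only imitates the RDD steps with dictionaries (no Spark involved), and the variable names `docs`, `pairs`, `pair_counts`, `per_doc` and `local_result` are made up for this illustration.

```python
from collections import defaultdict

docs = [
    ("001", ["quick", "brown", "fox", "lazy", "fox"]),
    ("002", ["banana", "apple", "apple", "banana", "fox"]),
]

# Like flatMapValues(lambda x: x): one (doc_id, word) pair per word.
pairs = [(doc_id, word) for doc_id, words in docs for word in words]

# Like map(lambda x: (x, 1)) followed by reduceByKey(add):
# the composite (doc_id, word) tuple is the key, so counts never
# mix words from different documents.
pair_counts = defaultdict(int)
for key in pairs:
    pair_counts[key] += 1

# Like the second map + reduceByKey(add): re-key by doc_id and
# concatenate the one-element lists [(word, count)].
per_doc = defaultdict(list)
for (doc_id, word), n in pair_counts.items():
    per_doc[doc_id] += [(word, n)]

local_result = dict(per_doc)
print(local_result["001"])
# [('quick', 1), ('brown', 1), ('fox', 2), ('lazy', 1)]
```

In Spark the order of the elements after reduceByKey is not guaranteed, which is why the collected output above lists the words in a different order than this local trace.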