Tags: python, pyspark, grouping, rdd, top-n

Using PySpark RDD .groupByKey to extract the highest-value element per group


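import re

import nltk

# `text` is assumed to be an RDD of lines, e.g. created with sc.textFile(...).
TOKEN_RE = re.compile(r"\b[\w']+\b")

def pos_tag_counter(line):
    # Tokenize the lower-cased line, then POS-tag it: returns [(word, tag), ...]
    toks = nltk.regexp_tokenize(line.lower(), TOKEN_RE)
    postoks = nltk.tag.pos_tag(toks)
    return postoks

# Stages: drop empty lines and lines starting with "URL", tag every token,
# count each (word, tag) pair, re-key as (tag, (count, word)), group by tag.
pos_tag_counts = text.filter(lambda line: len(line) > 0) \
    .filter(lambda line: re.findall('^(?!URL).*', line)) \
    .flatMap(pos_tag_counter) \
    .map(lambda word: (word, 1)) \
    .reduceByKey(lambda x, y: x + y) \
    .map(lambda x: (x[0][1], (x[1], x[0][0]))) \
    .groupByKey().map(lambda x: (x[0], list(x[1])))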

I have a text file that was split into lines, then words; the words were counted and tagged with a POS (part-of-speech) label. So what I have now is a series of tuples (pos, (count, word)), with the POS as the key. I need to find the most frequent word for each POS.

[('NN', (1884, 'washington')),
('NN', (5, 'stellar')),
('VBD', (563, 'kept')),
('DT', (435969, 'the')),
('JJ', (9300, 'first')),
('NN', (1256, 'half')),
('NN', (4028, 'season')),
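To make the record shape concrete, here is a plain-Python sketch (no Spark needed) that mimics the counting and re-keying stages of the pipeline above on a few made-up (word, tag) pairs. `collections.Counter` stands in for the `map(...)`/`reduceByKey(...)` counting step; the sample tokens are hypothetical.

```python
from collections import Counter

# Hypothetical tagged tokens standing in for the output of
# flatMap(pos_tag_counter): each element is a (word, tag) pair.
tagged = [("the", "DT"), ("season", "NN"), ("the", "DT"),
          ("kept", "VBD"), ("season", "NN"), ("half", "NN")]

# Equivalent of map(lambda word: (word, 1)) + reduceByKey(add):
# count how often each (word, tag) pair occurs.
counts = Counter(tagged)

# Equivalent of map(lambda x: (x[0][1], (x[1], x[0][0]))):
# re-key by tag, giving (tag, (count, word)).
by_tag = [(word_tag[1], (n, word_tag[0])) for word_tag, n in counts.items()]

print(by_tag)
```

Each element ends up as (tag, (count, word)), which is why the values are (count, word) pairs rather than (word, count).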

This is my first PySpark project, so I don't think I'm quite grasping the concept. I used groupByKey, which gives me:

[('VBD',
[(563, 'kept'),
(56715, 'said'),
(2640, 'got'),
(12370, 's'),
(55523, 'was'),
(62, 'snapped'),

Ideally, the output would be (POS, count, word) tuples, in any order, as long as each tuple holds the highest-count word for its POS:

('NN', 4028, 'season')
('DT', 435969, 'the')
etc.
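The pipeline produces nested (pos, (count, word)) pairs, while the requested form is a flat (POS, count, word) tuple. A minimal plain-Python sketch of that reshaping, using two of the records from the sample data, prepends the key to the value tuple:

```python
# Per-POS winners in the nested (pos, (count, word)) shape, taken from the sample data.
nested = [("DT", (435969, "the")), ("JJ", (9300, "first"))]

# Prepend the key to the value tuple: (pos, (count, word)) -> (pos, count, word).
flat = [(pos,) + count_word for pos, count_word in nested]

for row in flat:
    print(row)
# ('DT', 435969, 'the')
# ('JJ', 9300, 'first')
```

On an RDD of (pos, (count, word)) pairs, the same step would be `.map(lambda kv: (kv[0],) + kv[1])`.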


Solution

  • The basic idea is to groupByKey, then take the max of each group. Since you want the most frequent word, use the count (the first element of each (count, word) tuple) as the key for max.

    from pyspark import SparkContext

    sc = SparkContext.getOrCreate()  # in the pyspark shell, `sc` already exists

    rdd = sc.parallelize([('NN', (1884, 'washington')),
        ('NN', (5, 'stellar')),
        ('VBD', (563, 'kept')),
        ('DT', (435969, 'the')),
        ('JJ', (9300, 'first')),
        ('NN', (1256, 'half')),
        ('NN', (4028, 'season'))])
    
    # Gather every (count, word) value per POS, then keep the one with the largest count.
    pos_count = (rdd.groupByKey()
                    .mapValues(lambda v: max(v, key=lambda x: x[0])))
    
    print(pos_count.collect())
    # [('DT', (435969, 'the')), ('VBD', (563, 'kept')), ('NN', (4028, 'season')), ('JJ', (9300, 'first'))]
    # (the order of the keys may differ between runs)
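Because only the maximum per key is needed, an alternative to groupByKey is reduceByKey with a two-argument "keep the higher count" function; Spark can then combine values within each partition before the shuffle instead of collecting every value for a key first. The sketch below exercises such a combine function locally. `keep_higher_count` is an illustrative name, and `reduce_by_key_locally` is a hypothetical dict-based stand-in for `RDD.reduceByKey`, not a Spark API.

```python
from functools import reduce


def keep_higher_count(a, b):
    """Combine two (count, word) values, keeping the one with the larger count.

    On a tie, `a` is kept.
    """
    return a if a[0] >= b[0] else b


def reduce_by_key_locally(pairs, func):
    """Hypothetical local stand-in for RDD.reduceByKey: group values by key,
    then fold each group with `func`."""
    groups = {}
    for key, value in pairs:
        groups.setdefault(key, []).append(value)
    return {key: reduce(func, values) for key, values in groups.items()}


data = [('NN', (1884, 'washington')),
        ('NN', (5, 'stellar')),
        ('VBD', (563, 'kept')),
        ('DT', (435969, 'the')),
        ('JJ', (9300, 'first')),
        ('NN', (1256, 'half')),
        ('NN', (4028, 'season'))]

best = reduce_by_key_locally(data, keep_higher_count)

# Flatten to (POS, count, word) and sort for a stable printout.
print(sorted((pos,) + cw for pos, cw in best.items()))
# [('DT', 435969, 'the'), ('JJ', 9300, 'first'), ('NN', 4028, 'season'), ('VBD', 563, 'kept')]
```

On the RDD itself this would be `rdd.reduceByKey(keep_higher_count)`. Spark does not guarantee the order in which values are combined, so ties between equal counts may resolve to either word. Because Python compares tuples element by element, `max(v)` with no key would also select the highest count, breaking ties in favor of the alphabetically later word.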