Search code examples
pythonhadoopmapreducehadoop-yarnhadoop-streaming

Word count sorting in MapReduce Python using yarn comparator


I want to solve the word count problem and want to get the results in reverse sorted order according to the frequency of occurrence in the file.

Following are the four files (2 mappers and 2 reducers, as one Map Reduce job cannot solve this problem) I wrote for this purpose:

1) mapper1.py

import sys
import re

reload(sys)
sys.setdefaultencoding('utf-8') # required to convert to unicode

for line in sys.stdin:
    try:
        article_id, text = unicode(line.strip()).split('\t', 1)
    except ValueError as e:
        continue
    words = re.split("\W*\s+\W*", text, flags=re.UNICODE)
    for word in words:
        print "%s\t%d" % (word.lower(), 1)

2) reducer1.py

import sys

current_key = None
word_sum = 0

for line in sys.stdin:
    try:
        key, count = line.strip().split('\t', 1)
        count = int(count)
    except ValueError as e:
        continue
    if current_key != key:
        if current_key:
            print "%s\t%d" % (current_key, word_sum)
        word_sum = 0
        current_key = key
    word_sum += count

if current_key:
    print "%s\t%d" % (current_key, word_sum)

3) mapper2.py

import sys
import re

reload(sys)
sys.setdefaultencoding('utf-8') # required to convert to unicode


for line in sys.stdin:
    try:
        word, count = line.strip().split('\t', 1)
        count = int(count)
    except ValueError as e:
        continue

    print "%s\t%d" % (word, count)

4) reducer2.py

import sys

for line in sys.stdin:
    try:
        word, count = line.strip().split('\t', 1)
        count = int(count)
    except ValueError as e:
        continue

    print "%s\t%d" % (word, count)

The following are the two yarn commands run by me in a bash environment

OUT_DIR="wordcount_result_1"
NUM_REDUCERS=8

hdfs dfs -rm -r -skipTrash ${OUT_DIR} > /dev/null

yarn jar /opt/cloudera/parcels/CDH/lib/hadoop-mapreduce/hadoop-streaming.jar \
    -D mapred.jab.name="Streaming wordCount" \
    -D mapreduce.job.reduces=${NUM_REDUCERS} \
    -files mapper1.py,reducer1.py \
    -mapper "python mapper1.py" \
    -combiner "python reducer1.py" \
    -reducer "python reducer1.py" \
    -input /test/articles-part-short \
    -output ${OUT_DIR} > /dev/null


OUT_DIR_2="wordcount_result_2"
NUM_REDUCERS=1

hdfs dfs -rm -r -skipTrash ${OUT_DIR_2} > /dev/null

yarn jar /opt/cloudera/parcels/CDH/lib/hadoop-mapreduce/hadoop-streaming.jar \
    -D mapred.jab.name="Streaming wordCount Rating" \
    -D mapreduce.job.output.key.comparator.class=org.apache.hadoop.mapreduce.lib.partition.KeyFieldBasedComparator \
    -D map.output.key.field.separator=\t \
    -D mapreduce.partition.keycomparator.options=-k2,2nr \
    -D mapreduce.job.reduces=${NUM_REDUCERS} \
    -files mapper2.py,reducer2.py \
    -mapper "python mapper2.py" \
    -reducer "python reducer2.py" \
    -input ${OUT_DIR} \
    -output ${OUT_DIR_2} > /dev/null

hdfs dfs -cat ${OUT_DIR_2}/part-00000 | head

This is not giving me the right answer. Can someone please explain where did it go wrong?

On the other hand,

in the mapper2.py if I print in the following manner,

print "%d\t%s" % (count, word)

and in the reducer2.py if I read in the following manner,

count, word = line.strip().split('\t', 1)

and edit the 2nd yarn command option to

-D mapreduce.partition.keycomparator.options=-k1,1nr

it gives me right answer.

Why is it behaving differently in both of the above cases?

Can someone please help me understand the Comparator options of Hadoop MapReduce?


Solution

  • This will work

    yarn jar /opt/cloudera/parcels/CDH/lib/hadoop-mapreduce/hadoop-streaming.jar \
        -D mapred.jab.name="Streaming wordCount rating" \
        -D mapreduce.job.output.key.comparator.class=org.apache.hadoop.mapreduce.lib.partition.KeyFieldBasedComparator \
        -D mapreduce.partition.keycomparator.options='-k2nr' \
        -D stream.num.map.output.key.fields=2 \
        -D mapred.map.tasks=1 \
        -D mapreduce.job.reduces=1 \
        -files mapper2.py,reducer2.py \
        -mapper "python mapper2.py" \
        -reducer "python reducer2.py" \
        -input /user/jovyan/assignment0_1563877099149160 \
        -output ${OUT_DIR} > /dev/null