
Map Reduce for Top N Items


I am working on a Hadoop Project in Java and having some difficulties. I understand the goal of what I am supposed to be doing but truly do not understand exactly how to implement it. I am attempting to extract the top N results from a map reduce job, such as the top 5 highest frequency values.

I understand that this will generally require two MapReduce jobs: one to compute the counts and one to sort the values. However, like I said, I am fairly lost on how to actually implement this.

The code I am using is a fairly standard map reduce code with some filtering for special values.

public class MapWordCount extends Mapper <LongWritable, Text, Text, IntWritable>
{
      private Text wordToken = new Text();
      public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException
      {
        // split the line into tokens; StringTokenizer treats every character of the second argument as a delimiter (it is not a regex)
        StringTokenizer tokens = new StringTokenizer(value.toString(), "[_|$#0123456789<>\\^=\\[\\]\\*/\\\\,;,.\\-:()?!\"']");
        while (tokens.hasMoreTokens())
        {
          wordToken.set(tokens.nextToken());
          context.write(wordToken, new IntWritable(1));
        }
      }
}

Reducer

public class ReduceWordCount extends Reducer <Text, IntWritable, Text, IntWritable>
{
      private IntWritable count = new IntWritable();
      public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException
      {
        int valueSum = 0;
        for (IntWritable val : values)
        {
          valueSum += val.get();
        }
        count.set(valueSum);
        context.write(key, count);
      }
}

Driver

public class WordCount {
      public static void main(String[] args) throws Exception
      {
        Configuration conf = new Configuration();
        String[] pathArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (pathArgs.length < 2)
        {
          System.err.println("MR Project Usage: wordcount <input-path> [...] <output-path>");
          System.exit(2);
        }
        Job wcJob = Job.getInstance(conf, "MapReduce WordCount");
        wcJob.setJarByClass(WordCount.class);
        wcJob.setMapperClass(MapWordCount.class);
        wcJob.setCombinerClass(ReduceWordCount.class);
        wcJob.setReducerClass(ReduceWordCount.class);
        wcJob.setOutputKeyClass(Text.class);
        wcJob.setOutputValueClass(IntWritable.class);
        for (int i = 0; i < pathArgs.length - 1; ++i)
        {
          FileInputFormat.addInputPath(wcJob, new Path(pathArgs[i]));
        }
        FileOutputFormat.setOutputPath(wcJob, new Path(pathArgs[pathArgs.length - 1]));
        System.exit(wcJob.waitForCompletion(true) ? 0 : 1);
      }
}

If anybody could assist me with this I would appreciate it. As I said, I know I need two map reduces, but am not quite sure how to start with this. I attempted a couple of other solutions I found on StackOverflow but didn't have much luck for my case. Thanks so much!


Solution

  • You are right: you do need two MapReduce jobs chained together. More specifically, you need:

    • one job to compute the wordcount for every word stored inside the input documents,

    • and one job to be able to "sort" all those words and wordcounts in order to pick and output the top N of them.

    The first job is fairly similar to what you have come up with already, so I'm going to focus on the second job, to make things a bit clearer about how TopN works in the MapReduce paradigm.

    Thinking of the TopN MR job as a standalone thing, we know that this particular job will receive a bunch of key-value pairs where every word from the last step is going to be the key and its wordcount is going to be the value. Since mappers and reducers are isolated instances of the map and reduce functions running in parallel, we need to find a way to first find the TopN words locally (i.e. for every mapper), and then group all of those local TopN results to find the "global" TopN words for all the data given to the application by the input.

    So, the TopNMapper first creates a TreeMap (a Java key-value data structure that keeps its entries sorted by key) in its setup function, which runs once per mapper task before any call to map. Each mapper then puts every word and its wordcount into its own TreeMap. For this type of computation (TopN), we use the wordcount as the key and the word as the value, so that the entries are kept in ascending order of wordcount. Since only the top N words can ever matter, each mapper keeps just its own top N: whenever the TreeMap grows beyond N entries, the entry with the smallest wordcount is removed. The N surviving entries are handed to the reducers at the end of each mapper's execution (i.e. in the cleanup function). The mappers write key-value pairs in which the word is the key and its wordcount is the value, like this:

    <word, wordcount>
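
    To make the trimming step concrete, here is a minimal plain-Java sketch with no Hadoop dependencies. The class name LocalTopN and the method trim are made up for this illustration; the only idea taken from the description above is a TreeMap keyed by wordcount that drops its smallest key once it holds more than N entries.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper for illustration only; not one of the Hadoop classes in this answer.
final class LocalTopN {
    // Keeps at most n entries keyed by wordcount, dropping the smallest key on overflow.
    static TreeMap<Integer, String> trim(Map<String, Integer> counts, int n) {
        TreeMap<Integer, String> top = new TreeMap<>();
        for (Map.Entry<String, Integer> e : counts.entrySet()) {
            top.put(e.getValue(), e.getKey());   // wordcount as key, word as value
            if (top.size() > n) {
                top.remove(top.firstKey());      // firstKey() is the smallest wordcount
            }
        }
        return top;
    }

    public static void main(String[] args) {
        Map<String, Integer> counts = new LinkedHashMap<>();
        counts.put("the", 120);
        counts.put("hadoop", 7);
        counts.put("to", 95);
        counts.put("map", 12);
        System.out.println(trim(counts, 2));     // prints {95=to, 120=the}
    }
}
```

    One thing to keep in mind with this layout: because the wordcount is the key, two words with the same wordcount cannot both be stored, and the later one replaces the earlier one.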

    Now for the TopNReducer, we need to do exactly the same thing: use a TreeMap again, populate it with all of the local TopN elements, remove the elements that are not among the top N, and write the words and their wordcounts as output. To make the output "cleaner", we can swap words and wordcounts in the key-value pair so that the wordcount is the key and the word is the value. This results in a set of key-value pairs, sorted in ascending order of wordcount, that is stored on disk after this job is done, like this:

    <wordcount, word>
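
    The reducer side can be sketched in plain Java in the same spirit. GlobalTopN and merge are hypothetical names, and each Map in the list stands in for the local TopN output of one mapper; the sketch assumes, as in the job described here, that every word shows up in exactly one of those local results.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical stand-in for the reducer side; not the Hadoop class from this answer.
final class GlobalTopN {
    // Merges the per-mapper results, keeps the n largest wordcounts, and renders
    // them in ascending order as "wordcount<TAB>word" lines.
    static List<String> merge(List<Map<String, Integer>> localResults, int n) {
        TreeMap<Integer, String> global = new TreeMap<>();
        for (Map<String, Integer> local : localResults) {
            for (Map.Entry<String, Integer> e : local.entrySet()) {
                global.put(e.getValue(), e.getKey());
                if (global.size() > n) {
                    global.remove(global.firstKey());   // drop the smallest wordcount
                }
            }
        }
        List<String> lines = new ArrayList<>();
        for (Map.Entry<Integer, String> e : global.entrySet()) {
            lines.add(e.getKey() + "\t" + e.getValue());
        }
        return lines;
    }

    public static void main(String[] args) {
        Map<String, Integer> mapper1 = new HashMap<>();
        mapper1.put("the", 120);
        mapper1.put("to", 95);
        Map<String, Integer> mapper2 = new HashMap<>();
        mapper2.put("and", 110);
        mapper2.put("of", 80);

        List<Map<String, Integer>> localResults = new ArrayList<>();
        localResults.add(mapper1);
        localResults.add(mapper2);

        // With n = 3 this prints 95/to, 110/and, 120/the, one tab-separated pair per line.
        for (String line : merge(localResults, 3)) {
            System.out.println(line);
        }
    }
}
```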

    The program that can do such a thing in 2 MR jobs looks like the following (where we set N as a global Configuration value inside the main function with the conf.set("N", "10"); command, and access it in the setup functions of the TopNMapper and TopNReducer classes), with all of the classes being put in one class TopNWordCount for simplicity:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.util.GenericOptionsParser;
    
    import java.io.IOException;
    import java.util.Map;
    import java.util.TreeMap;
    
    
    public class TopNWordCount
    {
        /* input:  <document, contents>
         * output: <word, 1>
         */
        public static class WordCountMapper extends Mapper<Object, Text, Text, IntWritable>
        {
            private final static IntWritable one = new IntWritable(1);
    
            public void map(Object key, Text value, Context context) throws IOException, InterruptedException
            {
                // clean up the document text and split the words into an array
                String[] words = value.toString()
                                    .replaceAll("\\d+", "")           // get rid of numbers...
                                    .replaceAll("[^a-zA-Z ]", " ")    // get rid of punctuation...
                                    .toLowerCase()                    // turn every letter to lowercase...
                                    .trim()                           // trim the spaces
                                    .replaceAll("\\s+", " ")          // collapse runs of whitespace
                                    .split(" ");
    
                // write every word as key with `1` as value, i.e. one occurrence of the word
                for(String word : words)
                    if (!word.isEmpty())    // a blank line would otherwise yield one empty "word"
                        context.write(new Text(word), one);
            }
        }
        
        /* input: <word, 1>
         * output: <word, wordcount>
         */
        public static class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable>
        {
            private IntWritable wordcount = new IntWritable();
    
            public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException
            {
                int word_cnt = 0;
    
                for(IntWritable value : values)
                    word_cnt += value.get();
    
                wordcount.set(word_cnt);
    
                context.write(key, wordcount);
            }
        }
    
    
    
        /* input:  <offset, "word\twordcount"> (one line of the first job's output)
         * output: <word, wordcount> (with the local topN words)
         */
        public static class TopNMapper extends Mapper<Object, Text, Text, IntWritable>
        {
            private int n;  // the N of TopN
            private TreeMap<Integer, String> word_list; // local words sorted by frequency (note: words with equal wordcounts overwrite each other)
    
            public void setup(Context context)
            {
                n = Integer.parseInt(context.getConfiguration().get("N"));  // get N
                word_list = new TreeMap<Integer, String>();
            }
    
            public void map(Object key, Text value, Context context)
            {
                String[] line = value.toString().split("\t");   // split the word and the wordcount
    
                // put the wordcount as key and the word as value in the word list
                // so the words can be sorted by their wordcounts
                word_list.put(Integer.valueOf(line[1]), line[0]);
    
                // if the local word list is populated with more than N elements
                // remove the first (aka remove the word with the smallest wordcount)
                if (word_list.size() > n)
                    word_list.remove(word_list.firstKey());
            }
    
            public void cleanup(Context context) throws IOException, InterruptedException
            {
                // write the topN local words before continuing to TopNReducer
                // with each word as key and its wordcount as value
                for (Map.Entry<Integer, String> entry : word_list.entrySet())
                {
                    context.write(new Text(entry.getValue()), new IntWritable(entry.getKey()));
                }
            }
        }
    
        /* input:  <word, wordcount> (with the local topN words)
         * output: <wordcount, word> (with the global topN words)
         */
        public static class TopNReducer extends Reducer<Text, IntWritable, IntWritable, Text>
        {
            private int n;  // the N of TopN
            private TreeMap<Integer, String> word_list; //  list with words globally sorted by their frequency
    
            public void setup(Context context)
            {
                n = Integer.parseInt(context.getConfiguration().get("N"));  // get N
                word_list = new TreeMap<Integer, String>();
            }
    
            public void reduce(Text key, Iterable<IntWritable> values, Context context)
            {
                int wordcount = 0;
    
                // get the one and only value (aka the wordcount) for each word
                for(IntWritable value : values)
                    wordcount = value.get();
    
                // put the wordcount as key and the word as value in the word list
                // so the words can be sorted by their wordcounts
                word_list.put(wordcount, key.toString());
    
                // if the global word list is populated with more than N elements
                // remove the first (aka remove the word with the smallest wordcount)
                if (word_list.size() > n)
                    word_list.remove(word_list.firstKey());
            }
    
            public void cleanup(Context context) throws IOException, InterruptedException
            {
                // write the global topN words with each wordcount as key and its word as value
                // so the output will be sorted by the wordcount
                for (Map.Entry<Integer, String> entry : word_list.entrySet())
                {
                    context.write(new IntWritable(entry.getKey()), new Text(entry.getValue()));
                }
            }
        }
    
    
        public static void main(String[] args) throws Exception
        {
            Configuration conf = new Configuration();
            String[] pathArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
    
            conf.set("N", "10"); // set the N as a "public" value in the current Configuration
    
            if (pathArgs.length < 2)
            {
              System.err.println("MR Project Usage: TopNWordCount <input-path> [...] <output-path>");
              System.exit(2);
            }
    
            Path wordcount_dir = new Path("wordcount");
            Path output_dir = new Path(pathArgs[pathArgs.length - 1]);
    
            // if the in-between and output directories exist, delete them
            FileSystem fs = FileSystem.get(conf);
            if(fs.exists(wordcount_dir))
                fs.delete(wordcount_dir, true);
            if(fs.exists(output_dir))
                fs.delete(output_dir, true);
    
            Job wc_job = Job.getInstance(conf, "WordCount");
            wc_job.setJarByClass(TopNWordCount.class);
            wc_job.setMapperClass(WordCountMapper.class);
            wc_job.setReducerClass(WordCountReducer.class);
            wc_job.setMapOutputKeyClass(Text.class);
            wc_job.setMapOutputValueClass(IntWritable.class);
            wc_job.setOutputKeyClass(Text.class);
            wc_job.setOutputValueClass(IntWritable.class);
            for (int i = 0; i < pathArgs.length - 1; ++i)
            {
              FileInputFormat.addInputPath(wc_job, new Path(pathArgs[i]));
            }
            FileOutputFormat.setOutputPath(wc_job, wordcount_dir);
            if (!wc_job.waitForCompletion(true))
                System.exit(1);     // do not start the TopN job if the word count failed
            
            Job topn_job = Job.getInstance(conf, "TopN");
            topn_job.setJarByClass(TopNWordCount.class);
            topn_job.setMapperClass(TopNMapper.class);
            topn_job.setReducerClass(TopNReducer.class);
            topn_job.setMapOutputKeyClass(Text.class);
            topn_job.setMapOutputValueClass(IntWritable.class);
            topn_job.setOutputKeyClass(IntWritable.class);
            topn_job.setOutputValueClass(Text.class);
            FileInputFormat.addInputPath(topn_job, wordcount_dir);
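            FileOutputFormat.setOutputPath(topn_job, output_dir);
            topn_job.setNumReduceTasks(1);  // a single reducer must see all local topN lists to produce the global one
            System.exit(topn_job.waitForCompletion(true) ? 0 : 1);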
        }
    }
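
    One limitation of keeping the words in a TreeMap<Integer, String> is that words sharing the same wordcount overwrite one another, so ties are silently dropped. If that matters for your data, a possible workaround is a min-heap of (word, wordcount) pairs bounded at N. This is a different technique from the one used in the program above, sketched in plain Java; TieSafeTopN, topN and the comparator name are hypothetical.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;

// Hypothetical helper illustrating a tie-safe alternative; not part of the program above.
final class TieSafeTopN {
    // Orders entries by wordcount, then alphabetically by word to break ties.
    static final Comparator<Map.Entry<String, Integer>> BY_COUNT_THEN_WORD = (a, b) -> {
        int byCount = Integer.compare(a.getValue(), b.getValue());
        return byCount != 0 ? byCount : a.getKey().compareTo(b.getKey());
    };

    // Returns up to n words, highest wordcount first.
    static List<String> topN(Map<String, Integer> counts, int n) {
        PriorityQueue<Map.Entry<String, Integer>> heap = new PriorityQueue<>(BY_COUNT_THEN_WORD);
        for (Map.Entry<String, Integer> e : counts.entrySet()) {
            heap.add(new AbstractMap.SimpleEntry<String, Integer>(e.getKey(), e.getValue()));
            if (heap.size() > n) {
                heap.poll();    // discard the current smallest entry
            }
        }
        List<Map.Entry<String, Integer>> sorted = new ArrayList<>(heap);
        sorted.sort(BY_COUNT_THEN_WORD.reversed());
        List<String> words = new ArrayList<>();
        for (Map.Entry<String, Integer> e : sorted) {
            words.add(e.getKey());
        }
        return words;
    }

    public static void main(String[] args) {
        Map<String, Integer> counts = new LinkedHashMap<>();
        counts.put("a", 5);
        counts.put("b", 5);     // same wordcount as "a": both survive here
        counts.put("c", 3);
        counts.put("d", 9);
        System.out.println(topN(counts, 3));    // prints [d, b, a]
    }
}
```

    In the Hadoop classes, the same bounded heap could presumably take the place of word_list in both TopNMapper and TopNReducer, with cleanup draining the heap instead of iterating over the TreeMap.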
    

    The output of this program (using a directory of text files as input) is the following:

    [image: output of the TopN job]

    Notice that the top 10 words here are stopwords (like "the", "to", etc.), as we should expect. If you want to filter those stopwords out, you can use TF-IDF, which can be implemented in Hadoop in a number of ways.
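
    If TF-IDF is more than you need, a simpler technique (not the TF-IDF approach mentioned above) is to drop words from a fixed stop-word list before they are written by the first mapper. A minimal plain-Java sketch follows; StopWordFilter, keepContentWords and the tiny word list are assumptions made for illustration, and a real list would be much longer.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical helper; the stop-word list below is a tiny illustrative sample.
final class StopWordFilter {
    static final Set<String> STOP_WORDS =
            new HashSet<>(Arrays.asList("the", "to", "a", "and", "of", "in"));

    // Returns the words that are neither empty nor in the stop-word list.
    static List<String> keepContentWords(String[] words) {
        List<String> kept = new ArrayList<>();
        for (String word : words) {
            if (!word.isEmpty() && !STOP_WORDS.contains(word)) {
                kept.add(word);
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        String[] words = {"the", "hadoop", "to", "", "job"};
        System.out.println(keepContentWords(words));    // prints [hadoop, job]
    }
}
```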