I am working on a Hadoop project in Java and having some difficulties. I understand the goal of what I am supposed to be doing, but I truly do not understand exactly how to implement it. I am attempting to extract the top N results from a MapReduce job, such as the top 5 highest-frequency values.
I understand that this will generally require two MapReduce jobs: one to count the words and one to sort the values. However, like I said, I am fairly lost on how to actually implement this.
The code I am using is a fairly standard MapReduce word count with some filtering for special characters.
public class MapWordCount extends Mapper<LongWritable, Text, Text, IntWritable>
{
    private Text wordToken = new Text();

    public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException
    {
        // divide the line into tokens
        // (note: StringTokenizer treats this argument as a plain set of delimiter
        // characters, not a regex, so the brackets and backslashes are delimiters too)
        StringTokenizer tokens = new StringTokenizer(value.toString(), "[_|$#0123456789<>\\^=\\[\\]\\*/\\\\,;,.\\-:()?!\"']");

        while (tokens.hasMoreTokens())
        {
            wordToken.set(tokens.nextToken());
            context.write(wordToken, new IntWritable(1));
        }
    }
}
Reducer
public class ReduceWordCount extends Reducer<Text, IntWritable, Text, IntWritable>
{
    private IntWritable count = new IntWritable();

    public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException
    {
        int valueSum = 0;

        for (IntWritable val : values)
        {
            valueSum += val.get();
        }

        count.set(valueSum);
        context.write(key, count);
    }
}
Driver
public class WordCount
{
    public static void main(String[] args) throws Exception
    {
        Configuration conf = new Configuration();
        String[] pathArgs = new GenericOptionsParser(conf, args).getRemainingArgs();

        if (pathArgs.length < 2)
        {
            System.err.println("MR Project Usage: wordcount <input-path> [...] <output-path>");
            System.exit(2);
        }

        Job wcJob = Job.getInstance(conf, "MapReduce WordCount");
        wcJob.setJarByClass(WordCount.class);
        wcJob.setMapperClass(MapWordCount.class);
        wcJob.setCombinerClass(ReduceWordCount.class);
        wcJob.setReducerClass(ReduceWordCount.class);
        wcJob.setOutputKeyClass(Text.class);
        wcJob.setOutputValueClass(IntWritable.class);

        for (int i = 0; i < pathArgs.length - 1; ++i)
        {
            FileInputFormat.addInputPath(wcJob, new Path(pathArgs[i]));
        }

        FileOutputFormat.setOutputPath(wcJob, new Path(pathArgs[pathArgs.length - 1]));
        System.exit(wcJob.waitForCompletion(true) ? 0 : 1);
    }
}
If anybody could assist me with this I would appreciate it. As I said, I know I need two map reduces, but am not quite sure how to start with this. I attempted a couple of other solutions I found on StackOverflow but didn't have much luck for my case. Thanks so much!
You are indeed right, you do need two MapReduce jobs chained together. More specifically, you need:

- one job to compute the wordcount for every word stored inside the input documents, and
- one job to "sort" all those words and wordcounts in order to pick and output the top N of them.
The first job is fairly similar to what you have come up with already, so I'm going to focus on the second job to make things a bit clearer about how TopN works in the MapReduce paradigm.
Thinking of the TopN MR job as a standalone thing, we know that this particular job will receive a bunch of key-value pairs where every word from the last step is going to be the key and its wordcount is going to be the value. Since mappers and reducers are isolated instances of the map and reduce functions running in parallel, we need to find a way to first find the TopN words locally (i.e. for every mapper), and then group all of those local TopN results to find the "global" TopN words for all the data given to the application as input.
So, the TopNMapper will first have to create a TreeMap (a Java key-value data structure that internally sorts its elements by key) in the setup function, which runs once per mapper instance before any map calls; every mapper initializes one such object and puts every word and its wordcount into it as elements. For this type of computation (TopN), we put the wordcount as key and the word as value in order to have a list of words sorted in ascending order of frequency. Since we only need the top N words here, it is safe to keep just the top N words for every mapper, so we can remove all of the other elements and end up with a TreeMap of N elements, which will be handed to the reducers at the end of each mapper's execution (i.e. through the cleanup function). The mappers will write key-value pairs where the words are the keys and their wordcounts are the values, like this:
<word, wordcount>
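To make the local TopN bookkeeping concrete, here is a minimal standalone sketch of the TreeMap eviction pattern described above (the TopNSketch class and topN method are names of my own for illustration, not part of any Hadoop API). One caveat of this approach, which also applies to the full program below: TreeMap keys are unique, so two words with the same wordcount collide and only one of them survives.

```java
import java.util.Map;
import java.util.TreeMap;

public class TopNSketch {
    // Keep only the N entries with the largest counts, using the count as the
    // TreeMap key so entries are kept sorted in ascending order of frequency.
    // Caveat: TreeMap keys are unique, so two words with equal counts collide
    // and the later put() overwrites the earlier word.
    static TreeMap<Integer, String> topN(Map<String, Integer> counts, int n) {
        TreeMap<Integer, String> best = new TreeMap<>();
        for (Map.Entry<String, Integer> e : counts.entrySet()) {
            best.put(e.getValue(), e.getKey()); // wordcount as key, word as value
            if (best.size() > n)
                best.remove(best.firstKey());   // evict the smallest count
        }
        return best;
    }

    public static void main(String[] args) {
        Map<String, Integer> counts = Map.of("the", 12, "to", 9, "hadoop", 4, "java", 2);
        System.out.println(topN(counts, 2)); // prints {9=to, 12=the}
    }
}
```

A production job that must keep ties would use a composite key (e.g. count plus word) or a TreeMap<Integer, List<String>> instead.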
Now for the TopNReducer, we need to do the exact same thing: use a TreeMap data structure again, populate it with all of the local TopN elements, remove the elements that are not in the top N, and write the words and their wordcounts as output. To be "cleaner" about the approach, we "reverse" the words and wordcounts in the key-value pair structure so the wordcounts become the keys and the words become the values. This results in an (ascending order) sorted set of key-value pairs that will be stored on disk after this job is done, like this:
<wordcount, word>
The program that can do such a thing in 2 MR jobs looks like the following (where we set N as a global Configuration value inside the main function with the conf.set("N", "10"); command, and access it in the setup functions of the TopNMapper and TopNReducer classes), with all of the classes being put in one class TopNWordCount for simplicity:
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import java.io.IOException;
import java.util.Map;
import java.util.TreeMap;
public class TopNWordCount
{
    /* input:  <document, contents>
     * output: <word, 1>
     */
    public static class WordCountMapper extends Mapper<Object, Text, Text, IntWritable>
    {
        private final static IntWritable one = new IntWritable(1);

        public void map(Object key, Text value, Context context) throws IOException, InterruptedException
        {
            // clean up the document text and split the words into an array
            String[] words = value.toString()
                    .replaceAll("\\d+", "")         // get rid of numbers...
                    .replaceAll("[^a-zA-Z ]", " ")  // get rid of punctuation...
                    .toLowerCase()                  // turn every letter to lowercase...
                    .trim()                         // trim the spaces...
                    .replaceAll("\\s+", " ")        // ...and collapse repeated spaces
                    .split(" ");

            // write every word as key with `1` as value, indicating that the word was
            // found at least 1 time inside the input text
            for (String word : words)
                context.write(new Text(word), one);
        }
    }
    /* input:  <word, 1>
     * output: <word, wordcount>
     */
    public static class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable>
    {
        private IntWritable wordcount = new IntWritable();

        public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException
        {
            int word_cnt = 0;

            for (IntWritable value : values)
                word_cnt += value.get();

            wordcount.set(word_cnt);
            context.write(key, wordcount);
        }
    }
    /* input:  <word, wordcount>
     * output: <word, wordcount> (the local topN words, written in cleanup)
     */
    public static class TopNMapper extends Mapper<Object, Text, Text, IntWritable>
    {
        private int n;  // the N of TopN
        private TreeMap<Integer, String> word_list;  // local list with words sorted by their frequency

        public void setup(Context context)
        {
            n = Integer.parseInt(context.getConfiguration().get("N"));  // get N
            word_list = new TreeMap<Integer, String>();
        }

        public void map(Object key, Text value, Context context)
        {
            String[] line = value.toString().split("\t");  // split the word and the wordcount

            // put the wordcount as key and the word as value in the word list
            // so the words can be sorted by their wordcounts
            // (note: TreeMap keys are unique, so two words with the same wordcount
            // collide and only the last one put survives)
            word_list.put(Integer.valueOf(line[1]), line[0]);

            // if the local word list is populated with more than N elements,
            // remove the first (aka remove the word with the smallest wordcount)
            if (word_list.size() > n)
                word_list.remove(word_list.firstKey());
        }

        public void cleanup(Context context) throws IOException, InterruptedException
        {
            // write the topN local words before continuing to the TopNReducer,
            // with each word as key and its wordcount as value
            for (Map.Entry<Integer, String> entry : word_list.entrySet())
            {
                context.write(new Text(entry.getValue()), new IntWritable(entry.getKey()));
            }
        }
    }
    /* input:  <word, wordcount> (with the local topN words)
     * output: <wordcount, word> (with the global topN words)
     */
    public static class TopNReducer extends Reducer<Text, IntWritable, IntWritable, Text>
    {
        private int n;  // the N of TopN
        private TreeMap<Integer, String> word_list;  // list with words globally sorted by their frequency

        public void setup(Context context)
        {
            n = Integer.parseInt(context.getConfiguration().get("N"));  // get N
            word_list = new TreeMap<Integer, String>();
        }

        public void reduce(Text key, Iterable<IntWritable> values, Context context)
        {
            int wordcount = 0;

            // get the one and only value (aka the wordcount) for each word
            for (IntWritable value : values)
                wordcount = value.get();

            // put the wordcount as key and the word as value in the word list
            // so the words can be sorted by their wordcounts
            word_list.put(wordcount, key.toString());

            // if the global word list is populated with more than N elements,
            // remove the first (aka remove the word with the smallest wordcount)
            if (word_list.size() > n)
                word_list.remove(word_list.firstKey());
        }

        public void cleanup(Context context) throws IOException, InterruptedException
        {
            // write the topN global words with each wordcount as key and its word as value,
            // so the output will be sorted by the wordcount
            for (Map.Entry<Integer, String> entry : word_list.entrySet())
            {
                context.write(new IntWritable(entry.getKey()), new Text(entry.getValue()));
            }
        }
    }
    public static void main(String[] args) throws Exception
    {
        Configuration conf = new Configuration();
        String[] pathArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        conf.set("N", "10");  // set the N as a "public" value in the current Configuration

        if (pathArgs.length < 2)
        {
            System.err.println("MR Project Usage: TopNWordCount <input-path> [...] <output-path>");
            System.exit(2);
        }

        Path wordcount_dir = new Path("wordcount");
        Path output_dir = new Path(pathArgs[pathArgs.length - 1]);

        // if the in-between and output directories exist, delete them
        FileSystem fs = FileSystem.get(conf);
        if (fs.exists(wordcount_dir))
            fs.delete(wordcount_dir, true);
        if (fs.exists(output_dir))
            fs.delete(output_dir, true);

        Job wc_job = Job.getInstance(conf, "WordCount");
        wc_job.setJarByClass(TopNWordCount.class);
        wc_job.setMapperClass(WordCountMapper.class);
        wc_job.setReducerClass(WordCountReducer.class);
        wc_job.setMapOutputKeyClass(Text.class);
        wc_job.setMapOutputValueClass(IntWritable.class);
        wc_job.setOutputKeyClass(Text.class);
        wc_job.setOutputValueClass(IntWritable.class);

        for (int i = 0; i < pathArgs.length - 1; ++i)
        {
            FileInputFormat.addInputPath(wc_job, new Path(pathArgs[i]));
        }

        FileOutputFormat.setOutputPath(wc_job, wordcount_dir);
        wc_job.waitForCompletion(true);

        Job topn_job = Job.getInstance(conf, "TopN");
        topn_job.setJarByClass(TopNWordCount.class);
        topn_job.setMapperClass(TopNMapper.class);
        topn_job.setReducerClass(TopNReducer.class);
        topn_job.setMapOutputKeyClass(Text.class);
        topn_job.setMapOutputValueClass(IntWritable.class);
        topn_job.setOutputKeyClass(IntWritable.class);
        topn_job.setOutputValueClass(Text.class);
        FileInputFormat.addInputPath(topn_job, wordcount_dir);
        FileOutputFormat.setOutputPath(topn_job, output_dir);
        topn_job.waitForCompletion(true);
    }
}
The output of this program (using this directory with text files as input) is the following:
Notice that the top 10 words here are stopwords (like "the", "to", etc.), as we should expect. If you want to filter those stopwords out, you can of course use TF-IDF, which can be implemented in Hadoop in a number of ways (like this one, for example).