Search code examples
java, hadoop, mapreduce, hadoop2

ascending sort based on values of the reducer


I am new to the Hadoop MapReduce programming paradigm. Can someone tell me how I can easily sort the output based on values? I tried implementing another comparator class, but is there a simpler way, such as through the job configuration, to sort based on the values emitted by the reducer? Basically, I am reading log files and I want to output each URL with its hit count, ordered by hit count in ascending order.

/**
 * Mapper that emits {@code (field, 1)} for one fixed field of every input line.
 *
 * <p>Each line is split on single spaces and the field at index
 * {@link #KEY_FIELD_INDEX} is written as the key with a count of one. The
 * surrounding question describes the input as log files and this field as the
 * requested URL; that depends on the log format and is not checked here.
 */
public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {

    /** Zero-based index of the space-separated field that is emitted as the key. */
    private static final int KEY_FIELD_INDEX = 6;

    /** Constant count of one written for every emitted key. */
    private static final IntWritable ONE = new IntWritable(1);

    /** Reused output key, so no new {@code Text} is allocated per record. */
    private final Text word = new Text();

    /**
     * Writes {@code (field, 1)} for the key field of {@code value}.
     *
     * @param key     input key; not used
     * @param value   one input line
     * @param context sink for the emitted pair
     * @throws IOException          if writing to the context fails
     * @throws InterruptedException if writing to the context is interrupted
     */
    public void map(Object key, Text value, Context context)
            throws IOException, InterruptedException {
        String[] split = value.toString().split(" ");
        // Emit exactly once per line, and only when the key field exists.
        // Lines with too few fields are skipped instead of emitting a stale or empty key.
        if (split.length > KEY_FIELD_INDEX) {
            word.set(split[KEY_FIELD_INDEX]);
            context.write(word, ONE);
        }
    }
}

/**
 * Reducer that adds up all counts received for a key and emits
 * {@code (key, total)}.
 */
public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    /** Reused output value holding the total for the key currently being reduced. */
    private IntWritable total = new IntWritable();

    /**
     * Sums {@code values} and writes the sum under {@code key}.
     *
     * @param key     key whose counts are being combined
     * @param values  counts received for {@code key}
     * @param context sink for the emitted pair
     * @throws IOException          if writing to the context fails
     * @throws InterruptedException if writing to the context is interrupted
     */
    public void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int accumulated = 0;
        for (IntWritable count : values) {
            accumulated = accumulated + count.get();
        }
        total.set(accumulated);
        context.write(key, total);
    }
}

Solution

  • Declare a map inside your reducer class and, in reduce(), put each key and its summed value into the map. Then, in the cleanup() method of your reducer class, sort the map's entries by value and finally emit each entry with context.write(key, value);

    public static class IntSumReducer extends Reducer<Text,IntWritable,Text,IntWritable> {       
    private IntWritable result = new IntWritable();
    
    TreeMap<Text,IntWritable>result=new TreeMap<Text, IntWritable>();
    
    public void reduce(Text key, Iterable<IntWritable> values,  Context context) throws IOException, InterruptedException {
      int sum = 0;
      for (IntWritable val : values) {
        sum += val.get();
      }
     result.put(new Text(key),new IntWritable(sum));
    }
    }
    
        @Override
        protected void cleanup(Context context)
                throws IOException, InterruptedException {
    
            Set<Entry<Text, IntWritable>> set = result.entrySet();
            List<Entry<Text, IntWritable>> list = new ArrayList<Entry<Text,IntWritable>>(set);
            Collections.sort( list, new Comparator<Map.Entry<Text, IntWritable>>()
            {
                public int compare( Map.Entry<Text, IntWritable> o1, Map.Entry<Text,IntWritable> o2 )
                {
                    return (o2.getValue()).compareTo( o1.getValue() );
                }
            });
            for(Map.Entry<Text,IntWritable> entry:list){
    
                context.write(entry.getKey(),entry.getValue());
            }
    
        }
        }