
Limit the output from reducer


I have a mapper class that produces a few dozen lines. This output is then sorted and merged by the MapReduce framework. After this sorting, I want the reducer to output only the top 5 records. How can I achieve this? I keep a count variable that is incremented in the reduce method, but this is not working: all records still appear in the output. I think this is because reduce() is called for every input key to the reducer, so count is initialised to 0 every time. Is there any way to maintain a global variable?

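import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class Reduce2 extends Reducer<IntWritable, Text, Text, IntWritable> {

    int count = 0;

    @Override
    protected void reduce(IntWritable key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        // This local variable shadows the field above, so it starts at 0
        // again on every reduce() call (once per key).
        int count = 0;
        String key1 = "";
        for (Text value : values) {
            key1 += value;
        }
        if (count < 5) {
            count++;
            context.write(new Text(key1), key);
        }
    }
}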


Solution

  • The run() method of Reducer is executed once per reduce task, and it calls reduce() once for every key. Below is the default implementation of Reducer.run():

    public void run(Context context) throws IOException, InterruptedException {
        setup(context);
        try {
          while (context.nextKey()) {
            reduce(context.getCurrentKey(), context.getValues(), context);
            // If a back up store is used, reset it
            Iterator<VALUEIN> iter = context.getValues().iterator();
            if(iter instanceof ReduceContext.ValueIterator) {
              ((ReduceContext.ValueIterator<VALUEIN>)iter).resetBackupStore();        
            }
          }
        } finally {
          cleanup(context);
        }
      }
    

    So if you declare the count variable inside reduce(), it is re-initialised on every call, i.e. once per key. Instead, override run() in your reducer implementation and move the count variable into it:

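    // Requires: import java.util.Iterator;
    //           import org.apache.hadoop.mapreduce.ReduceContext;
    @Override
    public void run(Context context) throws IOException, InterruptedException {
        setup(context);
        // Declared in run(), so it keeps its value across all reduce() calls
        int count = 0;
        try {
            // Check the limit first so no 6th key is pulled from the input
            while (count < 5 && context.nextKey()) {
                count++;
                reduce(context.getCurrentKey(), context.getValues(), context);
                // If a back up store is used, reset it
                Iterator<Text> iter = context.getValues().iterator();
                if (iter instanceof ReduceContext.ValueIterator) {
                    ((ReduceContext.ValueIterator<Text>) iter).resetBackupStore();
                }
            }
        } finally {
            cleanup(context);
        }
    }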
    

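    This limits the output to the first 5 keys in sort order. Note that the count is kept per reduce task: each reducer runs its own run() loop, so with several reducers you get up to 5 records from each one. Set the number of reduce tasks to 1 (job.setNumReduceTasks(1)) if you need exactly 5 records overall.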
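A simpler alternative follows from the same reasoning: keep count as an instance field and do not redeclare it inside reduce(). The framework uses one Reducer object for all keys in a task, so a field keeps its value between reduce() calls. The sketch below is Hadoop-free so it runs on its own; the class TopNReducerSketch, its run(TreeMap) driver and the LIMIT constant are hypothetical stand-ins for the real Reducer/Context API. In an actual Reducer<IntWritable, Text, Text, IntWritable> you would only delete the local `int count = 0;` line and keep calling context.write().

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical, Hadoop-free model: one instance per reduce task, and run()
// calls reduce() once per key in sorted order, as Reducer.run() does.
public class TopNReducerSketch {
    static final int LIMIT = 5;

    // Instance field: initialised once per reducer object, so it keeps its
    // value across reduce() calls (unlike a local declared inside reduce()).
    private int count = 0;
    private final List<String> output = new ArrayList<>();

    private void reduce(int key, List<String> values) {
        if (count >= LIMIT) {
            return; // LIMIT records already emitted; ignore the remaining keys
        }
        StringBuilder joined = new StringBuilder();
        for (String value : values) {
            joined.append(value);
        }
        // Stands in for context.write(new Text(key1), key)
        output.add(joined + "\t" + key);
        count++;
    }

    // Stands in for the default run(): TreeMap iterates keys in ascending order.
    public List<String> run(TreeMap<Integer, List<String>> grouped) {
        for (Map.Entry<Integer, List<String>> entry : grouped.entrySet()) {
            reduce(entry.getKey(), entry.getValue());
        }
        return output;
    }

    public static void main(String[] args) {
        TreeMap<Integer, List<String>> grouped = new TreeMap<>();
        for (int k = 1; k <= 12; k++) {
            grouped.put(k, List.of("a" + k, "b" + k));
        }
        for (String line : new TopNReducerSketch().run(grouped)) {
            System.out.println(line);
        }
    }
}
```

Compared with overriding run(), this version still receives every key and simply ignores those after the fifth, whereas the run() override stops pulling keys from the input once the limit is reached.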