Tags: java, hadoop, mapreduce, hadoop2

Reducer Class not working as expected in Hadoop MapReduce


I tried to implement a simple group-by in MapReduce.

My input file is given below:

7369,SMITH,CLERK,800,20
7499,ALLEN,SALESMAN,1600,30
7521,WARD,SALESMAN,1250,30
7566,JONES,MANAGER,2975,20
7654,MARTIN,SALESMAN,1250,30
7698,BLAKE,MANAGER,2850,30
7782,CLARK,MANAGER,2450,10
7788,SCOTT,ANALYST,3000,20
7839,KING,PRESIDENT,5000,10
7844,TURNER,SALESMAN,1500,30
7876,ADAMS,CLERK,1100,20
7900,JAMES,CLERK,950,30
7902,FORD,ANALYST,3000,20
7934,MILLER,CLERK,1300,10

My Mapper Class:

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class Groupmapper extends Mapper<Object, Text, IntWritable, IntWritable> {
    @Override
    public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
        // Each input line has the form: empno,ename,job,sal,deptno
        String[] parts = value.toString().split(",");
        int sal = Integer.parseInt(parts[3]);
        int deptno = Integer.parseInt(parts[4]);
        // Emit (deptno, sal) so salaries can be summed per department
        context.write(new IntWritable(deptno), new IntWritable(sal));
    }
}

Reducer Class:

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Reducer;

public class Groupreducer extends Reducer<IntWritable, IntWritable, IntWritable, IntWritable> {
    IntWritable result = new IntWritable();

    public void Reduce(IntWritable key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable val : values) {
            sum += val.get();
        }
        result.set(sum);
        context.write(key, result);
    }
}

Driver Class:

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class Group {
    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "Group");
        job.setJarByClass(Group.class);
        job.setMapperClass(Groupmapper.class);
        // Summing is associative, so the reducer can also serve as the combiner
        job.setCombinerClass(Groupreducer.class);
        job.setReducerClass(Groupreducer.class);
        job.setOutputKeyClass(IntWritable.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
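
Assuming the three classes are compiled and packaged into a jar (the name group.jar and the paths below are just placeholders), the job can be submitted with something like:

hadoop jar group.jar Group input/emp.csv output

Note that the output directory must not already exist, or the job will fail at startup.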

The expected output should be:

10      8750
20      10875
30      9400

But it prints the output given below. It didn't aggregate the values; it works like an identity reducer.

10      1300
10      5000
10      2450
20      1100
20      3000
20      800
20      2975
20      3000
30      1500
30      1600
30      2850
30      1250
30      1250
30      950

The reducer function is not working properly.


Solution

  • It does look like the reduce isn't being used, so taking a closer look at your reducer is the next debugging step.

    If you add an @Override annotation to your reduce method (as you do on your map method), you'll see that you get a Method does not override method from its superclass error. This means that Hadoop won't use your reduce method and will instead fall back to the default identity implementation.

    The problem is that you have:

    public void Reduce(IntWritable key, Iterable<IntWritable> values, Context context)

    and it should be:

    public void reduce(IntWritable key, Iterable<IntWritable> values, Context context)

    The only difference is that the method name must start with a lowercase r.
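
    For completeness, here is the corrected reducer with @Override added, so the compiler will flag this kind of mistake in the future:

    import java.io.IOException;

    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.mapreduce.Reducer;

    public class Groupreducer extends Reducer<IntWritable, IntWritable, IntWritable, IntWritable> {
        IntWritable result = new IntWritable();

        @Override
        public void reduce(IntWritable key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            // Sum all salaries for this department key
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }
            result.set(sum);
            context.write(key, result);
        }
    }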