Search code examples
javahadoopmapreducehadoop2

Second largest salary using mapreduce - Output is not as expected


I wrote a small MapReduce job to find the second-highest salary in a dataset. I believe the second-highest-salary logic is correct, but I am getting multiple output records, which is incorrect: there should be only one output record containing a name and a salary, for example John, 9000. The output values themselves are also wrong. Here are the dataset and the code:

hh,0,Jeet,3000
hk,1,Mayukh,4000
nn,2,Antara,3500
mm,3,Shubu,6000
ii,4,Parsi,8000  

The output should be Shubu,6000, but I am getting the output below:

  Antara    -2147483648
  Mayukh    -2147483648
  Parsi      3500
  Shubu      4000

And the code i am using is

 public class SecondHigestMapper extends Mapper<LongWritable,Text,Text,Text>{

private Text salary = new Text();
private Text name = new Text();
public void map(LongWritable key,Text value,Context context) throws IOException, InterruptedException{

    if(key.get()!=0){
        String split[]= value.toString().split(",");
        salary.set(split[2]+";"+split[3]);
        name.set("ignore");
        context.write(name,salary);
    }
}
}


 public class SecondHigestReducer extends Reducer<Text,Text,Text,IntWritable>{

public void reduce(Text key,Iterable<Text> values,Context context) throws IOException, InterruptedException{
    int highest = 0;
    int second_highest = 0;
    int salary;

    for(Text val:values){
        String[] fn = val.toString().split("\\;");
        salary = Integer.parseInt(fn[3]);

        if(highest < salary){
              second_highest = highest;
              highest =salary;
         } else if(second_highest < salary){
              second_highest = salary;
        }
    }
    String seconHigest = String.valueOf(second_highest);
    context.write(new Text(key),new Text(seconHigest));

}

 }

public class SecondHigestDriver {

public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
    Configuration conf = new Configuration();
    Job job = new Job(conf,"Second Higest Sal");
    job.setJarByClass(SecondHigestDriver.class);
    job.setMapperClass(SecondHigestMapper.class);
    job.setCombinerClass(SecondHigestReducer.class);
    job.setReducerClass(SecondHigestReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setMapOutputValueClass(Text.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);

}
   }

I am getting below exception

  Error: java.io.IOException: Type mismatch in value from map: expected org.apache.hadoop.io.IntWritable, received org.apache.hadoop.io.Text
at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.collect(MapTask.java:1074)
at org.apache.hadoop.mapred.MapTask$NewOutputCollector.write(MapTask.java:712)
at org.apache.hadoop.mapreduce.task.TaskInputOutputContextImpl.write(TaskInputOutputContextImpl.java:89)
at org.apache.hadoop.mapreduce.lib.map.WrappedMapper$Context.write(WrappedMapper.java:112)
at com.jeet.secondhigest.SecondHigestMapper.map(SecondHigestMapper.java:20)
at com.jeet.secondhigest.SecondHigestMapper.map(SecondHigestMapper.java:1)
at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:145)
at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:784)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:341)
at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:168)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1642)
at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:163)

Please help me to solve this


Solution

  • Force all salaries into a single reducer by using a single key

    name.set("ignore");  // Could use a NullWritable 
    salary.set(split[2]+";"+split[3])); // change to TextWritable 
    context.write(name,salary);  // need to change the signature of the mapper class 
    

    Then, in the reducer, change the method to accept Text values, split each value apart on ";", parse the salary as an integer, and compare the salaries.