I wrote a small MapReduce job to find the second highest salary in a dataset. I believe the second-highest-salary logic is correct, but I am getting multiple output records, which is wrong: there should be only one output line with a name, for example John, 9000. The values themselves are also incorrect. Here are the dataset and the code:
hh,0,Jeet,3000
hk,1,Mayukh,4000
nn,2,Antara,3500
mm,3,Shubu,6000
ii,4,Parsi,8000
The output should be Shubu,6000, but instead I am getting the output below:
Antara -2147483648
Mayukh -2147483648
Parsi 3500
Shubu 4000
And this is the code I am using:
public class SecondHigestMapper extends Mapper<LongWritable, Text, Text, Text> {

    private Text salary = new Text();
    private Text name = new Text();

    public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        if (key.get() != 0) {
            String split[] = value.toString().split(",");
            salary.set(split[2] + ";" + split[3]);
            name.set("ignore");
            context.write(name, salary);
        }
    }
}
public class SecondHigestReducer extends Reducer<Text, Text, Text, IntWritable> {

    public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
        int highest = 0;
        int second_highest = 0;
        int salary;
        for (Text val : values) {
            String[] fn = val.toString().split("\\;");
            salary = Integer.parseInt(fn[3]);
            if (highest < salary) {
                second_highest = highest;
                highest = salary;
            } else if (second_highest < salary) {
                second_highest = salary;
            }
        }
        String seconHigest = String.valueOf(second_highest);
        context.write(new Text(key), new Text(seconHigest));
    }
}
public class SecondHigestDriver {

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        Job job = new Job(conf, "Second Higest Sal");
        job.setJarByClass(SecondHigestDriver.class);
        job.setMapperClass(SecondHigestMapper.class);
        job.setCombinerClass(SecondHigestReducer.class);
        job.setReducerClass(SecondHigestReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
I am getting the exception below:
Error: java.io.IOException: Type mismatch in value from map: expected org.apache.hadoop.io.IntWritable, received org.apache.hadoop.io.Text
at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.collect(MapTask.java:1074)
at org.apache.hadoop.mapred.MapTask$NewOutputCollector.write(MapTask.java:712)
at org.apache.hadoop.mapreduce.task.TaskInputOutputContextImpl.write(TaskInputOutputContextImpl.java:89)
at org.apache.hadoop.mapreduce.lib.map.WrappedMapper$Context.write(WrappedMapper.java:112)
at com.jeet.secondhigest.SecondHigestMapper.map(SecondHigestMapper.java:20)
at com.jeet.secondhigest.SecondHigestMapper.map(SecondHigestMapper.java:1)
at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:145)
at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:784)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:341)
at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:168)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1642)
at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:163)
Please help me solve this.
Force all salaries into a single reducer by using a single key:
name.set("ignore");                       // could use a NullWritable instead of a dummy key
salary.set(split[2] + ";" + split[3]);    // emit name and salary together as a Text value
context.write(name, salary);              // the mapper's declared output types (and the driver's map output classes) must match this: Text, Text
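If you go the NullWritable route, the whole mapper could look roughly like the sketch below. This is only a sketch, not your exact code: the class and field names are illustrative, it assumes the same four-column comma-separated input, and the driver would then need setMapOutputKeyClass(NullWritable.class) and setMapOutputValueClass(Text.class).

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class SecondHighestMapper extends Mapper<LongWritable, Text, NullWritable, Text> {

    private final Text nameAndSalary = new Text();

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String[] fields = value.toString().split(",");
        if (fields.length == 4) {                     // skip blank or malformed lines
            // keep name and salary together so the reducer can report both
            nameAndSalary.set(fields[2] + ";" + fields[3]);
            context.write(NullWritable.get(), nameAndSalary);
        }
    }
}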
Then, in the reducer, change the method to accept Text values, split each value apart, parse the salary as an int, and compare the salaries while keeping track of the highest and second highest, for example:
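A sketch of such a reducer, paired with the NullWritable mapper above (again, names are illustrative; it also tracks the name so it can emit Shubu 6000 rather than just the number):

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class SecondHighestReducer extends Reducer<NullWritable, Text, Text, IntWritable> {

    @Override
    protected void reduce(NullWritable key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        String highestName = null;
        String secondName = null;
        int highest = Integer.MIN_VALUE;
        int second = Integer.MIN_VALUE;

        for (Text val : values) {
            // each value is "name;salary", as written by the mapper
            String[] parts = val.toString().split(";");
            String name = parts[0];
            int salary = Integer.parseInt(parts[1].trim());

            if (salary > highest) {
                second = highest;            // old highest becomes second highest
                secondName = highestName;
                highest = salary;
                highestName = name;
            } else if (salary > second) {
                second = salary;
                secondName = name;
            }
        }

        if (secondName != null) {
            context.write(new Text(secondName), new IntWritable(second));
        }
    }
}

Note that a reducer like this cannot also be used as a combiner (its input and output types differ), so the setCombinerClass line in the driver should be removed, and the driver's declared output classes need to match what the reducer actually writes (Text key, IntWritable value).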