I am learning MapReduce and wrote a program that computes the total booking duration for members and non-members. I set all the job configuration options I could find, but when I run the Hadoop command it fails with a "wrong value class" error. I searched many solutions on Stack Overflow but couldn't debug the problem. The output of the Map and the input of the Reducer look correct. Can someone help me with it?
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class BixiMontrealAnalysis {

    public static class BixiMapper extends Mapper<LongWritable, Text, IntWritable, IntWritable> {
        @Override
        public void map(LongWritable offset, Text line, Context context) throws IOException, InterruptedException {
            String[] csvAttributes = line.toString().split(",");
            int isMember = 0;
            int duration = 0;
            try {
                duration = Integer.parseInt(csvAttributes[4]);
                isMember = Integer.parseInt(csvAttributes[5]);
            } catch (Exception e) {
                // Non-numeric rows (e.g. the CSV header) fall through here.
                System.out.println("Will Emit 0,0");
            }
            context.write(new IntWritable(isMember), new IntWritable(duration));
        }
    }

    public static class BixiReducer extends Reducer<IntWritable, IntWritable, IntWritable, LongWritable> {
        @Override
        public void reduce(IntWritable isMember, Iterable<IntWritable> combinedDurationByIsMember, Context context) throws IOException, InterruptedException {
            long sum = 0L;
            for (IntWritable duration : combinedDurationByIsMember) {
                sum = sum + (long) duration.get();
            }
            context.write(isMember, new LongWritable(sum));
        }
    }

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        Job job = new Job(conf, "bix-montreal-job");
        job.setJarByClass(BixiMontrealAnalysis.class);
        job.setMapperClass(BixiMapper.class);
        job.setCombinerClass(BixiReducer.class);
        job.setReducerClass(BixiReducer.class);
        job.setMapOutputKeyClass(IntWritable.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setOutputKeyClass(IntWritable.class);
        job.setOutputValueClass(LongWritable.class);
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
I am expecting key/value output of the form 0, <sum of duration> and 1, <sum of duration>.
CSV Content
start_date,start_station_code,end_date,end_station_code,duration_sec,is_member
2019-07-01 00:00:03,6014,2019-07-01 00:04:26,6023,262,1
2019-07-01 00:00:07,6036,2019-07-01 00:34:54,6052,2087,0
2019-07-01 00:00:11,6018,2019-07-01 00:06:48,6148,396,1
2019-07-01 00:00:12,6202,2019-07-01 00:17:25,6280,1032,1
2019-07-01 00:00:15,6018,2019-07-01 00:06:57,6148,401,0
2019-07-01 00:00:20,6248,2019-07-01 00:15:40,6113,920,1
2019-07-01 00:00:37,6268,2019-07-01 00:15:00,6195,862,0
Below is the stack trace
Error: java.io.IOException: wrong value class: class org.apache.hadoop.io.LongWritable is not class org.apache.hadoop.io.IntWritable
at org.apache.hadoop.mapred.IFile$Writer.append(IFile.java:194)
at org.apache.hadoop.mapred.Task$CombineOutputCollector.collect(Task.java:1374)
at org.apache.hadoop.mapred.Task$NewCombinerRunner$OutputConverter.write(Task.java:1691)
at org.apache.hadoop.mapreduce.task.TaskInputOutputContextImpl.write(TaskInputOutputContextImpl.java:89)
at org.apache.hadoop.mapreduce.lib.reduce.WrappedReducer$Context.write(WrappedReducer.java:105)
at com.onboarding.hadoop.BixiMontrealAnalysis$BixiReducer.reduce(BixiMontrealAnalysis.java:43)
at com.onboarding.hadoop.BixiMontrealAnalysis$BixiReducer.reduce(BixiMontrealAnalysis.java:37)
The culprit was this line:

job.setCombinerClass(BixiReducer.class);

I had set the Combiner class to be the same as the Reducer, taking the standard WordCount example as a reference, which shouldn't be done here. After studying combiners, I figured out that a combiner's purpose is to produce intermediate records so the load on the reducer is lower. Because those intermediate records are fed back into the shuffle as if the mapper had emitted them, the combiner's output key/value classes must match the mapper's output classes (here IntWritable/IntWritable), whereas BixiReducer emits LongWritable values — hence the "wrong value class" error.
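A type-correct fix (a sketch of my understanding, not the only option) is to either remove the setCombinerClass call entirely, or to use a separate combiner whose output value stays IntWritable, so that the widening to long happens only in the reducer. The Hadoop wrappers are omitted below — plain int/long stand in for IntWritable/LongWritable — so the aggregation logic can be checked in isolation:

```java
import java.util.List;

// Sketch: partial sums in the combiner stage stay int (matching the
// mapper's IntWritable output type); only the final reducer widens to long.
public class CombinerSketch {

    // Combiner stage: same value type in and out (int -> int).
    static int combine(Iterable<Integer> durations) {
        int partial = 0;
        for (int d : durations) {
            partial += d;
        }
        return partial;
    }

    // Reducer stage: the widening to long happens only here.
    static long reduce(Iterable<Integer> partials) {
        long sum = 0L;
        for (int p : partials) {
            sum += p;
        }
        return sum;
    }

    public static void main(String[] args) {
        // Durations for is_member = 1 from the sample CSV: 262, 396, 1032, 920.
        int partial = combine(List.of(262, 396, 1032, 920));
        long total = reduce(List.of(partial));
        System.out.println(partial + " " + total); // 2610 2610
    }
}
```

In the actual job this corresponds to a hypothetical class such as BixiCombiner extends Reducer<IntWritable, IntWritable, IntWritable, IntWritable> passed to job.setCombinerClass(BixiCombiner.class) — or simply deleting the setCombinerClass line, since the combiner is only an optimization.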