Tags: java, mapreduce, hadoop2

Getting wrong value class: class org.apache.hadoop.io.LongWritable is not class org.apache.hadoop.io.IntWritable


I am learning MapReduce and wrote a program that computes the total booking duration for members and non-members. I set all the job configuration I believe is required, but when I run the job with the hadoop command it fails with a "wrong value class" exception. I searched Stack Overflow for solutions but could not debug the problem. The mapper's output types and the reducer's input types match. Can someone help me with it?

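package com.onboarding.hadoop;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class BixiMontrealAnalysis {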

    public static class BixiMapper extends Mapper <LongWritable, Text, IntWritable, IntWritable> {
        public void map(LongWritable offset, Text line, Context context) throws IOException, InterruptedException {
            String csvAttributes[] = line.toString().split(",");
            int isMember = 0;
            int duration = 0;
            try {
                duration = Integer.parseInt(csvAttributes[4]);
                isMember = Integer.parseInt(csvAttributes[5]);
            } catch (Exception e) {
                System.out.println("Will Emit 0,0");
            }
            context.write(new IntWritable(isMember), new IntWritable(duration));
        }
    }

    public static class BixiReducer extends Reducer <IntWritable, IntWritable, IntWritable, LongWritable> {
        public void reduce(IntWritable isMember, Iterable <IntWritable> combinedDurationByIsMember, Context context) throws IOException, InterruptedException {
            long sum = 0L;
            for (IntWritable duration: combinedDurationByIsMember) {
                sum = sum + (long) duration.get();
            }
            context.write(isMember, new LongWritable(sum));
        }
    }

    public static void main(String args[]) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        // Job.getInstance replaces the deprecated Job constructor.
        Job job = Job.getInstance(conf, "bix-montreal-job");
        job.setJarByClass(BixiMontrealAnalysis.class);
        job.setMapperClass(BixiMapper.class);

        job.setCombinerClass(BixiReducer.class);
        job.setReducerClass(BixiReducer.class);

        job.setMapOutputKeyClass(IntWritable.class);
        job.setMapOutputValueClass(IntWritable.class);

        job.setOutputKeyClass(IntWritable.class);
        job.setOutputValueClass(LongWritable.class);

        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

I expect the output to be two key/value pairs: (0, sum of durations) for non-members and (1, sum of durations) for members.

CSV Content

start_date,start_station_code,end_date,end_station_code,duration_sec,is_member
2019-07-01 00:00:03,6014,2019-07-01 00:04:26,6023,262,1
2019-07-01 00:00:07,6036,2019-07-01 00:34:54,6052,2087,0
2019-07-01 00:00:11,6018,2019-07-01 00:06:48,6148,396,1
2019-07-01 00:00:12,6202,2019-07-01 00:17:25,6280,1032,1
2019-07-01 00:00:15,6018,2019-07-01 00:06:57,6148,401,0
2019-07-01 00:00:20,6248,2019-07-01 00:15:40,6113,920,1
2019-07-01 00:00:37,6268,2019-07-01 00:15:00,6195,862,0

Below is the stack trace

Error: java.io.IOException: wrong value class: class org.apache.hadoop.io.LongWritable is not class org.apache.hadoop.io.IntWritable
    at org.apache.hadoop.mapred.IFile$Writer.append(IFile.java:194)
    at org.apache.hadoop.mapred.Task$CombineOutputCollector.collect(Task.java:1374)
    at org.apache.hadoop.mapred.Task$NewCombinerRunner$OutputConverter.write(Task.java:1691)
    at org.apache.hadoop.mapreduce.task.TaskInputOutputContextImpl.write(TaskInputOutputContextImpl.java:89)
    at org.apache.hadoop.mapreduce.lib.reduce.WrappedReducer$Context.write(WrappedReducer.java:105)
    at com.onboarding.hadoop.BixiMontrealAnalysis$BixiReducer.reduce(BixiMontrealAnalysis.java:43)
    at com.onboarding.hadoop.BixiMontrealAnalysis$BixiReducer.reduce(BixiMontrealAnalysis.java:37)

Solution

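  • job.setCombinerClass(BixiReducer.class);

    The problem was this line. I had set the combiner class to the same class as the reducer, following the standard WordCount example. That works in WordCount because its reducer has identical input and output types, but it is wrong here. A combiner runs on the map side and pre-aggregates map output so that less data reaches the reducer, so its output must have the same key and value types as the map output (IntWritable, IntWritable). BixiReducer writes LongWritable values, so the job fails as soon as the combiner runs; the Task$NewCombinerRunner frame in the stack trace shows this. Removing the setCombinerClass call fixes the error. To keep a combiner, either write a separate one that emits IntWritable values (at the risk of int overflow on large inputs), or have the mapper emit LongWritable durations (with setMapOutputValueClass(LongWritable.class)) and make the reducer consume LongWritable, so that one class can serve as both combiner and reducer.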
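
The type rule behind the error can be illustrated without Hadoop. The following is a minimal sketch, not Hadoop code: `CombinerContractSketch`, `Pair`, `ReduceFn`, `runJob`, `pair` and `demo` are hypothetical names, and the split of the sample CSV rows across two map tasks is assumed. The combiner parameter is typed `ReduceFn<V, V>`, so a function that turns int values into long values would not compile as a combiner, while a long-to-long sum can serve as both combiner and reducer.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class CombinerContractSketch {

    /** One (isMember, value) record, standing in for a Hadoop key/value pair. */
    static final class Pair<V> {
        final int key;
        final V value;
        Pair(int key, V value) { this.key = key; this.value = value; }
    }

    /** Hadoop-free stand-in for reduce(): all values of one key in, one value out. */
    interface ReduceFn<VIN, VOUT> {
        VOUT reduce(int key, List<VIN> values);
    }

    /** Groups values by key, roughly what the shuffle does before reduce(). */
    static <V> Map<Integer, List<V>> groupByKey(List<Pair<V>> pairs) {
        Map<Integer, List<V>> grouped = new TreeMap<>();
        for (Pair<V> p : pairs) {
            grouped.computeIfAbsent(p.key, k -> new ArrayList<>()).add(p.value);
        }
        return grouped;
    }

    /**
     * Runs the combiner on each map task's output, then the reducer on everything.
     * The combiner must be ReduceFn<V, V>: its output is fed back in as map output.
     */
    static <V, OUT> Map<Integer, OUT> runJob(List<List<Pair<V>>> mapOutputs,
                                             ReduceFn<V, V> combiner,
                                             ReduceFn<V, OUT> reducer) {
        List<Pair<V>> combined = new ArrayList<>();
        for (List<Pair<V>> oneTask : mapOutputs) {
            for (Map.Entry<Integer, List<V>> e : groupByKey(oneTask).entrySet()) {
                combined.add(new Pair<>(e.getKey(), combiner.reduce(e.getKey(), e.getValue())));
            }
        }
        Map<Integer, OUT> result = new TreeMap<>();
        for (Map.Entry<Integer, List<V>> e : groupByKey(combined).entrySet()) {
            result.put(e.getKey(), reducer.reduce(e.getKey(), e.getValue()));
        }
        return result;
    }

    /** Sums long values; input and output types match, so it is a valid combiner too. */
    static long sum(int key, List<Long> values) {
        long total = 0L;
        for (long v : values) {
            total += v;
        }
        return total;
    }

    static Pair<Long> pair(int isMember, long durationSec) {
        return new Pair<>(isMember, durationSec);
    }

    /** Feeds the seven sample CSV rows through the model, split over two assumed map tasks. */
    static Map<Integer, Long> demo() {
        List<Pair<Long>> task1 = Arrays.asList(
                pair(1, 262L), pair(0, 2087L), pair(1, 396L), pair(1, 1032L));
        List<Pair<Long>> task2 = Arrays.asList(
                pair(0, 401L), pair(1, 920L), pair(0, 862L));
        List<List<Pair<Long>>> mapOutputs = Arrays.asList(task1, task2);

        ReduceFn<Long, Long> sumFn = CombinerContractSketch::sum;
        return runJob(mapOutputs, sumFn, sumFn);
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints {0=3350, 1=2610}
    }
}
```

In the real job, the equivalent change would be to emit LongWritable durations from the mapper, so that the reducer's input and output value types are the same.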