Tags: java, mapreduce, hadoop2

When writing to context in the Reducer.reduce method, why is the toString method invoked and not the write method?


I'm writing a map-reduce batch job that consists of 3-4 chained jobs. In the second job I use a custom class as the output value class when writing to the context via context.write(). While studying the behavior of the code, I noticed that the toString method of this custom class is invoked rather than the write method. Why does this happen if the class implements the Writable interface and I implemented the write method?

The custom class's code:

import org.apache.hadoop.io.Writable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;


public class WritableLongPair implements Writable {

private long l1;
private long l2;

public WritableLongPair() {
    l1 = 0;
    l2 = 0;
}

public WritableLongPair(long l1, long l2) {
    this.l1 = l1;
    this.l2 = l2;
}

@Override
public void write(DataOutput dataOutput) throws IOException {
    dataOutput.writeLong(l1);
    dataOutput.writeLong(l2);
}

@Override
public void readFields(DataInput dataInput) throws IOException {
    l1 = dataInput.readLong();
    l2 = dataInput.readLong();
}

@Override
public String toString() {
    return l1 + " " + l2;
}
}

The second job's code:

import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class Phase2 {

private static final int ASCII_OFFSET = 97;

public static class Mapper2
        extends Mapper<Object, Text, Text, LongWritable>{

    @Override
    public void map(Object key, Text value, Context context
    ) throws IOException, InterruptedException {
        String[] valueAsStrings = value.toString().split("\t");
        String actualKey = valueAsStrings[0];
        LongWritable actualValue = new LongWritable(Long.parseLong(valueAsStrings[1]));
        String[] components = actualKey.toString().split("[$]");
        if (!components[1].equals("*")) {
            context.write(new Text(components[1] + "$" + components[0]), actualValue);
            context.write(new Text(components[1] + "$*"), actualValue);
        }
        context.write(new Text(actualKey), actualValue);
    }
}

public static class Partitioner2 extends Partitioner<Text, LongWritable> {

    @Override
    public int getPartition(Text text, LongWritable longWritable, int i) {
        return (int)(text.toString().charAt(0)) - ASCII_OFFSET;
    }
}

public static class Reducer2
        extends Reducer<Text, LongWritable, Text, WritableLongPair> {

        private Text currentKey;
        private long sum;

    @Override
    public void setup(Context context) {
        currentKey = new Text();
        currentKey.set("");
        sum = 0l;
    }

    private String textContent(String w1, String w2) {
        if (w2.equals("*"))
            return w1 + "$*";
        if (w1.compareTo(w2) < 0)
            return w1 + "$" + w2;
        else
            return w2 + "$" + w1;
    }

    public void reduce(Text key, Iterable<LongWritable> counts,
                       Context context
    ) throws IOException, InterruptedException {
        long sumPair = 0l;
        String[] components = key.toString().split("[$]");
        for (LongWritable count : counts) {
            if (currentKey.equals(components[0])) {
                if (components[1].equals("*"))
                    sum += count.get();
                else
                    sumPair += count.get();
            }
            else {
                sum = count.get();
                currentKey.set(components[0]);
            }
        }
        if (!components[1].equals("*"))
            context.write(new Text(textContent(components[0], components[1])), new WritableLongPair(sumPair, sum));
    }
}

public static class Comparator2 extends WritableComparator {

    @Override
    public int compare(WritableComparable o1, WritableComparable o2) {
        String[] components1 = o1.toString().split("[$]");
        String[] components2 = o2.toString().split("[$]");
        if (components1[1].equals("*") && components2[1].equals("*"))
            return components1[0].compareTo(components2[0]);
        if (components1[1].equals("*")) {
            if (components1[0].equals(components2[0]))
                return -1;
            else
                return components1[0].compareTo(components2[0]);
        }
        if (components2[1].equals("*")) {
            if (components1[0].equals(components2[0]))
                return 1;
            else
                return components1[0].compareTo(components2[0]);
        }
        return components1[0].compareTo(components2[0]);
    }

}

}

...and how I define my jobs:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class Manager {

public static void main(String[] args) throws Exception {
    Configuration conf1 = new Configuration();
    if (args.length != 2) {
        System.err.println("Usage: Manager <in> <out>");
        System.exit(1);
    }
    Job job1 = Job.getInstance(conf1, "Phase 1");
    job1.setJarByClass(Phase1.class);
    job1.setMapperClass(Phase1.Mapper1.class);
    job1.setPartitionerClass(Phase1.Partitioner1.class);
//        job1.setCombinerClass(Phase1.Combiner1.class);
    job1.setReducerClass(Phase1.Reducer1.class);
    job1.setInputFormatClass(SequenceFileInputFormat.class);
//        job1.setOutputFormatClass(FileOutputFormat.class);
    job1.setOutputKeyClass(Text.class);
    job1.setOutputValueClass(LongWritable.class);
    job1.setNumReduceTasks(12);
    FileInputFormat.addInputPath(job1, new Path(args[0]));
    Path output1 = new Path(args[1]);
    FileOutputFormat.setOutputPath(job1, output1);
    boolean result = job1.waitForCompletion(true);
    Counter counter = job1.getCounters().findCounter("org.apache.hadoop.mapreduce.TaskCounter", "REDUCE_INPUT_RECORDS");
    System.out.println("Num of pairs sent to reducers in phase 1: " + counter.getValue());

    Configuration conf2 = new Configuration();
    Job job2 = Job.getInstance(conf2, "Phase 2");
    job2.setJarByClass(Phase2.class);
    job2.setMapperClass(Phase2.Mapper2.class);
    job2.setPartitionerClass(Phase2.Partitioner2.class);
//        job2.setCombinerClass(Phase2.Combiner2.class);
    job2.setReducerClass(Phase2.Reducer2.class);
    job2.setMapOutputKeyClass(Text.class);
    job2.setMapOutputValueClass(LongWritable.class);
    job2.setOutputKeyClass(Text.class);
    job2.setOutputValueClass(WritableLongPair.class);
    job2.setNumReduceTasks(26);
//        job2.setGroupingComparatorClass(Phase2.Comparator2.class);
    FileInputFormat.addInputPath(job2, output1);
    Path output2 = new Path(args[1] + "2");
    FileOutputFormat.setOutputPath(job2, output2);
    result = job2.waitForCompletion(true);
    counter = job2.getCounters().findCounter("org.apache.hadoop.mapreduce.TaskCounter", "REDUCE_INPUT_RECORDS");
    System.out.println("Num of pairs sent to reducers in phase 2: " + counter.getValue());


//        System.exit(job1.waitForCompletion(true) ? 0 : 1);

}
}

Solution

  • If you use the default output format (TextOutputFormat), Hadoop calls the toString() method on the object when it writes it to disk. This is expected behavior: context.write() is being called, but it's the output format that controls how the data appears on disk (see the simplified sketch below).

    If you're chaining jobs together, you would typically use SequenceFileInputFormat and SequenceFileOutputFormat for all of the jobs, since that makes it easy to read the output of one job as the input of the next (a driver sketch follows the code example below).
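
    To make the fallback concrete, here is a simplified, illustrative sketch of what a TextOutputFormat-style record writer does with each key/value pair. It is a paraphrase for explanation only, not the actual Hadoop source, and the class name TextLikeRecordWriterSketch is invented for this example. Text values are written as raw bytes, while anything else, including a custom Writable such as WritableLongPair, is rendered through toString(). Writable.write() is only exercised by binary output formats and by the shuffle between map and reduce, never by the text writer.

// Simplified, illustrative sketch only -- a paraphrase of the behavior of
// TextOutputFormat's record writer, not its actual source. The class name
// TextLikeRecordWriterSketch is invented for this example.
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;

import org.apache.hadoop.io.Text;

public class TextLikeRecordWriterSketch {

    private final DataOutputStream out;

    public TextLikeRecordWriterSketch(DataOutputStream out) {
        this.out = out;
    }

    // Text is written as raw bytes; any other object -- including a custom
    // Writable such as WritableLongPair -- is rendered via toString().
    // Writable.write() is never called here.
    private void writeObject(Object o) throws IOException {
        if (o instanceof Text) {
            Text t = (Text) o;
            out.write(t.getBytes(), 0, t.getLength());
        } else {
            out.write(o.toString().getBytes(StandardCharsets.UTF_8));
        }
    }

    public void write(Object key, Object value) throws IOException {
        writeObject(key);        // e.g. the Text key from Reducer2
        out.writeBytes("\t");    // default key/value separator
        writeObject(value);      // WritableLongPair -> its toString()
        out.writeBytes("\n");
    }
}

    Because WritableLongPair only appears as the final reduce output value in your chain, its write()/readFields() methods are simply never needed by this output format; they would be used if the pair were a map output type or if the job wrote a binary format.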
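
    Below is a minimal driver sketch of that arrangement, assuming the same command-line arguments as the Manager class in the question. The class name ChainedJobsSketch and the "_intermediate" path suffix are made up, and the mapper, reducer, partitioner and reduce-task settings are elided; the point is only where SequenceFileOutputFormat and SequenceFileInputFormat go. Note that with a sequence-file input, the Phase 2 mapper would receive (Text, LongWritable) pairs directly instead of tab-separated lines, so it would no longer need to split strings.

// Minimal driver sketch, not the question's full Manager: mapper/reducer/
// partitioner setup and counters are omitted, and the class and path names
// (ChainedJobsSketch, the "_intermediate" suffix) are invented for illustration.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;

public class ChainedJobsSketch {

    public static void main(String[] args) throws Exception {
        Path input = new Path(args[0]);
        Path intermediate = new Path(args[1] + "_intermediate");
        Path output = new Path(args[1]);

        // Phase 1 writes a sequence file: keys/values are serialized with
        // their Writable.write() methods, not toString().
        Job job1 = Job.getInstance(new Configuration(), "Phase 1");
        job1.setJarByClass(ChainedJobsSketch.class);
        job1.setInputFormatClass(SequenceFileInputFormat.class);
        job1.setOutputFormatClass(SequenceFileOutputFormat.class);
        job1.setOutputKeyClass(Text.class);
        job1.setOutputValueClass(LongWritable.class);
        FileInputFormat.addInputPath(job1, input);
        FileOutputFormat.setOutputPath(job1, intermediate);
        if (!job1.waitForCompletion(true)) {
            System.exit(1);
        }

        // Phase 2 reads those records back through readFields() -- no string
        // parsing in the mapper -- and, as the last job in the chain, keeps the
        // default TextOutputFormat, so its WritableLongPair values are written
        // to disk via toString().
        Job job2 = Job.getInstance(new Configuration(), "Phase 2");
        job2.setJarByClass(ChainedJobsSketch.class);
        job2.setInputFormatClass(SequenceFileInputFormat.class);
        job2.setOutputKeyClass(Text.class);
        job2.setOutputValueClass(WritableLongPair.class);
        FileInputFormat.addInputPath(job2, intermediate);
        FileOutputFormat.setOutputPath(job2, output);
        System.exit(job2.waitForCompletion(true) ? 0 : 1);
    }
}

    If a later phase consumed Phase 2's output as well, you would give job2 SequenceFileOutputFormat too; only then would WritableLongPair.write() and readFields() actually run, and only the final, human-readable job in the chain would keep TextOutputFormat.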