I'm writing a map-reduce batch job that consists of 3-4 chained jobs. In the second job I use a custom class as the output value class when writing to the context via context.write().
While studying the behavior of the code, I noticed that the toString method of this custom class is invoked, rather than the write method. Why does this happen, given that the class implements the Writable interface and I implemented the write method?
The custom class's code:
import org.apache.hadoop.io.Writable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
public class WritableLongPair implements Writable {

    private long l1;
    private long l2;

    public WritableLongPair() {
        l1 = 0;
        l2 = 0;
    }

    public WritableLongPair(long l1, long l2) {
        this.l1 = l1;
        this.l2 = l2;
    }

    @Override
    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeLong(l1);
        dataOutput.writeLong(l2);
    }

    @Override
    public void readFields(DataInput dataInput) throws IOException {
        l1 = dataInput.readLong();
        l2 = dataInput.readLong();
    }

    @Override
    public String toString() {
        return l1 + " " + l2;
    }
}
The second job's code:
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
public class Phase2 {

    private static final int ASCII_OFFSET = 97;

    public static class Mapper2
            extends Mapper<Object, Text, Text, LongWritable> {

        @Override
        public void map(Object key, Text value, Context context
        ) throws IOException, InterruptedException {
            String[] valueAsStrings = value.toString().split("\t");
            String actualKey = valueAsStrings[0];
            LongWritable actualValue = new LongWritable(Long.parseLong(valueAsStrings[1]));
            String[] components = actualKey.toString().split("[$]");
            if (!components[1].equals("*")) {
                context.write(new Text(components[1] + "$" + components[0]), actualValue);
                context.write(new Text(components[1] + "$*"), actualValue);
            }
            context.write(new Text(actualKey), actualValue);
        }
    }

    public static class Partitioner2 extends Partitioner<Text, LongWritable> {

        @Override
        public int getPartition(Text text, LongWritable longWritable, int i) {
            return (int) (text.toString().charAt(0)) - ASCII_OFFSET;
        }
    }

    public static class Reducer2
            extends Reducer<Text, LongWritable, Text, WritableLongPair> {

        private Text currentKey;
        private long sum;

        @Override
        public void setup(Context context) {
            currentKey = new Text();
            currentKey.set("");
            sum = 0l;
        }

        private String textContent(String w1, String w2) {
            if (w2.equals("*"))
                return w1 + "$*";
            if (w1.compareTo(w2) < 0)
                return w1 + "$" + w2;
            else
                return w2 + "$" + w1;
        }

        public void reduce(Text key, Iterable<LongWritable> counts,
                           Context context
        ) throws IOException, InterruptedException {
            long sumPair = 0l;
            String[] components = key.toString().split("[$]");
            for (LongWritable count : counts) {
                if (currentKey.equals(components[0])) {
                    if (components[1].equals("*"))
                        sum += count.get();
                    else
                        sumPair += count.get();
                }
                else {
                    sum = count.get();
                    currentKey.set(components[0]);
                }
            }
            if (!components[1].equals("*"))
                context.write(new Text(textContent(components[0], components[1])), new WritableLongPair(sumPair, sum));
        }
    }

    public static class Comparator2 extends WritableComparator {

        @Override
        public int compare(WritableComparable o1, WritableComparable o2) {
            String[] components1 = o1.toString().split("[$]");
            String[] components2 = o2.toString().split("[$]");
            if (components1[1].equals("*") && components2[1].equals("*"))
                return components1[0].compareTo(components2[0]);
            if (components1[1].equals("*")) {
                if (components1[0].equals(components2[0]))
                    return -1;
                else
                    return components1[0].compareTo(components2[0]);
            }
            if (components2[1].equals("*")) {
                if (components1[0].equals(components2[0]))
                    return 1;
                else
                    return components1[0].compareTo(components2[0]);
            }
            return components1[0].compareTo(components2[0]);
        }
    }
}
...and how I define my jobs:
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class Manager {

    public static void main(String[] args) throws Exception {

        Configuration conf1 = new Configuration();
        if (args.length != 2) {
            System.err.println("Usage: Manager <in> <out>");
            System.exit(1);
        }

        Job job1 = Job.getInstance(conf1, "Phase 1");
        job1.setJarByClass(Phase1.class);
        job1.setMapperClass(Phase1.Mapper1.class);
        job1.setPartitionerClass(Phase1.Partitioner1.class);
        // job1.setCombinerClass(Phase1.Combiner1.class);
        job1.setReducerClass(Phase1.Reducer1.class);
        job1.setInputFormatClass(SequenceFileInputFormat.class);
        // job1.setOutputFormatClass(FileOutputFormat.class);
        job1.setOutputKeyClass(Text.class);
        job1.setOutputValueClass(LongWritable.class);
        job1.setNumReduceTasks(12);
        FileInputFormat.addInputPath(job1, new Path(args[0]));
        Path output1 = new Path(args[1]);
        FileOutputFormat.setOutputPath(job1, output1);

        boolean result = job1.waitForCompletion(true);
        Counter counter = job1.getCounters().findCounter("org.apache.hadoop.mapreduce.TaskCounter", "REDUCE_INPUT_RECORDS");
        System.out.println("Num of pairs sent to reducers in phase 1: " + counter.getValue());

        Configuration conf2 = new Configuration();
        Job job2 = Job.getInstance(conf2, "Phase 2");
        job2.setJarByClass(Phase2.class);
        job2.setMapperClass(Phase2.Mapper2.class);
        job2.setPartitionerClass(Phase2.Partitioner2.class);
        // job2.setCombinerClass(Phase2.Combiner2.class);
        job2.setReducerClass(Phase2.Reducer2.class);
        job2.setMapOutputKeyClass(Text.class);
        job2.setMapOutputValueClass(LongWritable.class);
        job2.setOutputKeyClass(Text.class);
        job2.setOutputValueClass(WritableLongPair.class);
        job2.setNumReduceTasks(26);
        // job2.setGroupingComparatorClass(Phase2.Comparator2.class);
        FileInputFormat.addInputPath(job2, output1);
        Path output2 = new Path(args[1] + "2");
        FileOutputFormat.setOutputPath(job2, output2);

        result = job2.waitForCompletion(true);
        counter = job2.getCounters().findCounter("org.apache.hadoop.mapreduce.TaskCounter", "REDUCE_INPUT_RECORDS");
        System.out.println("Num of pairs sent to reducers in phase 2: " + counter.getValue());

        // System.exit(job1.waitForCompletion(true) ? 0 : 1);
    }
}
If you use the default output format (TextOutputFormat), Hadoop calls the toString() method on the object when it writes it to disk. This is expected behavior. context.write() is being called, but it's the output format that controls how the data appears on disk.
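Conceptually, TextOutputFormat's record writer turns each key/value pair into one line of text, roughly like the simplified sketch below (illustrative only, not the actual Hadoop source, which also special-cases Text and NullWritable):

import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;

// Simplified sketch of what TextOutputFormat's line record writer does per record.
// This is why WritableLongPair's toString() shows up in the final output:
// the text formatter never goes through Writable.write().
class TextRecordWriterSketch<K, V> {

    private final DataOutputStream out;

    TextRecordWriterSketch(DataOutputStream out) {
        this.out = out;
    }

    void write(K key, V value) throws IOException {
        out.write(key.toString().getBytes(StandardCharsets.UTF_8));
        out.write('\t'); // default key/value separator
        out.write(value.toString().getBytes(StandardCharsets.UTF_8));
        out.write('\n');
    }
}

Your write() and readFields() methods are used where binary serialization is needed, e.g. when values are shuffled between map and reduce tasks or written to a sequence file.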
If you're chaining jobs together, you would typically use SequenceFileInputFormat and SequenceFileOutputFormat for all of the jobs, since that makes reading the output of one job as the input of the next straightforward.
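For example, in your Manager code the hand-off between the two jobs could look like this (a sketch of just the additions; everything else stays as you have it):

import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;

// Job 1 stores its Text/LongWritable pairs in binary form via Writable.write()...
job1.setOutputFormatClass(SequenceFileOutputFormat.class);

// ...and job 2 reads them back via readFields(), so Mapper2 would receive a
// Text key and a LongWritable value directly instead of parsing a tab-separated line.
job2.setInputFormatClass(SequenceFileInputFormat.class);

// Leave the default TextOutputFormat only on the last job in the chain, where
// human-readable output (and therefore toString()) is what you actually want.

Note that Mapper2's signature would then need to change to something like Mapper<Text, LongWritable, Text, LongWritable>, since the key and value arrive already deserialized rather than as a line of text.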