I am writing a custom key
class, without hashCode
implementation.
I run a map-reduce
job, but during the job configuration, I set the partitoner
class:
such as
Job job = Job.getInstance(config);
job.setJarByClass(ReduceSideJoinDriver.class);
FileInputFormat.addInputPaths(job, filePaths.toString());
FileOutputFormat.setOutputPath(job, new Path(args[args.length-1]));
job.setMapperClass(JoiningMapper.class);
job.setReducerClass(JoiningReducer.class);
job.setPartitionerClass(TaggedJoiningPartitioner.class); -- Here is the partitioner set
job.setGroupingComparatorClass(TaggedJoiningGroupingComparator.class);
job.setOutputKeyClass(TaggedKey.class);
job.setOutputValueClass(Text.class);
System.exit(job.waitForCompletion(true) ? 0 : 1);
Here is the partitioner
implementation:
public class TaggedJoiningPartitioner extends Partitioner<TaggedKey,Text> {
@Override
public int getPartition(TaggedKey taggedKey, Text text, int numPartitions) {
return Math.abs(taggedKey.getJoinKey().hashCode()) % numPartitions;
}
}
I run the map-reduce
job and save the output.
Now I comment-out job.setPartitionerClass(TaggedJoiningPartitioner.class);
in the above job set up.
I implemented hashCode()
in my custom class which is as follows:
public class TaggedKey implements Writable, WritableComparable<TaggedKey> {
private Text joinKey = new Text();
private IntWritable tag = new IntWritable();
@Override
public int compareTo(TaggedKey taggedKey) {
int compareValue = this.joinKey.compareTo(taggedKey.getJoinKey());
if(compareValue == 0 ){
compareValue = this.tag.compareTo(taggedKey.getTag());
}
return compareValue;
}
@Override
public void write(DataOutput out) throws IOException {
joinKey.write(out);
tag.write(out);
}
@Override
public void readFields(DataInput in) throws IOException {
joinKey.readFields(in);
tag.readFields(in);
}
@Override
public int hashCode(){
return joinKey.hashCode();
}
@Override
public boolean equals(Object o){
if (this==o)
return true;
if (!(o instanceof TaggedKey)){
return false;
}
TaggedKey that=(TaggedKey)o;
return this.joinKey.equals(that.joinKey);
}
}
Now I run the job again (Note: I don't have any partitoner
set). After the map-reduce job, I compare the output from previous one. They are both exactly same.
so My question is:
1) Is this behavior universal, that is always reproducible in any
custom implementations?
2) Does implementing hashcode on my key class is same as doing a
job.setPartitionerClass.
3) If they both serve same purpose, what is the need for
setPartitonerClass?
4) if both hashcode() implementation and Partitonerclass
implementation are conflicting, which one will take precedence?
You are getting the same result because your custom partitioner is doing exactly what the default one does. You are only moving code to another class and executing it there. Put in different logic like key().toString().length() % numPartitions or something other than getting the hashcode() % numPartitions and you will see a different distribution of keys to reducers.
For example you can't get this partitoner just by editing hashcode()
public static class MyPartitioner extends Partitioner {
@Override
public int getPartition(Text key, Text value, int numReduceTasks) {
int len = key.value().length;
if(numReduceTasks == 0)
return 0;
if(len <=numReduceTasks/3){
return 0;
}
if(len >numReduceTasks/3 && len <=numReduceTasks/2){
return 1 % numReduceTasks;
}
else
return len % numReduceTasks;
}
}