
Is a custom Partitioner helpful if I already implement hashCode() for keys in MapReduce jobs?


I am writing a custom key class without a hashCode() implementation.

I run a MapReduce job, and during job configuration I set the partitioner class, like so:

        Job job = Job.getInstance(config);
        job.setJarByClass(ReduceSideJoinDriver.class);

        FileInputFormat.addInputPaths(job, filePaths.toString());
        FileOutputFormat.setOutputPath(job, new Path(args[args.length-1]));

        job.setMapperClass(JoiningMapper.class);
        job.setReducerClass(JoiningReducer.class);
        job.setPartitionerClass(TaggedJoiningPartitioner.class); // the partitioner is set here
        job.setGroupingComparatorClass(TaggedJoiningGroupingComparator.class);
        job.setOutputKeyClass(TaggedKey.class);
        job.setOutputValueClass(Text.class);
        System.exit(job.waitForCompletion(true) ? 0 : 1);

Here is the partitioner implementation:

public class TaggedJoiningPartitioner extends Partitioner<TaggedKey,Text> {

    @Override
    public int getPartition(TaggedKey taggedKey, Text text, int numPartitions) {
        // Mask the sign bit rather than using Math.abs(), which can stay
        // negative when hashCode() returns Integer.MIN_VALUE.
        return (taggedKey.getJoinKey().hashCode() & Integer.MAX_VALUE) % numPartitions;
    }
}
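As an aside (not part of the original question): `Math.abs(hashCode()) % numPartitions` has a subtle edge case, because `Math.abs(Integer.MIN_VALUE)` overflows and stays negative, which would produce a negative partition number and fail the job. A minimal standalone demo:

```java
// Demonstrates why Math.abs(hashCode()) % n is unsafe in a partitioner:
// Math.abs(Integer.MIN_VALUE) overflows and remains negative, whereas
// masking off the sign bit never produces a negative result.
public class AbsOverflowDemo {
    public static void main(String[] args) {
        int h = Integer.MIN_VALUE;                       // a possible hashCode() value
        System.out.println(Math.abs(h));                 // still -2147483648 (overflow)
        System.out.println(Math.abs(h) % 3);             // -2: a negative "partition"
        System.out.println((h & Integer.MAX_VALUE) % 3); // 0: always non-negative
    }
}
```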

I run the map-reduce job and save the output.

Now I comment-out job.setPartitionerClass(TaggedJoiningPartitioner.class); in the above job set up.

I then implement hashCode() in my custom key class, as follows:

public class TaggedKey implements Writable, WritableComparable<TaggedKey> {

    private Text joinKey = new Text();
    private IntWritable tag = new IntWritable();

    @Override
    public int compareTo(TaggedKey taggedKey) {
        int compareValue = this.joinKey.compareTo(taggedKey.getJoinKey());
        if (compareValue == 0) {
            compareValue = this.tag.compareTo(taggedKey.getTag());
        }
        return compareValue;
    }

    @Override
    public void write(DataOutput out) throws IOException {
        joinKey.write(out);
        tag.write(out);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        joinKey.readFields(in);
        tag.readFields(in);
    }

    @Override
    public int hashCode() {
        return joinKey.hashCode();
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (!(o instanceof TaggedKey)) {
            return false;
        }
        TaggedKey that = (TaggedKey) o;
        return this.joinKey.equals(that.joinKey);
    }
}

Now I run the job again (note: no partitioner is set this time). After the MapReduce job finishes, I compare the output with the previous run. They are exactly the same.

So my questions are:

1) Is this behavior universal, i.e. always reproducible with any custom key implementation?

2) Is implementing hashCode() on my key class the same as calling job.setPartitionerClass()?

3) If they both serve the same purpose, why is setPartitionerClass() needed at all?

4) If the hashCode() implementation and the Partitioner implementation conflict, which one takes precedence?

Solution

  • You are getting the same result because your custom partitioner does exactly what the default one, HashPartitioner, already does: it partitions by the key's hashCode() modulo numPartitions. You have only moved that code into another class and executed it there. Put in different logic, such as key.toString().length() % numPartitions, and you will see a different distribution of keys to reducers.
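    For reference, the arithmetic that Hadoop's default HashPartitioner performs can be reproduced as a plain static method (shown here without a Hadoop dependency so it runs standalone; the key strings are made up for illustration):

    ```java
    // The same formula as org.apache.hadoop.mapreduce.lib.partition.HashPartitioner:
    // mask the sign bit off hashCode(), then take the modulus.
    public class DefaultPartitionDemo {
        static int getPartition(Object key, int numReduceTasks) {
            return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
        }

        public static void main(String[] args) {
            // Equal keys always map to the same reducer.
            System.out.println(getPartition("order-17", 4));
            System.out.println(getPartition("order-17", 4));
        }
    }
    ```

    Because TaggedKey.hashCode() delegates to joinKey.hashCode(), the default partitioner already routes all records with the same join key to the same reducer, which is why both runs produced identical output.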

    For example, you can't get the following partitioner just by editing hashCode():

    public static class MyPartitioner extends Partitioner<Text, Text> {

        @Override
        public int getPartition(Text key, Text value, int numReduceTasks) {

            // Text has no value() method; getLength() gives the byte length.
            int len = key.getLength();

            if (numReduceTasks == 0)
                return 0;

            if (len <= numReduceTasks / 3) {
                return 0;
            }
            if (len > numReduceTasks / 3 && len <= numReduceTasks / 2) {
                return 1 % numReduceTasks;
            } else {
                return len % numReduceTasks;
            }
        }
    }
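    To see that such logic really distributes keys differently from hashCode(), here is a hypothetical side-by-side comparison in plain Java (the sample keys and reducer count are invented for illustration):

    ```java
    // Compares hash-based vs length-based partition assignment for a few
    // sample keys, without needing Hadoop on the classpath.
    public class PartitionCompareDemo {
        static int byHash(String key, int n)   { return (key.hashCode() & Integer.MAX_VALUE) % n; }
        static int byLength(String key, int n) { return key.length() % n; }

        public static void main(String[] args) {
            String[] keys = {"a", "bb", "ccc", "dddd"};
            int reducers = 3;
            for (String k : keys) {
                System.out.printf("%-5s hash -> %d, length -> %d%n",
                        k, byHash(k, reducers), byLength(k, reducers));
            }
        }
    }
    ```

    The length-based scheme sends every key of the same length to the same reducer regardless of its content, which no hashCode() tweak alone can reproduce.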