
Hadoop Custom Partitioner Issue


I am having an issue with custom intermediary keys not ending up in the partition I would expect, based on the output of my custom partitioner's "getPartition" method. My mapper log files show that the partitioner produces the expected partition numbers. However, keys with the same partition number sometimes end up at different reducers.

How would keys with a common "getPartition" output end up at different reducers?

I noticed in the mapper log files that, after all "getPartition" calls have been made, there are many calls to the custom intermediary key's "hashCode" and "compareTo" methods. Is the mapper just sorting within each partition, or could this be part of the issue?

I have attached the code for the custom intermediary key and the partitioner. Note: I know that exactly half of the keys have "useBothGUIDFlag" set to true and half have it set to false, which is why I partition these keys into separate halves of the partition space. I also know that keys do not seem to cross over into the other half (i.e., "useBothGUIDFlag" keys do not end up in the "!useBothGUIDFlag" partitions and vice versa); rather, they are mixed up within their own half.

import org.apache.hadoop.io.WritableComparable;

public class IntermediaryKey implements WritableComparable<IntermediaryKey> {

    public String guid1;
    public String guid2;
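    // true: key is identified by the guid1/guid2 pair (in either order)
    // false: key is identified by guid2 alone
    public boolean useBothGUIDFlag;

    // write(DataOutput) and readFields(DataInput), required by Writable,
    // were not included in the original post.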

    @Override
    public int compareTo(IntermediaryKey other) {
        if(useBothGUIDFlag)
        {
            if(other.useBothGUIDFlag)
            {
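                // Integer.compare avoids the int overflow that
                // "this.hashCode() - other.hashCode()" can produce.
                return Integer.compare(this.hashCode(), other.hashCode());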
            }else{
                return 1;
            }
        }else{
            if(!other.useBothGUIDFlag)
            {
                return guid2.compareTo(other.guid2);
            }else{
                return -1;
            }
        }
    }

    @Override
    public int hashCode()
    {
        if(useBothGUIDFlag)
        {
            if(guid1.compareTo(guid2) > 0)
            {
                return (guid2+guid1).hashCode();
            }else{
                return (guid1+guid2).hashCode();
            }
        }else{
            return guid2.hashCode();
        }
    }

    @Override
    public boolean equals(Object otherKey)
    {
        if(otherKey instanceof IntermediaryKey)
        {
            return this.compareTo((IntermediaryKey)otherKey) == 0;
        }
        return false;
    }
}
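Subtracting two hash codes, as the original `compareTo` did, is not a safe comparison: when the values have opposite signs and large magnitudes the subtraction overflows and returns the wrong sign, which can make the sort order inconsistent. A small plain-Java sketch of the effect (the class and method names are illustrative only, not from the post):

```java
public class HashCompareDemo {

    // Subtraction-based comparison, as in the original compareTo.
    // Overflows when a and b have opposite signs and large magnitudes.
    static int compareBySubtraction(int a, int b) {
        return a - b;
    }

    // Overflow-safe comparison provided by the JDK.
    static int compareSafely(int a, int b) {
        return Integer.compare(a, b);
    }

    public static void main(String[] args) {
        // Integer.MAX_VALUE > -1, so a correct comparator must return a positive value.
        System.out.println(compareBySubtraction(Integer.MAX_VALUE, -1)); // -2147483648
        System.out.println(compareSafely(Integer.MAX_VALUE, -1));        // 1
    }
}
```

`Integer.MAX_VALUE - (-1)` wraps around to `Integer.MIN_VALUE`, so the subtraction claims the larger value is smaller.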

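// Nested in the job class; needs org.apache.hadoop.mapreduce.Partitioner.
// PathValue is the job's map output value type (not shown in the post).
public static class KeyPartitioner extends Partitioner<IntermediaryKey, PathValue>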
{
    @Override
    public int getPartition(IntermediaryKey key, PathValue value, int numReduceTasks) {
        int bothGUIDReducers = numReduceTasks/2;
        if(bothGUIDReducers == 0)
        {
            return 0;
        }

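        // Mask the sign bit instead of using Math.abs: Math.abs(Integer.MIN_VALUE)
        // is still negative and could yield a partition outside the intended range.
        int keyHashCode = key.hashCode() & Integer.MAX_VALUE;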
        if(key.useBothGUIDFlag)
        {
            return keyHashCode % bothGUIDReducers;
        }else{
            return (bothGUIDReducers + (keyHashCode % (numReduceTasks-bothGUIDReducers)));
        }
    }
}
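The two-halves partition arithmetic can be checked outside Hadoop. The sketch below is a hypothetical stand-in for `getPartition` that takes the key's hash code and flag directly instead of an `IntermediaryKey`. It masks the sign bit rather than calling `Math.abs`, because `Math.abs(Integer.MIN_VALUE)` is still negative and, with an odd number of reducers in a half, produces a negative partition number.

```java
public class PartitionDemo {

    // Same logic as KeyPartitioner.getPartition, minus the Hadoop types.
    static int partitionFor(int hashCode, boolean useBothGUIDFlag, int numReduceTasks) {
        int bothGUIDReducers = numReduceTasks / 2;
        if (bothGUIDReducers == 0) {
            return 0; // a single reducer receives everything
        }
        int nonNegativeHash = hashCode & Integer.MAX_VALUE; // always >= 0
        if (useBothGUIDFlag) {
            // Lower half: partitions [0, bothGUIDReducers)
            return nonNegativeHash % bothGUIDReducers;
        }
        // Upper half: partitions [bothGUIDReducers, numReduceTasks)
        return bothGUIDReducers + nonNegativeHash % (numReduceTasks - bothGUIDReducers);
    }

    public static void main(String[] args) {
        System.out.println(Math.abs(Integer.MIN_VALUE) % 3);          // -2 (not a valid partition)
        System.out.println(partitionFor(Integer.MIN_VALUE, true, 6)); // 0
        System.out.println(partitionFor("abc".hashCode(), false, 4)); // 2
    }
}
```

Because the mapper alone calls `getPartition`, a sketch like this confirms only what the mapper intended; it says nothing about what a reducer sees after deserializing the key.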

Solution

  • The problem turned out to be in the serialization/deserialization of the custom key (IntermediaryKey): the "useBothGUIDFlag" field was being read back as the opposite of the value that had been written.

    Reading the "mapred.task.partition" property inside the reducer helped reveal that this swap had occurred. Once the flipped "useBothGUIDFlag" values were taken into account, the keys did appear to be going to the correct reducers.
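The post does not show the faulty `readFields`, so the exact mistake is unknown. As a minimal sketch, assuming the key writes `guid1`, `guid2`, then `useBothGUIDFlag` (field order and `writeUTF` are assumptions, and both GUIDs are assumed non-null), a symmetric pair reads every field back in the same order and type, assigning the flag exactly as read. `KeyFields` is a hypothetical stand-in so the round trip runs without Hadoop; in the real class these would be the `Writable` overrides.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;

// Hypothetical stand-in for IntermediaryKey's serialized fields.
class KeyFields {
    String guid1;
    String guid2;
    boolean useBothGUIDFlag;

    // Assumed field order: guid1, guid2, flag.
    void write(DataOutput out) throws IOException {
        out.writeUTF(guid1); // writeUTF rejects null, so GUIDs must be non-null
        out.writeUTF(guid2);
        out.writeBoolean(useBothGUIDFlag);
    }

    // Mirror of write(): same order, same types, flag stored as read (no negation).
    void readFields(DataInput in) throws IOException {
        guid1 = in.readUTF();
        guid2 = in.readUTF();
        useBothGUIDFlag = in.readBoolean();
    }
}

public class RoundTripDemo {

    // Serialize to bytes and read back into a fresh object.
    static KeyFields roundTrip(KeyFields original) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        original.write(new DataOutputStream(bytes));
        KeyFields copy = new KeyFields();
        copy.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
        return copy;
    }

    public static void main(String[] args) throws IOException {
        KeyFields key = new KeyFields();
        key.guid1 = "guid-a";
        key.guid2 = "guid-b";
        key.useBothGUIDFlag = true;
        // A false here would be the kind of flag flip described in the solution.
        System.out.println(roundTrip(key).useBothGUIDFlag == key.useBothGUIDFlag); // true
    }
}
```

A round-trip check like this, run for both flag values, is a cheap unit test for any custom `Writable` key before it reaches a cluster.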