hadoop, apache-pig, hadoop-partitioning

Custom Partitioner in Hadoop


I have some data that is keyed by ids in the range of 0 to 200-something million, and I need to split it up into buckets for ranges like 0-5mil, 5mil-10mil, etc.

I'm attempting to use a custom Hadoop partitioner for this, so that the last part of my Pig script looks something like this:

Conns = FOREACH ConnsGrouped GENERATE group AS memberId, $1.companyId AS companyIds;
ConnsPartitioned = DISTINCT Conns PARTITION BY com.mypackage.SearchNodePartitioner PARALLEL 50;

rmf $connections_file

STORE ConnsPartitioned INTO 'test' USING AvroStorage(...);

My partitioner looks like this:

import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Partitioner;

public class SearchNodePartitioner<V> implements Partitioner<Long, V>
{
    @Override
    public void configure(JobConf conf) 
    {
        // Nothing
    }

    @Override
    public int getPartition(Long key, V value, int numPartitions) 
    {
        // Bucket ids into 5-million-wide ranges, then wrap around numPartitions.
        return (int) Math.floor(key / (5.0 * Math.pow(10, 6))) % numPartitions;
    }

}

but it doesn't seem to be called at all. Even when I replace the return line with return 1;, the data across the output files still appears to be hash-distributed by the default partitioner.
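
For reference, here is a quick standalone arithmetic check of the bucketing that formula should produce (the sample ids and numPartitions = 50 are illustrative values only, not taken from the actual job):

public class BucketCheck
{
    public static void main(String[] args)
    {
        int numPartitions = 50;
        long[] ids = {3_000_000L, 12_000_000L, 199_000_000L};
        for (long id : ids)
        {
            // Same expression as getPartition(): 5-million-wide buckets, wrapped by numPartitions.
            int bucket = (int) Math.floor(id / (5.0 * Math.pow(10, 6))) % numPartitions;
            System.out.println(id + " -> partition " + bucket); // prints 0, 2 and 39
        }
    }
}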


Solution

  • The answer to DISTINCT + a custom partitioner is: you can't do that anymore (as I just found out). DISTINCT now uses an optimized, special-purpose partitioner.

    See:

    http://mail-archives.apache.org/mod_mbox/pig-user/201307.mbox/%3C14FE3AC3-DBA5-4898-AF94-0C34819A0D8B%40hortonworks.com%3E

    https://issues.apache.org/jira/browse/PIG-3385

    A workaround:

    A = ...; -- some tuple

    B = GROUP A BY field PARTITION BY custom;

    STORE B INTO 'foo' USING ....;

    Later:

    B = LOAD 'foo' USING ...;

    A = FOREACH B GENERATE FLATTEN($1);
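
    Applied to the script from the question, a sketch of this workaround might look something like the following. The output path 'conns_partitioned' and the bare AvroStorage() calls are placeholders, and it assumes SearchNodePartitioner meets what PARTITION BY expects (a subclass of org.apache.hadoop.mapreduce.Partitioner, per the Pig docs):

    Conns = FOREACH ConnsGrouped GENERATE group AS memberId, $1.companyId AS companyIds;
    ConnsByMember = GROUP Conns BY memberId PARTITION BY com.mypackage.SearchNodePartitioner PARALLEL 50;
    STORE ConnsByMember INTO 'conns_partitioned' USING AvroStorage();

    In a later script:

    ConnsByMember = LOAD 'conns_partitioned' USING AvroStorage();
    ConnsPartitioned = FOREACH ConnsByMember GENERATE FLATTEN($1);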