I have some data that is keyed by ids in the range of 0 to 200-something million, and I need to split it into buckets covering ranges like 0-5 million, 5-10 million, and so on.
I'm attempting to use a custom Hadoop partitioner for this, so the final part of my script looks something like this:
Conns = FOREACH ConnsGrouped GENERATE group AS memberId, $1.companyId AS companyIds;
ConnsPartitioned = DISTINCT Conns PARTITION BY com.mypackage.SearchNodePartitioner PARALLEL 50;
rmf $connections_file
STORE ConnsPartitioned INTO 'test' USING AvroStorage(...);
My partitioner looks like this:
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Partitioner;

public class SearchNodePartitioner<V> implements Partitioner<Long, V>
{
    @Override
    public void configure(JobConf conf)
    {
        // Nothing to configure
    }

    @Override
    public int getPartition(Long key, V value, int numPartitions)
    {
        // Bucket ids into 5-million-wide ranges: 0-4,999,999 -> 0, 5,000,000-9,999,999 -> 1, ...
        return (int) (key / 5_000_000L) % numPartitions;
    }
}
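(One assumption in the setup above: the jar containing the partitioner has to be registered so Pig can load the class by name; the jar name below is a placeholder.)

REGISTER search-node-partitioner.jar;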
but it doesn't seem to be called at all. Even when I replace the return statement with a constant (return 1;), the data across the output files still looks hash-distributed, which is the default behavior.
The answer to DISTINCT + a custom partitioner is: you can't do that anymore (as I just found out). DISTINCT now uses an optimized special partitioner, so your PARTITION BY clause never takes effect.
See:
https://issues.apache.org/jira/browse/PIG-3385
A workaround:
A = ...; -- some relation
B = GROUP A BY field PARTITION BY custom;
STORE B INTO 'foo' USING ...;
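Applied to the relations from the question, the write side would look something like this (the output path and PigStorage are placeholders I've picked, not from the original):

-- GROUP, unlike DISTINCT, still honors PARTITION BY, so the custom partitioner drives the shuffle here
Conns = FOREACH ConnsGrouped GENERATE group AS memberId, $1.companyId AS companyIds;
ConnsByBucket = GROUP Conns BY memberId PARTITION BY com.mypackage.SearchNodePartitioner PARALLEL 50;
STORE ConnsByBucket INTO 'conns_bucketed' USING PigStorage();

With 5-million-wide buckets over roughly 200 million ids, that's about 40 buckets across 50 reducers, so each part file should correspond to at most one id range.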
Later, to read it back:
B = LOAD 'foo' USING ...;
A = FOREACH B GENERATE FLATTEN($1);
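Note that the GROUP/FLATTEN round trip by itself doesn't remove duplicates, which was the point of the original DISTINCT. One way to get both (my sketch, building on the names from the question, not something the answer above spells out) is a nested DISTINCT inside a FOREACH before storing:

ConnsByBucket = GROUP Conns BY memberId PARTITION BY com.mypackage.SearchNodePartitioner PARALLEL 50;
ConnsDistinct = FOREACH ConnsByBucket {
    uniq = DISTINCT Conns; -- dedupes within each reducer, so no second shuffle to undo the partitioning
    GENERATE FLATTEN(uniq);
}
STORE ConnsDistinct INTO 'conns_distinct' USING PigStorage();

Since duplicates of a tuple always share a memberId, they land in the same group, and the nested DISTINCT is equivalent to the original DISTINCT Conns. One caveat worth checking against the Pig docs for your version: Pig hands custom partitioners PigNullableWritable keys and expects a subclass of org.apache.hadoop.mapreduce.Partitioner, so the mapred-style class from the question would likely need adapting before any PARTITION BY route takes effect.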