TotalOrderPartitioner with ChainMapper


I have a ChainMapper with two mappers in it. I am trying to apply a TotalOrderPartitioner to the output of the last mapper in the chain, without much success.

Is there a way to enforce partitioning based on sampling the output of the Nth mapper in the chain?

public class WordCountChain extends Configured implements Tool
{
    @Override
    public int run(String[] args) throws Exception 
    {
        // Job.getInstance replaces the deprecated Job constructor in Hadoop 2
        Job job = Job.getInstance(getConf(), "Word Count V1 (Chain)");
        job.setJarByClass(getClass());

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        /*********** First Mapper ***********/
        Configuration wcpMapperConf = new Configuration(false);
        ChainMapper.addMapper(job, WordCountPreparationMapper.class, LongWritable.class, Text.class, Text.class, IntWritable.class, wcpMapperConf);

        /*********** Second Mapper ***********/
        Configuration wcMapperConf = new Configuration(false);
        ChainMapper.addMapper(job, Mapper.class, Text.class, IntWritable.class, Text.class, IntWritable.class, wcMapperConf);

        /******* This enforces the Sampling/Partitioning over the First Mapper *******/
        //job.setInputFormatClass(SequenceFileInputFormat.class);
        //InputSampler.Sampler<Text, IntWritable> sampler = new InputSampler.RandomSampler<Text, IntWritable>(0.1, 10000, 10);
        //InputSampler.writePartitionFile(job, sampler);
        //job.addCacheFile( new URI( TotalOrderPartitioner.getPartitionFile(getConf()) ) );

        job.setNumReduceTasks(10);
        job.setReducerClass(WordCountReducer.class);
        return (job.waitForCompletion(true) ? 0 : 1);
    }

    public static void main(String[] args) throws Exception 
    {
        int exitCode = ToolRunner.run(new WordCountChain(), args);
        System.exit(exitCode);
    }
}

Solution

  • Unfortunately, the RandomSampler runs before the job even starts. In fact, it runs at the point where you call

    InputSampler.writePartitionFile(job, sampler);

    This means it reads keys through the job's InputFormat and samples the job's input dataset, not the output of any Mapper.

    If you need to partition based on the output of the Nth Mapper, you could split your job into two jobs: a map-only job and a full MapReduce job. The first runs the chain of Mappers up to and including the Nth Mapper and simply stores that output. The second samples and partitions based on its input (which is the output of the Nth Mapper), then runs the remaining Mappers, if any, and your Reducer. Note that the split points come from the second job's input keys, so this only gives a correct total order if the remaining Mappers leave the keys unchanged.
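
To see why the sample has to come from the keys the Reducer will actually receive, here is a minimal plain-Java sketch of the idea behind sampling and total-order partitioning. It is not Hadoop code: the class `SplitPointSketch`, its methods, and the sample keys are all hypothetical, and it only imitates the general approach (sort the sampled keys, pick evenly spaced split points, binary-search each key against them).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Illustrative only: imitates the sampling + total-order partitioning idea
// in plain Java. This is an assumption-laden sketch, not Hadoop's implementation.
class SplitPointSketch {

    // Sort the sampled keys and pick (numPartitions - 1) evenly spaced split points.
    static String[] splitPoints(List<String> sampledKeys, int numPartitions) {
        List<String> sorted = new ArrayList<>(sampledKeys);
        Collections.sort(sorted);
        String[] splits = new String[numPartitions - 1];
        for (int i = 1; i < numPartitions; i++) {
            splits[i - 1] = sorted.get(i * sorted.size() / numPartitions);
        }
        return splits;
    }

    // Keys below splits[0] go to partition 0; keys at or above the last
    // split point go to the last partition.
    static int partitionFor(String key, String[] splits) {
        int pos = Arrays.binarySearch(splits, key);
        // Found: the key equals splits[pos], so it belongs to partition pos + 1.
        // Not found: binarySearch returns -(insertionPoint) - 1.
        return pos >= 0 ? pos + 1 : -pos - 1;
    }

    public static void main(String[] args) {
        List<String> words = Arrays.asList("banana", "cherry", "grape", "zebra");

        // Split points sampled from the keys the reducers will see (mapper output).
        String[] good = splitPoints(
                Arrays.asList("peach", "apple", "melon", "banana", "grape", "cherry"), 3);

        // Split points sampled from hypothetical job-input keys that look nothing
        // like the mapper output (uppercase record ids).
        String[] bad = splitPoints(
                Arrays.asList("A1", "B2", "C3", "D4", "E5", "F6"), 3);

        for (String w : words) {
            System.out.println(w + " -> sampled from mapper output: "
                    + partitionFor(w, good)
                    + ", sampled from job input: " + partitionFor(w, bad));
        }
    }
}
```

With split points taken from the mapper-output keys, the four words spread over partitions 0, 1, 1 and 2. With split points taken from the unrelated input keys, every word falls into partition 2, so one reducer would receive everything. That skew is what the two-job split avoids.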