Tags: hadoop, mapreduce, hadoop-partitioning

HashPartition in MapReduce


Objective :

Implement a hash partitioner and check the number of reducers that get created automatically.

Any help or sample code is appreciated.

What I did :

I ran a MapReduce program with a hash partitioner implemented, on a 250 MB CSV file. But I still see that the job uses only 1 reducer to do the aggregation. If I have understood correctly, the framework should create the partitions automatically and distribute the data evenly; n reducers would then work on those n partitions. But I do not see that happening. Can anyone help me achieve that with hash partitioning? I do not want to define the number of partitions myself.

Mapper Code :

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class FlightMapper extends Mapper<LongWritable, Text, Text, Text> {

    @Override
    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {

        // Split on commas that are outside quoted fields.
        String[] line = value.toString().split(",(?=([^\"]*\"[^\"]*\")*[^\"]*$)");
        String airlineid = line[7];
        String tailno = line[9].replace("\"", "");

        // Skip records with an empty tail number.
        if (!tailno.isEmpty()) {
            context.write(new Text(airlineid), new Text(tailno));
        }
    }
}

Reducer Code :

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class FlightReducer extends Reducer<Text, Text, Text, IntWritable> {

    @Override
    public void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {

        // Count the tail numbers seen for each airline id.
        int count = 0;
        for (Text value : values) {
            count++;
        }
        context.write(key, new IntWritable(count));
    }
}

Partitioner Code :

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

public class FlightPartition extends Partitioner<Text, Text> {

    @Override
    public int getPartition(Text key, Text value, int numReduceTasks) {
        // Mask the sign bit so the result of the modulo is never negative.
        return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }
}
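
Note: this is the same logic as Hadoop's built-in default partitioner, org.apache.hadoop.mapreduce.lib.partition.HashPartitioner, so the custom class does not change how keys are distributed. A minimal equivalent in the driver, for reference:

import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;

// HashPartitioner is already the default, so this line is effectively a no-op,
// but it makes the choice explicit.
job.setPartitionerClass(HashPartitioner.class);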

Driver :

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class Flight {

    public static void main(String[] args) throws Exception {

        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "Flight");
        job.setJarByClass(Flight.class);

        job.setMapperClass(FlightMapper.class);
        job.setReducerClass(FlightReducer.class);
        job.setPartitionerClass(FlightPartition.class);

        // The mapper emits (Text, Text); the reducer emits (Text, IntWritable),
        // so the map and final output types must be declared separately.
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

Log :

15/11/09 06:14:14 INFO mapreduce.Job: Counters: 50
    File System Counters
        FILE: Number of bytes read=7008211
        FILE: Number of bytes written=14438683
        FILE: Number of read operations=0
        FILE: Number of large read operations=0
        FILE: Number of write operations=0
        HDFS: Number of bytes read=211682444
        HDFS: Number of bytes written=178
        HDFS: Number of read operations=12
        HDFS: Number of large read operations=0
        HDFS: Number of write operations=2
    Job Counters 
        Killed map tasks=2
        Launched map tasks=5
        Launched reduce tasks=1
        Data-local map tasks=5
        Total time spent by all maps in occupied slots (ms)=2235296
        Total time spent by all reduces in occupied slots (ms)=606517
        Total time spent by all map tasks (ms)=2235296
        Total time spent by all reduce tasks (ms)=606517
        Total vcore-seconds taken by all map tasks=2235296
        Total vcore-seconds taken by all reduce tasks=606517
        Total megabyte-seconds taken by all map tasks=2288943104
        Total megabyte-seconds taken by all reduce tasks=621073408
    Map-Reduce Framework
        Map input records=470068
        Map output records=467281
        Map output bytes=6073643
        Map output materialized bytes=7008223
        Input split bytes=411
        Combine input records=0
        Combine output records=0
        Reduce input groups=15
        Reduce shuffle bytes=7008223
        Reduce input records=467281
        Reduce output records=15
        Spilled Records=934562
        Shuffled Maps =3
        Failed Shuffles=0
        Merged Map outputs=3
        GC time elapsed (ms)=3701
        CPU time spent (ms)=277080
        Physical memory (bytes) snapshot=590581760
        Virtual memory (bytes) snapshot=3196801024
        Total committed heap usage (bytes)=441397248
    Shuffle Errors
        BAD_ID=0
        CONNECTION=0
        IO_ERROR=0
        WRONG_LENGTH=0
        WRONG_MAP=0
        WRONG_REDUCE=0
    File Input Format Counters 
        Bytes Read=211682033
    File Output Format Counters 
        Bytes Written=178
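
(You can see from the counters above that only one reduce task ran: Launched reduce tasks=1, against Launched map tasks=5.)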

Solution

  • Check your mapred-default.xml file and look for the

    mapreduce.job.reduces property (its default value is 1). Set it to a value greater than 1 to run more reducers in your cluster. This property is ignored when mapreduce.jobtracker.address is "local".
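
    For example, a minimal cluster-wide override in mapred-site.xml (the value 3 here is only an illustration):

    <property>
      <name>mapreduce.job.reduces</name>
      <value>3</value>
    </property>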

    You can override the default property in Java with

    job.setNumReduceTasks(3);
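
    Alternatively, set the value per run from the command line. Below is a minimal sketch of a driver that implements Tool, so that ToolRunner parses generic options such as -D mapreduce.job.reduces=3. It assumes the same FlightMapper/FlightReducer/FlightPartition classes as above; the class name FlightTool is made up for illustration:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.conf.Configured;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.util.Tool;
    import org.apache.hadoop.util.ToolRunner;

    public class FlightTool extends Configured implements Tool {

        @Override
        public int run(String[] args) throws Exception {
            // getConf() already contains any -D overrides parsed by ToolRunner.
            Job job = Job.getInstance(getConf(), "Flight");
            job.setJarByClass(FlightTool.class);
            job.setMapperClass(FlightMapper.class);
            job.setReducerClass(FlightReducer.class);
            job.setPartitionerClass(FlightPartition.class);
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(Text.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(IntWritable.class);
            FileInputFormat.addInputPath(job, new Path(args[0]));
            FileOutputFormat.setOutputPath(job, new Path(args[1]));
            return job.waitForCompletion(true) ? 0 : 1;
        }

        public static void main(String[] args) throws Exception {
            System.exit(ToolRunner.run(new Configuration(), new FlightTool(), args));
        }
    }

    Then, for example:

    hadoop jar flight.jar FlightTool -D mapreduce.job.reduces=3 <input> <output>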
    

    Have a look at the Apache mapred-default.xml reference for the complete list of properties.

    How Many Reduces? (from the Apache MapReduce tutorial)

    The right number of reduces seems to be 0.95 or 1.75 multiplied by (<no. of nodes> * <no. of maximum containers per node>).

    With 0.95 all of the reduces can launch immediately and start transferring map outputs as the maps finish. With 1.75 the faster nodes will finish their first round of reduces and launch a second wave of reduces doing a much better job of load balancing.

    Increasing the number of reduces increases the framework overhead, but increases load balancing and lowers the cost of failures.
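
    To make the heuristic concrete, here is a small sketch; the 10-node, 8-containers-per-node cluster is a made-up example:

    // Hypothetical cluster size (made-up numbers for illustration).
    int nodes = 10;
    int containersPerNode = 8;
    // One wave of reduces: all can start shuffling as soon as maps finish.
    int singleWave = (int) (0.95 * nodes * containersPerNode); // 76
    // Two waves: faster nodes run a second round, improving load balancing.
    int twoWaves = (int) (1.75 * nodes * containersPerNode);   // 140
    job.setNumReduceTasks(singleWave);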

    How many maps?

    The number of maps is usually driven by the total size of the inputs, that is, the total number of blocks of the input files.

    The right level of parallelism for maps seems to be around 10-100 maps per node, although it has been set up to 300 maps for very CPU-light map tasks. Task setup takes a while, so it is best if the maps take at least a minute to execute.

    Thus, if you expect 10TB of input data and have a blocksize of 128MB, you’ll end up with 82,000 maps, unless Configuration.set(MRJobConfig.NUM_MAPS, int) (which only provides a hint to the framework) is used to set it even higher.
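
    (Arithmetic check: 10 TB / 128 MB = (10 * 1024 * 1024 MB) / 128 MB = 81,920 ≈ 82,000 map tasks.)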

    Have a look at the Apache MapReduce Tutorial.