Objective :
Implement a hash Partitioner and check the number of reducers that get created automatically.
Any help or sample code would be appreciated.
What I did :
I ran a MapReduce program with a hash Partitioner implemented, on a 250 MB CSV file. But I still see that the job uses only 1 reducer to do the aggregation. If I have understood correctly, the framework should create partitions automatically and distribute the data evenly; n reducers would then work on those n partitions. But I do not see that happening. Can anyone help me achieve that with a hash Partitioner? I do not want to set the number of partitions explicitly.
Mapper Code :
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class FlightMapper extends Mapper<LongWritable, Text, Text, Text> {
    @Override
    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // Split on commas that are outside double-quoted fields
        String[] line = value.toString().split(",(?=([^\"]*\"[^\"]*\")*[^\"]*$)");
        String airlineid = line[7];
        String tailno = line[9].replace("\"", "");
        // Skip records with an empty tail number
        if (tailno.length() != 0) {
            context.write(new Text(airlineid), new Text(tailno));
        }
    }
}
Reducer Code :
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class FlightReducer extends Reducer<Text, Text, Text, IntWritable> {
    @Override
    public void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        // Count how many tail numbers were emitted for this airline id
        int count = 0;
        for (Text value : values) {
            count++;
        }
        context.write(key, new IntWritable(count));
    }
}
Partitioner Code :
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

// Same logic as Hadoop's default HashPartitioner: mask the sign bit, then mod by the reducer count
public class FlightPartition extends Partitioner<Text, Text> {
    @Override
    public int getPartition(Text key, Text value, int numReduceTasks) {
        return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }
}
Driver :
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class Flight {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "Flight");
        job.setJarByClass(Flight.class);
        job.setMapperClass(FlightMapper.class);
        job.setReducerClass(FlightReducer.class);
        job.setPartitionerClass(FlightPartition.class);
        // The mapper emits Text/Text ...
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        // ... while the reducer emits Text/IntWritable
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
Log :
15/11/09 06:14:14 INFO mapreduce.Job: Counters: 50
File System Counters
FILE: Number of bytes read=7008211
FILE: Number of bytes written=14438683
FILE: Number of read operations=0
FILE: Number of large read operations=0
FILE: Number of write operations=0
HDFS: Number of bytes read=211682444
HDFS: Number of bytes written=178
HDFS: Number of read operations=12
HDFS: Number of large read operations=0
HDFS: Number of write operations=2
Job Counters
Killed map tasks=2
Launched map tasks=5
Launched reduce tasks=1
Data-local map tasks=5
Total time spent by all maps in occupied slots (ms)=2235296
Total time spent by all reduces in occupied slots (ms)=606517
Total time spent by all map tasks (ms)=2235296
Total time spent by all reduce tasks (ms)=606517
Total vcore-seconds taken by all map tasks=2235296
Total vcore-seconds taken by all reduce tasks=606517
Total megabyte-seconds taken by all map tasks=2288943104
Total megabyte-seconds taken by all reduce tasks=621073408
Map-Reduce Framework
Map input records=470068
Map output records=467281
Map output bytes=6073643
Map output materialized bytes=7008223
Input split bytes=411
Combine input records=0
Combine output records=0
Reduce input groups=15
Reduce shuffle bytes=7008223
Reduce input records=467281
Reduce output records=15
Spilled Records=934562
Shuffled Maps =3
Failed Shuffles=0
Merged Map outputs=3
GC time elapsed (ms)=3701
CPU time spent (ms)=277080
Physical memory (bytes) snapshot=590581760
Virtual memory (bytes) snapshot=3196801024
Total committed heap usage (bytes)=441397248
Shuffle Errors
BAD_ID=0
CONNECTION=0
IO_ERROR=0
WRONG_LENGTH=0
WRONG_MAP=0
WRONG_REDUCE=0
File Input Format Counters
Bytes Read=211682033
File Output Format Counters
Bytes Written=178
Answer :
Check your mapred-default.xml file and look for the mapreduce.job.reduces property. Change the value to > 1 to get more reducers on your cluster. Note that this property is ignored when mapreduce.jobtracker.address is set to "local".
You can also override the default per job in Java with job.setNumReduceTasks(int).
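For example, in the Flight driver above the call goes next to the other job.set* calls (3 is only an illustrative count; pick one that suits your cluster):

job.setPartitionerClass(FlightPartition.class);
// Request 3 reduce tasks; FlightPartition then spreads keys over 3 partitions
job.setNumReduceTasks(3);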
Have a look at this article for the complete list of mapred-default.xml properties from Apache.
How Many Reduces? (from Apache)
The right number of reduces seems to be 0.95 or 1.75 multiplied by (<no. of nodes> * <no. of maximum containers per node>).
With 0.95 all of the reduces can launch immediately and start transferring map outputs as the maps finish. With 1.75 the faster nodes will finish their first round of reduces and launch a second wave of reduces doing a much better job of load balancing.
Increasing the number of reduces increases the framework overhead, but increases load balancing and lowers the cost of failures.
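As a worked example with hypothetical numbers (a 10-node cluster with 8 maximum containers per node), a minimal sketch:

public class ReducerCountSketch {
    public static void main(String[] args) {
        // Hypothetical cluster: 10 worker nodes, 8 max containers per node
        int nodes = 10;
        int containersPerNode = 8;
        System.out.println("single wave: " + (int) (0.95 * nodes * containersPerNode)); // 76
        System.out.println("two waves:   " + (int) (1.75 * nodes * containersPerNode)); // 140
    }
}

With 0.95 the job would ask for 76 reducers so they all run in one wave; with 1.75 it would ask for 140 and accept two waves in exchange for better load balancing.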
How many maps?
The number of maps is usually driven by the total size of the inputs, that is, the total number of blocks of the input files.
The right level of parallelism for maps seems to be around 10-100 maps per-node, although it has been set up to 300 maps for very cpu-light map tasks. Task setup takes a while, so it is best if the maps take at least a minute to execute.
Thus, if you expect 10TB of input data and have a blocksize of 128MB, you’ll end up with 82,000 maps, unless Configuration.set(MRJobConfig.NUM_MAPS, int) (which only provides a hint to the framework) is used to set it even higher.
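To see where that number comes from: 10 TB is 10,485,760 MB, and 10,485,760 / 128 ≈ 81,920 input splits, so the framework launches roughly 82,000 map tasks, one per split.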
Have a look at the Apache MapReduce Tutorial.