Let's say I want to set these configuration properties for a MapReduce job:
mapred.map.tasks
mapred.reduce.tasks
mapred.tasktracker.map.tasks.maximum
mapred.tasktracker.reduce.tasks.maximum
mapred.reduce.slowstart.completed.maps
What are the possible ways to set them?
I can set them in mapred-site.xml, but then they apply to every job I run.
If I want to set them for individual jobs only, is this valid?
conf.set("mapred.tasktracker.map.tasks.maximum", "10");
(I have not seen this kind of usage anywhere.)
Or can it only be done through a command-line argument such as
-D mapred.tasktracker.map.tasks.maximum=10
(This seems to be the more common usage.)
Solution 1: Create a BaseJob class:
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.util.Tool;

public abstract class BaseJob extends Configured implements Tool {

    // Builds a Job from the given configuration and applies every class
    // that the transformation defines (null means "keep the default").
    protected Job setupJob(Transformation transformation, final Configuration conf) throws Exception {
        // Get the job object from the global configuration
        Job job = new Job(conf);
        // Set the transformation-specific details
        if (transformation.getMapperClass() != null) {
            job.setMapperClass(transformation.getMapperClass());
        }
        if (transformation.getCombinerClass() != null) {
            job.setCombinerClass(transformation.getCombinerClass());
        }
        if (transformation.getReducerClass() != null) {
            job.setReducerClass(transformation.getReducerClass());
        }
        if (transformation.getMapOutputKeyClass() != null) {
            job.setMapOutputKeyClass(transformation.getMapOutputKeyClass());
        }
        if (transformation.getMapOutputValueClass() != null) {
            job.setMapOutputValueClass(transformation.getMapOutputValueClass());
        }
        if (transformation.getPartitionerClass() != null) {
            job.setPartitionerClass(transformation.getPartitionerClass());
        }
        if (transformation.getSortComparatorClass() != null) {
            job.setSortComparatorClass(transformation.getSortComparatorClass());
        }
        if (transformation.getGroupingComparator() != null) {
            job.setGroupingComparatorClass(transformation.getGroupingComparator());
        }
        if (transformation.getInputFormatClass() != null) {
            job.setInputFormatClass(transformation.getInputFormatClass());
        }
        if (transformation.getOutputFormatClass() != null) {
            job.setOutputFormatClass(transformation.getOutputFormatClass());
        }
        if (transformation.getOutputKeyClass() != null) {
            job.setOutputKeyClass(transformation.getOutputKeyClass());
        }
        if (transformation.getOutputValueClass() != null) {
            job.setOutputValueClass(transformation.getOutputValueClass());
        }
        if (transformation.getJarByClass() != null) {
            job.setJarByClass(transformation.getJarByClass());
        }
        return job;
    }

    // Describes the classes that make up one job; return null to skip a setting.
    protected abstract class Transformation {
        public abstract Class<?> getJarByClass();
        public abstract Class<? extends Mapper> getMapperClass();
        public abstract Class<? extends Reducer> getCombinerClass();
        public abstract Class<? extends Reducer> getReducerClass();
        public abstract Class<?> getOutputKeyClass();
        public abstract Class<?> getOutputValueClass();
        public abstract Class<?> getMapOutputKeyClass();
        public abstract Class<?> getMapOutputValueClass();
        public abstract Class<? extends Partitioner> getPartitionerClass();
        public abstract Class<? extends WritableComparator> getSortComparatorClass();
        public abstract Class<? extends WritableComparator> getGroupingComparator();
        public abstract Class<? extends InputFormat<?,?>> getInputFormatClass();
        public abstract Class<? extends OutputFormat<?,?>> getOutputFormatClass();
    }
}
Then write your MyTransformationJob class and set your job-specific configuration there:
// Imports are omitted here; MyMapper, the reducers, the partitioner and the
// comparators are the poster's own classes and are not shown.
public class MyTransformationJob extends BaseJob {

    // Assumption: jobParams holds the command-line arguments passed to run();
    // the original snippet uses it without showing where it is declared.
    private String[] jobParams;

    private Job getJobConf(final Configuration conf) throws Exception {
        Transformation transformation = new Transformation() {
            @Override
            public Class<? extends Reducer> getCombinerClass() {
                return null;
            }
            @Override
            public Class<?> getJarByClass() {
                return MyTransformationJob.class;
            }
            @Override
            public Class<? extends Mapper> getMapperClass() {
                return MyMapper.class;
            }
            @Override
            public Class<?> getOutputKeyClass() {
                return Text.class;
            }
            @Override
            public Class<?> getOutputValueClass() {
                return NullWritable.class;
            }
            @Override
            public Class<? extends Reducer> getReducerClass() {
                if (StringUtils.equals(jobParams[3], "header")) {
                    return HeaderReducer.class;
                }
                return ValuesReducer.class;
            }
            @Override
            public Class<?> getMapOutputKeyClass() {
                return Text.class;
            }
            @Override
            public Class<?> getMapOutputValueClass() {
                return LinkedMapWritable.class;
            }
            @Override
            public Class<? extends Partitioner> getPartitionerClass() {
                return StationKeyPartitioner.class;
            }
            @Override
            public Class<? extends WritableComparator> getSortComparatorClass() {
                return StationKeySortComparator.class;
            }
            @Override
            public Class<? extends WritableComparator> getGroupingComparator() {
                return UniqueIdGroupingComparator.class;
            }
            @Override
            public Class<? extends InputFormat<?,?>> getInputFormatClass() {
                return KeyValueTextInputFormat.class;
            }
            @Override
            public Class<? extends OutputFormat<?,?>> getOutputFormatClass() {
                return null;
            }
        };
        return setupJob(transformation, conf);
    }

    // Assumption: a minimal run() so that the class satisfies the Tool interface.
    // Input and output paths are not part of the original snippet and still have to be set.
    @Override
    public int run(String[] args) throws Exception {
        jobParams = args;
        Job job = getJobConf(getConf());
        return job.waitForCompletion(true) ? 0 : 1;
    }
}
This way you can specify multiple jobs with different configurations and classes.
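Because BaseJob implements Tool, a driver started through ToolRunner should also pick up -D key=value options from the command line: GenericOptionsParser applies them to the Configuration before run() is called, and any conf.set(...) call made afterwards in the driver overrides them for that job only. The sketch below models that layering with plain java.util classes so it runs without Hadoop on the classpath. It is a simplified illustration, not Hadoop's own implementation, and JobConfLayering, parseDefines and effectiveConf are hypothetical names. As far as I know, the two mapred.tasktracker.*.tasks.maximum properties are read by the TaskTracker daemon at startup, so a per-job override of those two is not expected to take effect.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Properties;

public class JobConfLayering {

    /** Collects "-D key=value" and "-Dkey=value" options from the argument list. */
    public static Map<String, String> parseDefines(String[] args) {
        Map<String, String> defines = new LinkedHashMap<>();
        for (int i = 0; i < args.length; i++) {
            String pair = null;
            if (args[i].equals("-D") && i + 1 < args.length) {
                pair = args[++i];               // value is the next argument
            } else if (args[i].startsWith("-D") && args[i].length() > 2) {
                pair = args[i].substring(2);    // value is attached to the flag
            }
            if (pair != null) {
                int eq = pair.indexOf('=');
                if (eq > 0) {
                    defines.put(pair.substring(0, eq), pair.substring(eq + 1));
                }
            }
        }
        return defines;
    }

    /** Effective settings for one job: site defaults, then -D options, then in-code overrides. */
    public static Properties effectiveConf(Properties siteDefaults,
                                           String[] args,
                                           Map<String, String> inCode) {
        Properties conf = new Properties();
        conf.putAll(siteDefaults);          // stands in for mapred-site.xml
        conf.putAll(parseDefines(args));    // stands in for -D options
        conf.putAll(inCode);                // stands in for conf.set(...) in the driver
        return conf;
    }

    public static void main(String[] args) {
        Properties site = new Properties();
        site.setProperty("mapred.reduce.tasks", "1");
        site.setProperty("mapred.reduce.slowstart.completed.maps", "0.05");

        String[] cli = {"-D", "mapred.reduce.tasks=4", "input", "output"};

        Map<String, String> inCode = new LinkedHashMap<>();
        inCode.put("mapred.reduce.slowstart.completed.maps", "0.80");

        Properties jobConf = effectiveConf(site, cli, inCode);
        System.out.println(jobConf.getProperty("mapred.reduce.tasks"));                    // prints 4
        System.out.println(jobConf.getProperty("mapred.reduce.slowstart.completed.maps")); // prints 0.80
    }
}
```

In a real driver the same effect would come from calling ToolRunner.run(new Configuration(), new MyTransformationJob(), args) and, where needed, conf.setInt("mapred.reduce.tasks", 4) before the Job is created.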
Solution 2:
You can create a local configuration file and specify the values you mentioned in it.
Sample test class:
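import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertThat;

import org.apache.hadoop.conf.Configuration;
import org.junit.Test;

public class ConfigurationTest {
    @Test
    public void test() {
        Configuration conf = new Configuration();
        // hadoop-local.xml must be on the classpath
        conf.addResource("hadoop-local.xml");
        assertThat(conf.get("mapred.reduce.tasks"), is("2"));
    }
}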
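The test above can only pass if hadoop-local.xml is found on the classpath and defines mapred.reduce.tasks as 2. The file itself is not shown, so the contents below are an assumption that follows the usual Hadoop layout of name/value pairs inside a configuration element. The sketch reads that layout with the JDK's XML parser only, to show what the file holds and how the lookup resolves. LocalConfSketch and parse are hypothetical names; this is not how Configuration is implemented internally.

```java
import java.io.ByteArrayInputStream;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;
import javax.xml.parsers.DocumentBuilderFactory;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
import org.w3c.dom.NodeList;

public class LocalConfSketch {

    // Assumed contents of hadoop-local.xml; the value 2 matches the assertion in the test.
    public static final String HADOOP_LOCAL_XML =
          "<?xml version=\"1.0\"?>\n"
        + "<configuration>\n"
        + "  <property>\n"
        + "    <name>mapred.reduce.tasks</name>\n"
        + "    <value>2</value>\n"
        + "  </property>\n"
        + "</configuration>\n";

    /** Reads every property name/value pair from a Hadoop-style configuration document. */
    public static Map<String, String> parse(String xml) {
        try {
            Document doc = DocumentBuilderFactory.newInstance()
                .newDocumentBuilder()
                .parse(new ByteArrayInputStream(xml.getBytes(StandardCharsets.UTF_8)));
            Map<String, String> props = new LinkedHashMap<>();
            NodeList nodes = doc.getElementsByTagName("property");
            for (int i = 0; i < nodes.getLength(); i++) {
                Element property = (Element) nodes.item(i);
                String name = property.getElementsByTagName("name").item(0).getTextContent().trim();
                String value = property.getElementsByTagName("value").item(0).getTextContent().trim();
                props.put(name, value);
            }
            return props;
        } catch (Exception e) {
            // Malformed XML or I/O problems are reported as an unchecked exception
            throw new IllegalArgumentException("Cannot parse configuration", e);
        }
    }

    public static void main(String[] args) {
        Map<String, String> conf = parse(HADOOP_LOCAL_XML);
        System.out.println(conf.get("mapred.reduce.tasks")); // prints 2
    }
}
```

Keeping one such file per job and loading it with conf.addResource(...) in the driver gives each job its own settings without touching mapred-site.xml.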