Search code examples
hadoop configuration mapreduce jobs

How to set configuration properties for MapReduce jobs in Hadoop?


Let's say I want to set these configuration properties for an MR job:

mapred.map.tasks
mapred.reduce.tasks
mapred.tasktracker.map.tasks.maximum
mapred.tasktracker.reduce.tasks.maximum
mapred.reduce.slowstart.completed.maps

What are the possible ways I can set these?

I can set them in mapred-site.xml, but that would apply to all jobs I run.

If I want to set these specifically for individual jobs, is this valid:

conf.set("mapred.tasktracker.map.tasks.maximum", 10)

(I have not seen this kind of usage anywhere)

Or can it be done only through a command-line argument,

such as -D mapred.tasktracker.map.tasks.maximum=10

(This seems to be more common usage)


Solution

  • Solution 1: Create a BaseJob class:

    /**
     * Base class for job drivers that build a {@link Job} from a per-job
     * {@link Transformation} descriptor.
     *
     * <p>Subclasses describe the classes a job needs by implementing
     * {@link Transformation} and pass it to
     * {@link #setupJob(Transformation, Configuration)}.
     */
    public abstract class BaseJob extends Configured implements Tool {

        /**
         * Creates a job from {@code conf} and applies every class the
         * transformation supplies. A getter that returns {@code null} leaves the
         * corresponding job setting untouched.
         *
         * @param transformation descriptor of the classes to install on the job
         * @param conf           configuration the job is created from
         * @return the configured job; this method does not submit it
         * @throws Exception if the job cannot be created
         */
        protected Job setupJob(Transformation transformation, final Configuration conf) throws Exception {

            // NOTE(review): the Job(Configuration) constructor is deprecated on newer
            // Hadoop releases in favour of Job.getInstance(conf); kept as-is so the
            // required Hadoop version does not change — confirm before switching.
            Job job = new Job(conf);

            // Set the transformation-specific details; null means "not specified".
            if (transformation.getMapperClass() != null) {
                job.setMapperClass(transformation.getMapperClass());
            }

            // The combiner was declared on Transformation but never applied before.
            if (transformation.getCombinerClass() != null) {
                job.setCombinerClass(transformation.getCombinerClass());
            }

            if (transformation.getReducerClass() != null) {
                job.setReducerClass(transformation.getReducerClass());
            }

            if (transformation.getMapOutputKeyClass() != null) {
                job.setMapOutputKeyClass(transformation.getMapOutputKeyClass());
            }

            if (transformation.getMapOutputValueClass() != null) {
                job.setMapOutputValueClass(transformation.getMapOutputValueClass());
            }

            if (transformation.getPartitionerClass() != null) {
                job.setPartitionerClass(transformation.getPartitionerClass());
            }

            if (transformation.getSortComparatorClass() != null) {
                job.setSortComparatorClass(transformation.getSortComparatorClass());
            }

            if (transformation.getGroupingComparator() != null) {
                job.setGroupingComparatorClass(transformation.getGroupingComparator());
            }

            if (transformation.getInputFormatClass() != null) {
                job.setInputFormatClass(transformation.getInputFormatClass());
            }

            // The output format was declared on Transformation but never applied before.
            if (transformation.getOutputFormatClass() != null) {
                job.setOutputFormatClass(transformation.getOutputFormatClass());
            }

            if (transformation.getOutputKeyClass() != null) {
                job.setOutputKeyClass(transformation.getOutputKeyClass());
            }

            if (transformation.getOutputValueClass() != null) {
                job.setOutputValueClass(transformation.getOutputValueClass());
            }

            if (transformation.getJarByClass() != null) {
                job.setJarByClass(transformation.getJarByClass());
            }

            return job;
        }

        /**
         * Describes the classes a single job should be configured with.
         * Each getter may return {@code null} to skip the matching setting in
         * {@link BaseJob#setupJob(Transformation, Configuration)}.
         */
        protected abstract class Transformation {
            /** Class passed to {@code Job.setJarByClass}. */
            public abstract Class<?> getJarByClass();
            /** Class passed to {@code Job.setMapperClass}. */
            public abstract Class<? extends Mapper> getMapperClass();
            /** Class passed to {@code Job.setCombinerClass}. */
            public abstract Class<? extends Reducer> getCombinerClass();
            /** Class passed to {@code Job.setReducerClass}. */
            public abstract Class<? extends Reducer> getReducerClass();
            /** Class passed to {@code Job.setOutputKeyClass}. */
            public abstract Class<?> getOutputKeyClass();
            /** Class passed to {@code Job.setOutputValueClass}. */
            public abstract Class<?> getOutputValueClass();
            /** Class passed to {@code Job.setMapOutputKeyClass}. */
            public abstract Class<?> getMapOutputKeyClass();
            /** Class passed to {@code Job.setMapOutputValueClass}. */
            public abstract Class<?> getMapOutputValueClass();
            /** Class passed to {@code Job.setPartitionerClass}. */
            public abstract Class<? extends Partitioner> getPartitionerClass();
            /** Class passed to {@code Job.setSortComparatorClass}. */
            public abstract Class<? extends WritableComparator> getSortComparatorClass();
            /** Class passed to {@code Job.setGroupingComparatorClass}. */
            public abstract Class<? extends WritableComparator> getGroupingComparator();
            /** Class passed to {@code Job.setInputFormatClass}. */
            public abstract Class<? extends InputFormat<?,?>> getInputFormatClass();
            /** Class passed to {@code Job.setOutputFormatClass}. */
            public abstract Class<? extends OutputFormat<?,?>> getOutputFormatClass();
        }

    }

    Then write your MyTransformationJob class and set your configurations

     /**
      * Example job that hands its mapper, reducer, key/value, partitioner,
      * comparator and input-format classes to {@code setupJob} through an
      * anonymous {@code Transformation}.
      *
      * NOTE(review): {@code jobParams} is not declared in this snippet; presumably
      * it is a field populated elsewhere (e.g. from the job arguments) — confirm.
      */
     public class MyTransformationJob extends BaseJob {

         /**
          * Builds the job for this transformation from the given configuration.
          *
          * @param conf configuration the job is created from
          * @return the job returned by {@code setupJob}
          * @throws Exception propagated from {@code setupJob}
          */
         private Job getJobConf(final Configuration conf) throws Exception {

             final Transformation descriptor = new Transformation() {

                 // --- jar / mapper / combiner / reducer ---

                 @Override
                 public Class<?> getJarByClass() {
                     return MyTransformationJob.class;
                 }

                 @Override
                 public Class<? extends Mapper> getMapperClass() {
                     return MyMapper.class;
                 }

                 @Override
                 public Class<? extends Reducer> getCombinerClass() {
                     // No combiner for this job.
                     return null;
                 }

                 @Override
                 public Class<? extends Reducer> getReducerClass() {
                     // The fourth job parameter selects between the two reducers.
                     final boolean headerMode = StringUtils.equals(jobParams[3], "header");
                     if (!headerMode) {
                         return ValuesReducer.class;
                     }
                     return HeaderReducer.class;
                 }

                 // --- key / value types ---

                 @Override
                 public Class<?> getMapOutputKeyClass() {
                     return Text.class;
                 }

                 @Override
                 public Class<?> getMapOutputValueClass() {
                     return LinkedMapWritable.class;
                 }

                 @Override
                 public Class<?> getOutputKeyClass() {
                     return Text.class;
                 }

                 @Override
                 public Class<?> getOutputValueClass() {
                     return NullWritable.class;
                 }

                 // --- partitioning and ordering ---

                 @Override
                 public Class<? extends Partitioner> getPartitionerClass() {
                     return StationKeyPartitioner.class;
                 }

                 @Override
                 public Class<? extends WritableComparator> getSortComparatorClass() {
                     return StationKeySortComparator.class;
                 }

                 @Override
                 public Class<? extends WritableComparator> getGroupingComparator() {
                     return UniqueIdGroupingComparator.class;
                 }

                 // --- input / output formats ---

                 @Override
                 public Class<? extends InputFormat<?,?>> getInputFormatClass() {
                     return KeyValueTextInputFormat.class;
                 }

                 @Override
                 public Class<? extends OutputFormat<?,?>> getOutputFormatClass() {
                     // No output format specified by this job.
                     return null;
                 }
             };

             return setupJob(descriptor, conf);
         }

     }
    

    This way you can specify multiple jobs with different configurations and classes.

    Solution 2:

    You can create a local configuration and specify the values that you have mentioned

    Sample test class:

    /**
     * Checks that {@code mapred.reduce.tasks} reads back as {@code "2"} from a
     * fresh {@code Configuration} after {@code hadoop-local.xml} is added as a
     * resource.
     *
     * NOTE(review): the class both extends {@code TestCase} (JUnit 3 style) and
     * annotates the method with {@code @Test} (JUnit 4 style); presumably only
     * one of the two is needed — confirm which runner is intended.
     */
    public class ConfigurationTest extends TestCase {

        @Test
        public void test() throws IOException {
            // Local configuration with the extra resource layered on top.
            final Configuration localConf = new Configuration();
            localConf.addResource("hadoop-local.xml");

            final String reduceTasks = localConf.get("mapred.reduce.tasks");
            assertThat(reduceTasks, is("2"));
        }

    }