Search code examples
javamapreduceinputformatter

Implementation for CombineFileInputFormat Hadoop 0.20.205


Can someone please point out where I could find an implementation for CombineFileInputFormat (org. using Hadoop 0.20.205? this is to create large splits from very small log files (text in lines) using EMR.

It is surprising that Hadoop does not have a default implementation for this class made specifically for this purpose and googling it looks like I'm not the only one confused by this. I need to compile the class and bundle it in a jar for hadoop-streaming, with a limited knowledge of Java this is some challenge.

Edit: I already tried the yetitrails example, with the necessary imports but I get a compiler error for the next method.


Solution

  • Here is an implementation I have for you:

    import java.io.IOException;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapred.FileSplit;
    import org.apache.hadoop.mapred.InputSplit;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapred.LineRecordReader;
    import org.apache.hadoop.mapred.RecordReader;
    import org.apache.hadoop.mapred.Reporter;
    import org.apache.hadoop.mapred.lib.CombineFileInputFormat;
    import org.apache.hadoop.mapred.lib.CombineFileRecordReader;
    import org.apache.hadoop.mapred.lib.CombineFileSplit;
    
    @SuppressWarnings("deprecation")
    public class CombinedInputFormat extends CombineFileInputFormat<LongWritable, Text> {
    
        @SuppressWarnings({ "unchecked", "rawtypes" })
        @Override
        public RecordReader<LongWritable, Text> getRecordReader(InputSplit split, JobConf conf, Reporter reporter) throws IOException {
    
            return new CombineFileRecordReader(conf, (CombineFileSplit) split, reporter, (Class) myCombineFileRecordReader.class);
        }
    
        public static class myCombineFileRecordReader implements RecordReader<LongWritable, Text> {
            private final LineRecordReader linerecord;
    
            public myCombineFileRecordReader(CombineFileSplit split, Configuration conf, Reporter reporter, Integer index) throws IOException {
                FileSplit filesplit = new FileSplit(split.getPath(index), split.getOffset(index), split.getLength(index), split.getLocations());
                linerecord = new LineRecordReader(conf, filesplit);
            }
    
            @Override
            public void close() throws IOException {
                linerecord.close();
    
            }
    
            @Override
            public LongWritable createKey() {
                // TODO Auto-generated method stub
                return linerecord.createKey();
            }
    
            @Override
            public Text createValue() {
                // TODO Auto-generated method stub
                return linerecord.createValue();
            }
    
            @Override
            public long getPos() throws IOException {
                // TODO Auto-generated method stub
                return linerecord.getPos();
            }
    
            @Override
            public float getProgress() throws IOException {
                // TODO Auto-generated method stub
                return linerecord.getProgress();
            }
    
            @Override
            public boolean next(LongWritable key, Text value) throws IOException {
    
                // TODO Auto-generated method stub
                return linerecord.next(key, value);
            }
    
        }
    }
    

    In your job first set the parameter mapred.max.split.size according to the size you would like the input files to be combined into. Do something like follows in your run():

    ...
                if (argument != null) {
                    conf.set("mapred.max.split.size", argument);
                } else {
                    conf.set("mapred.max.split.size", "134217728"); // 128 MB
                }
    ...
    
                conf.setInputFormat(CombinedInputFormat.class);
    ...