Tags: scala, hadoop, cascading, scalding

Create Scalding Source like TextLine that combines multiple files into single mappers


We have many small files that need combining. In Scalding you can use TextLine to read files as text lines. The problem is that we get one mapper per file, whereas we want several files to be combined and processed by a single mapper.

I understand we need to change the input format to an implementation of CombineFileInputFormat, and that this may involve Cascading's CombinedHfs. We cannot work out how to do this, but it should take only a handful of lines of code to define our own Scalding source called, say, CombineTextLine.

Many thanks to anyone who can provide the code to do this.

As a side question, some of our data is in S3, so it would be great if the solution also worked for S3 files. I guess that depends on whether CombineFileInputFormat or CombinedHfs works with S3.


Solution

  • You already have the right idea in your question, so here is a possible solution.

    Create your own input format that extends CombineFileInputFormat and uses a custom RecordReader. The code below is Java, but you could easily convert it to Scala if you want.

    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapred.FileSplit;
    import org.apache.hadoop.mapred.InputSplit;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapred.LineRecordReader;
    import org.apache.hadoop.mapred.RecordReader;
    import org.apache.hadoop.mapred.Reporter;
    import org.apache.hadoop.mapred.lib.CombineFileInputFormat;
    import org.apache.hadoop.mapred.lib.CombineFileRecordReader;
    import org.apache.hadoop.mapred.lib.CombineFileSplit;
    
    public class CombinedInputFormat<K, V> extends CombineFileInputFormat<K, V> {
    
        public static class MyKeyValueLineRecordReader implements RecordReader<LongWritable,Text> {
            private final RecordReader<LongWritable,Text> delegate;
    
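            // CombineFileRecordReader instantiates this class reflectively, once per file in
            // the combined split, so the constructor must keep exactly this signature.
            public MyKeyValueLineRecordReader(CombineFileSplit split, Configuration conf, Reporter reporter, Integer idx) throws IOException {
                // Rebuild a plain FileSplit for the idx-th file and let LineRecordReader read it.
                FileSplit fileSplit = new FileSplit(split.getPath(idx), split.getOffset(idx), split.getLength(idx), split.getLocations());
                delegate = new LineRecordReader(conf, fileSplit);
            }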
    
            @Override
            public boolean next(LongWritable key, Text value) throws IOException {
                return delegate.next(key, value);
            }
    
            @Override
            public LongWritable createKey() {
                return delegate.createKey();
            }
    
            @Override
            public Text createValue() {
                return delegate.createValue();
            }
    
            @Override
            public long getPos() throws IOException {
                return delegate.getPos();
            }
    
            @Override
            public void close() throws IOException {
                delegate.close();
            }
    
            @Override
            public float getProgress() throws IOException {
                return delegate.getProgress();
            }
        }
    
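        @Override
        @SuppressWarnings({"unchecked", "rawtypes"})
        public RecordReader<K, V> getRecordReader(InputSplit split, JobConf job, Reporter reporter) throws IOException {
            // CombineFileRecordReader walks the files of the combined split and creates one
            // MyKeyValueLineRecordReader per file. The raw Class cast is needed because that
            // reader is fixed to <LongWritable, Text> while this class is generic in <K, V>.
            return new CombineFileRecordReader(job, (CombineFileSplit) split, reporter, (Class) MyKeyValueLineRecordReader.class);
        }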
    
    }
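
    To make the role of the wrapper class clearer, here is a rough, Hadoop-free sketch of the chaining that CombineFileRecordReader performs for you: it creates one reader per file index and moves on to the next file when the current one is exhausted. The class name ChainedReaderSketch and the use of plain iterators are stand-ins for illustration only, not part of Hadoop.

    ```java
    import java.util.Arrays;
    import java.util.Collections;
    import java.util.Iterator;
    import java.util.List;
    import java.util.NoSuchElementException;
    import java.util.function.IntFunction;

    /** Simplified stand-in for the per-file chaining done by CombineFileRecordReader. */
    public class ChainedReaderSketch implements Iterator<String> {
        private final int fileCount;
        // Plays the role of the (split, conf, reporter, idx) constructor: index in, reader out.
        private final IntFunction<Iterator<String>> readerForIndex;
        private int nextIndex = 0;
        private Iterator<String> current = null;

        public ChainedReaderSketch(int fileCount, IntFunction<Iterator<String>> readerForIndex) {
            this.fileCount = fileCount;
            this.readerForIndex = readerForIndex;
        }

        @Override
        public boolean hasNext() {
            // Open the reader for the next file whenever the current one has no more lines.
            while (current == null || !current.hasNext()) {
                if (nextIndex >= fileCount) {
                    return false;
                }
                current = readerForIndex.apply(nextIndex++);
            }
            return true;
        }

        @Override
        public String next() {
            if (!hasNext()) {
                throw new NoSuchElementException();
            }
            return current.next();
        }

        public static void main(String[] args) {
            // Three hypothetical "files"; the second one is empty and is skipped transparently.
            List<List<String>> files = Arrays.asList(
                Arrays.asList("a1", "a2"),
                Collections.<String>emptyList(),
                Arrays.asList("c1"));
            ChainedReaderSketch reader =
                new ChainedReaderSketch(files.size(), i -> files.get(i).iterator());
            while (reader.hasNext()) {
                System.out.println(reader.next());
            }
        }
    }
    ```

    The mapper only ever sees one continuous stream of lines, which is why MyKeyValueLineRecordReader above can simply delegate to a LineRecordReader for its own file.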
    

    Then extend Cascading's TextLine class and make it use the input format you just defined (Scala code from now on).

    import cascading.flow.FlowProcess
    import cascading.scheme.Scheme
    import cascading.scheme.hadoop.TextLine
    import cascading.tap.Tap
    import com.twitter.scalding.{FixedPathSource, TextLineScheme}
    import org.apache.hadoop.io.{LongWritable, Text}
    import org.apache.hadoop.mapred.{JobConf, OutputCollector, RecordReader}
    
    // TextLine scheme that swaps the default input format for the combining one.
    class CombineFileTextLine extends TextLine {
    
      override def sourceConfInit(flowProcess: FlowProcess[JobConf], tap: Tap[JobConf, RecordReader[_, _], OutputCollector[_, _]], conf: JobConf): Unit = {
        super.sourceConfInit(flowProcess, tap, conf)
        // Set this after the call to super so that it overrides the default input format.
        conf.setInputFormat(classOf[CombinedInputFormat[LongWritable, Text]])
      }
    }
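
    One thing worth checking once the input format is wired in is the maximum combined split size, because it bounds how many files end up in one mapper. As far as I know, it can be set with setMaxSplitSize in the input format or through the mapreduce.input.fileinputformat.split.maxsize property (mapred.max.split.size on older Hadoop versions). The Java sketch below is only a simplified greedy illustration of the effect of that limit: it ignores node and rack locality and never splits a large file, both of which the real CombineFileInputFormat handles. SplitPackingSketch and pack are names made up here.

    ```java
    import java.util.ArrayList;
    import java.util.List;

    /** Simplified illustration of how a size cap groups small files into combined splits. */
    public class SplitPackingSketch {

        /**
         * Greedily packs file lengths into groups whose total stays within maxSplitBytes.
         * A single file larger than the cap is placed alone in its own group.
         */
        public static List<List<Long>> pack(long[] fileLengths, long maxSplitBytes) {
            List<List<Long>> splits = new ArrayList<>();
            List<Long> current = new ArrayList<>();
            long currentBytes = 0;
            for (long length : fileLengths) {
                // Close the current group if adding this file would exceed the cap.
                if (!current.isEmpty() && currentBytes + length > maxSplitBytes) {
                    splits.add(current);
                    current = new ArrayList<>();
                    currentBytes = 0;
                }
                current.add(length);
                currentBytes += length;
            }
            if (!current.isEmpty()) {
                splits.add(current);
            }
            return splits;
        }

        public static void main(String[] args) {
            long mb = 1024L * 1024L;
            // Hypothetical file sizes, chosen only for the example.
            long[] files = {10 * mb, 20 * mb, 30 * mb, 40 * mb, 50 * mb};
            List<List<Long>> splits = pack(files, 64 * mb);
            System.out.println(files.length + " files -> " + splits.size() + " combined splits");
        }
    }
    ```

    With the sample sizes in main and a 64 MB cap, the five files end up in three groups, so three mappers would run instead of five. With no cap at all, everything could land in very few splits, so it is worth setting the limit explicitly.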
    

    Create a scheme for your combined input.

    // Scalding scheme trait that plugs the combining TextLine into the HDFS side of a source.
    trait CombineFileTextLineScheme extends TextLineScheme {
    
      override def hdfsScheme = new CombineFileTextLine().asInstanceOf[Scheme[JobConf, RecordReader[_, _], OutputCollector[_, _], _, _]]
    }
    

    Finally, create your source class:

    case class CombineFileMultipleTextLine(p: String*) extends FixedPathSource(p: _*) with CombineFileTextLineScheme
    

    If you want to use a single path instead of multiple ones, the change to your source class is trivial: take a single String parameter and pass it to FixedPathSource.

    I hope that helps.