Tags: java, hadoop, input-split

Efficiency of NLineInputFormat's InputSplit calculations


I looked into the getSplitsForFile() method of NLineInputFormat. I found that an input stream is opened for the input file, the stream is then iterated line by line, and a split is created every n lines. Is this efficient, particularly since the read happens on one node before any mapper task is launched? What if I have a 5 GB file? Basically, this means the file data is read twice: once during split creation and once by the mapper tasks. If this is a bottleneck, how does a Hadoop job work around it?

 public static List<FileSplit> getSplitsForFile(FileStatus status,
          Configuration conf, int numLinesPerSplit) throws IOException {
        List<FileSplit> splits = new ArrayList<FileSplit> ();
        Path fileName = status.getPath();
        if (status.isDirectory()) {
          throw new IOException("Not a file: " + fileName);
        }
        FileSystem  fs = fileName.getFileSystem(conf);
        LineReader lr = null;
        try {
          FSDataInputStream in  = fs.open(fileName);
          lr = new LineReader(in, conf);
          Text line = new Text();
          int numLines = 0;
          long begin = 0;
          long length = 0;
          int num = -1;
          // my part of concern start
          while ((num = lr.readLine(line)) > 0) {
            numLines++;
            length += num;
            if (numLines == numLinesPerSplit) {
              splits.add(createFileSplit(fileName, begin, length));
              begin += length;
              length = 0;
              numLines = 0;
            }
          }
          // my part of concern end
          if (numLines != 0) {
            splits.add(createFileSplit(fileName, begin, length));
          }
        } finally {
          if (lr != null) {
            lr.close();
          }
        }
        return splits; 
      }

Edit: adding my use case for clément-mathieu

My data sets are big input files of approximately 2 GB each. Each line in a file represents a record that needs to be inserted into a database table (Cassandra, in my case). I want to limit each bulk transaction to my database to n lines. I have managed to do this with NLineInputFormat. My only concern is whether there is a hidden performance bottleneck that might show up in production.


Solution

  • Basically, this means the file data is read twice: once during split creation and once by the mapper tasks.

    Yes.

    The purpose of this InputFormat is to create one split for every N lines. The only way to compute the split boundaries is to read the file and find the newline characters. This operation can be costly, but you cannot avoid it if this is what you need.

  • If this is a bottleneck, how does a Hadoop job work around it?

    I am not sure I understand the question.

    NLineInputFormat is not the default InputFormat, and very few use cases require it. If you read the javadoc of the class, you will see that it mainly exists to feed parameters to embarrassingly parallel jobs (that is, "small" input files).

    Most InputFormats do not need to read the file to compute the splits. They usually apply fixed rules, such as one split per 128 MB or one split per HDFS block, and the RecordReaders take care of the real start and end offsets of each split.
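
To make the contrast concrete, here is a minimal sketch of size-based split planning. It is a simplification for illustration, not Hadoop's actual code: the class name FixedSizeSplits and the method plan are made up for this example, and details such as block locations and the handling of a small trailing chunk are ignored. The point is that only the file length is needed, so planning costs the same for a 2 GB file as for a 2 KB one.

```java
import java.util.ArrayList;
import java.util.List;

/** Simplified, hypothetical sketch of size-based split planning; not Hadoop's actual code. */
final class FixedSizeSplits {

    /** Returns {start, length} pairs covering [0, fileLength) in chunks of splitSize bytes. */
    static List<long[]> plan(long fileLength, long splitSize) {
        if (splitSize <= 0) {
            throw new IllegalArgumentException("splitSize must be positive");
        }
        List<long[]> splits = new ArrayList<>();
        // Only the file length is used here: no byte of the file is read.
        for (long start = 0; start < fileLength; start += splitSize) {
            long length = Math.min(splitSize, fileLength - start);
            splits.add(new long[] {start, length});
        }
        return splits;
    }

    public static void main(String[] args) {
        long fileLength = 2L * 1024 * 1024 * 1024; // 2 GB, roughly the file size in the question
        long splitSize = 128L * 1024 * 1024;       // 128 MB
        List<long[]> splits = plan(fileLength, splitSize);
        System.out.println(splits.size() + " splits"); // 16 splits
    }
}
```

A split planned this way can start or end in the middle of a line. That is why the record reader, not the split planner, is responsible for aligning to line boundaries when the mapper actually reads the data.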

    If the cost of NLineInputFormat.getSplitsForFile is an issue, I would really review why I need this InputFormat. What you want is to limit the batch size of a business process in your mapper. With NLineInputFormat, a mapper is created for every N lines, which means a mapper will never do more than one bulk transaction. You don't seem to need this feature: you only want to limit the size of a bulk transaction, and you don't care if a mapper does several of them sequentially. So you are paying the cost of the code you spotted and getting nothing in return.

    I would use TextInputFormat and create the batch in the mapper. In pseudo code:

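    setup() {
      buffer = new Buffer<String>(1_000_000);  // batch size: lines per bulk transaction
    }

    map(LongWritable key, Text value) {
      buffer.append(value.toString());  // copy the line; Hadoop reuses the Text object
      if (buffer.isFull()) {
        new Transaction(buffer).doIt();
        buffer.clear();
      }
    }

    cleanup() {
      // flush the last, partially filled batch, if any
      if (!buffer.isEmpty()) {
        new Transaction(buffer).doIt();
        buffer.clear();
      }
    }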
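
The batching idea above can be sketched in plain Java, without any Hadoop dependency, roughly as follows. This is only an illustration under assumptions: LineBatcher is a hypothetical helper, and the Consumer passed to it stands in for whatever bulk write you issue against Cassandra. In a real mapper, map() would call add() for each line and cleanup() would call flush().

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical helper: collects lines and hands them to a sink in fixed-size batches. */
final class LineBatcher {
    private final int batchSize;
    private final Consumer<List<String>> sink; // stand-in for the bulk database write
    private final List<String> buffer = new ArrayList<>();

    LineBatcher(int batchSize, Consumer<List<String>> sink) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        this.batchSize = batchSize;
        this.sink = sink;
    }

    /** Call once per input line, for example from map(). */
    void add(String line) {
        buffer.add(line);
        if (buffer.size() >= batchSize) {
            flush();
        }
    }

    /** Call once at the end, for example from cleanup(), to write the last partial batch. */
    void flush() {
        if (buffer.isEmpty()) {
            return; // nothing pending, so skip the empty transaction
        }
        sink.accept(new ArrayList<>(buffer)); // pass a copy so clear() cannot affect the sink
        buffer.clear();
    }

    public static void main(String[] args) {
        List<Integer> batchSizes = new ArrayList<>();
        LineBatcher batcher = new LineBatcher(2, batch -> batchSizes.add(batch.size()));
        for (String line : new String[] {"a", "b", "c", "d", "e"}) {
            batcher.add(line);
        }
        batcher.flush();
        System.out.println(batchSizes); // [2, 2, 1]
    }
}
```

With this shape, the batch size is independent of the split size: one mapper can process a whole HDFS block and still never send more than batchSize records per transaction.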
    

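    By default, one mapper is created per HDFS block. If you think this is too many or too few, the mapred.max.split.size and mapred.min.split.size settings (named mapreduce.input.fileinputformat.split.maxsize and mapreduce.input.fileinputformat.split.minsize in newer Hadoop releases) let you increase or decrease the parallelism.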
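
As far as I know, the file-based input formats combine these settings with the block size by clamping: the split size is the block size, capped by the maximum and raised to the minimum. The sketch below reproduces that rule in standalone Java so you can see how each setting moves the result. The class name SplitSize and method compute are made up for this example, and the default values shown are assumptions, so check the behaviour against the Hadoop version you run.

```java
/** Hypothetical standalone version of the split-size clamping rule. */
final class SplitSize {

    /** Block size, capped at maxSize and raised to at least minSize. */
    static long compute(long blockSize, long minSize, long maxSize) {
        return Math.max(minSize, Math.min(maxSize, blockSize));
    }

    public static void main(String[] args) {
        long mb = 1024L * 1024;
        long block = 128 * mb;

        // Assumed defaults (min = 1, max = Long.MAX_VALUE): one split per block.
        System.out.println(compute(block, 1, Long.MAX_VALUE) / mb + " MB");

        // Lower the maximum to get smaller splits, hence more mappers.
        System.out.println(compute(block, 1, 64 * mb) / mb + " MB");

        // Raise the minimum to get larger splits, hence fewer mappers.
        System.out.println(compute(block, 256 * mb, Long.MAX_VALUE) / mb + " MB");
    }
}
```

So lowering the maximum below the block size increases parallelism, and raising the minimum above it decreases parallelism, in both cases without reading the file.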

    Basically, while convenient, NLineInputFormat is too fine-grained for what you need. You can achieve almost the same thing with TextInputFormat and by tuning *.split.size, which does not involve reading the files to create the splits.