Tags: hadoop, apache-spark, partitioning, hadoop-partitioning

How does SparkContext.textFile work under the covers?


I am trying to understand the textFile method deeply, but I think my lack of Hadoop knowledge is holding me back here. Let me lay out my understanding, and maybe you can correct anything that is incorrect.

When sc.textFile(path) is called, defaultMinPartitions is used, which is really just math.min(taskScheduler.defaultParallelism, 2). Let's assume we are using the SparkDeploySchedulerBackend, where defaultParallelism is

conf.getInt("spark.default.parallelism", math.max(totalCoreCount.get(), 2))
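
In other words (this is just my restatement of the two snippets above, not the actual Spark source), the default minimum number of partitions works out to something like:

// My restatement of the chain above, not the real Spark code:
// defaultParallelism falls back to max(total cores, 2) unless spark.default.parallelism is set,
// and defaultMinPartitions then caps that value at 2.
def defaultParallelism(configuredValue: Option[Int], totalCoreCount: Int): Int =
  configuredValue.getOrElse(math.max(totalCoreCount, 2))

def defaultMinPartitions(parallelism: Int): Int =
  math.min(parallelism, 2)

defaultMinPartitions(defaultParallelism(None, totalCoreCount = 8))  // = 2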

So, now let's say the default is 2. Going back to textFile, this is passed in to HadoopRDD. The true size is determined in getPartitions() using inputFormat.getSplits(jobConf, minPartitions). But, from what I can find, the minPartitions argument is merely a hint and is in fact mostly ignored, so you will probably get the total number of blocks.
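
A quick way to check this empirically (a spark-shell sketch; the path is just a placeholder) is to compare the partition counts with and without an explicit minPartitions hint:

// spark-shell sketch; replace the path with a real multi-block file
val path = "hdfs:///tmp/some-large-file.txt"

println(sc.defaultParallelism)                    // the scheduler's default parallelism
println(sc.defaultMinPartitions)                  // math.min(defaultParallelism, 2)
println(sc.textFile(path).partitions.length)      // expectation: roughly one partition per HDFS block
println(sc.textFile(path, 10).partitions.length)  // does an explicit hint change anything?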

OK, this fits with expectations. However, what if the default is not used and you provide a partition size that is larger than the block size? If my research is right and the getSplits call simply ignores this parameter, then wouldn't the provided minimum end up being ignored, and you would still just get the block size?

Cross-posted to the Spark mailing list.


Solution

  • Short Version:

    Split size is determined by mapred.min.split.size or mapreduce.input.fileinputformat.split.minsize. If that value is larger than HDFS's blockSize, multiple blocks inside the same file are combined into a single split.

    Detailed Version:

    I think your understanding of the procedure up to inputFormat.getSplits is right.

    Inside inputFormat.getSplits, more specifically inside FileInputFormat's getSplits, it is mapred.min.split.size or mapreduce.input.fileinputformat.split.minsize that ultimately determines the split size. (I'm not sure which one takes effect in Spark; I tend to believe it is the former.)

    Let's look at the code (FileInputFormat from Hadoop 2.4.0):

    long goalSize = totalSize / (numSplits == 0 ? 1 : numSplits);
    long minSize = Math.max(job.getLong(org.apache.hadoop.mapreduce.lib.input.
      FileInputFormat.SPLIT_MINSIZE, 1), minSplitSize);
    
    // generate splits
    ArrayList<FileSplit> splits = new ArrayList<FileSplit>(numSplits);
    NetworkTopology clusterMap = new NetworkTopology();
    
    for (FileStatus file: files) {
      Path path = file.getPath();
      long length = file.getLen();
      if (length != 0) {
        FileSystem fs = path.getFileSystem(job);
        BlockLocation[] blkLocations;
        if (file instanceof LocatedFileStatus) {
          blkLocations = ((LocatedFileStatus) file).getBlockLocations();
        } else {
          blkLocations = fs.getFileBlockLocations(file, 0, length);
        }
        if (isSplitable(fs, path)) {
          long blockSize = file.getBlockSize();
          long splitSize = computeSplitSize(goalSize, minSize, blockSize);
    
          long bytesRemaining = length;
          while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
            String[] splitHosts = getSplitHosts(blkLocations,
                length-bytesRemaining, splitSize, clusterMap);
            splits.add(makeSplit(path, length-bytesRemaining, splitSize,
                splitHosts));
            bytesRemaining -= splitSize;
          }
    
          if (bytesRemaining != 0) {
            String[] splitHosts = getSplitHosts(blkLocations, length
                - bytesRemaining, bytesRemaining, clusterMap);
            splits.add(makeSplit(path, length - bytesRemaining, bytesRemaining,
                splitHosts));
          }
        } else {
          String[] splitHosts = getSplitHosts(blkLocations,0,length,clusterMap);
          splits.add(makeSplit(path, 0, length, splitHosts));
        }
      } else { 
        //Create empty hosts array for zero length files
        splits.add(makeSplit(path, 0, length, new String[0]));
      }
    }
    

    Inside the for loop, makeSplit() generates each split, and splitSize is the effective split size. Here is the computeSplitSize function that produces splitSize:

    protected long computeSplitSize(long goalSize, long minSize,
                                       long blockSize) {
      return Math.max(minSize, Math.min(goalSize, blockSize));
    }
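
    To make that formula concrete, here is a small worked example (the numbers are made up for illustration):

    // max(minSize, min(goalSize, blockSize)) with a 128 MB block size:
    val blockSize = 128L * 1024 * 1024             // HDFS block size
    val goalSize  = 512L * 1024 * 1024             // e.g. a 1 GB file with numSplits = 2

    Math.max(1L, Math.min(goalSize, blockSize))                  // minSize left at 1 -> 128 MB, one split per block
    Math.max(256L * 1024 * 1024, Math.min(goalSize, blockSize))  // minSize = 256 MB  -> 256 MB, two blocks per split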
    

    Therefore, if minSplitSize > blockSize, the output splits actually combine several blocks within the same HDFS file; on the other hand, if minSplitSize < blockSize, each split corresponds to one HDFS block.
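
    In practice, then, one way to get larger splits (and hence fewer partitions) from sc.textFile is to raise the minimum split size before reading. This is only a sketch; since I'm not sure which of the two keys the input format actually reads here, it sets both:

    // Sketch: raise the minimum split size to 256 MB so that multiple 128 MB blocks
    // end up combined into one split. Both key spellings are set because it is unclear
    // which one takes effect in this code path.
    sc.hadoopConfiguration.setLong("mapred.min.split.size", 256L * 1024 * 1024)
    sc.hadoopConfiguration.setLong("mapreduce.input.fileinputformat.split.minsize", 256L * 1024 * 1024)

    val rdd = sc.textFile("hdfs:///tmp/some-large-file.txt")   // placeholder path
    println(rdd.partitions.length)   // expect roughly fileSize / 256 MB for a splittable file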