I am trying to understand the textFile method deeply, but I think my lack of Hadoop knowledge is holding me back here. Let me lay out my understanding; maybe you can correct anything that is incorrect.

When sc.textFile(path) is called, defaultMinPartitions is used, which is really just math.min(taskScheduler.defaultParallelism, 2). Let's assume we are using the SparkDeploySchedulerBackend, where defaultParallelism is conf.getInt("spark.default.parallelism", math.max(totalCoreCount.get(), 2)).
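For what it's worth, this is how I have been checking those values in spark-shell (the HDFS path is just a placeholder):

// defaultParallelism and defaultMinPartitions are public SparkContext methods.
println(sc.defaultParallelism)    // taskScheduler.defaultParallelism
println(sc.defaultMinPartitions)  // math.min(defaultParallelism, 2)

val withDefault  = sc.textFile("hdfs:///some/path")      // uses defaultMinPartitions
val withExplicit = sc.textFile("hdfs:///some/path", 10)  // explicit minPartitions
println(withDefault.partitions.length)
println(withExplicit.partitions.length)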
So now let's say the default is 2. Going back to textFile, this minimum is passed in to HadoopRDD. The actual number of partitions is determined in getPartitions() using inputFormat.getSplits(jobConf, minPartitions). But from what I can find, minPartitions is merely a hint and is in fact mostly ignored, so you will probably get the total number of blocks.
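To check this, I have been calling the same InputFormat directly through the old mapred API; as far as I can tell this roughly mirrors what HadoopRDD.getPartitions does (the path is again a placeholder):

import org.apache.hadoop.mapred.{FileInputFormat, JobConf, TextInputFormat}

// Ask the InputFormat for splits, passing minPartitions only as a numSplits hint.
val jobConf = new JobConf(sc.hadoopConfiguration)
FileInputFormat.setInputPaths(jobConf, "hdfs:///some/path")

val inputFormat = new TextInputFormat()
inputFormat.configure(jobConf)

val splits = inputFormat.getSplits(jobConf, 2)  // 2 == minPartitions hint
println(splits.length)                          // usually ends up ~ the number of HDFS blocks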
OK, this fits with expectations. However, what if the default is not used and you provide a partition size that is larger than the block size? If my research is right and the getSplits call simply ignores this parameter, wouldn't the provided minimum end up being ignored, so that you would still just get the block size?
Short Version:

Split size is determined by mapred.min.split.size or mapreduce.input.fileinputformat.split.minsize. If it is bigger than HDFS's blockSize, multiple blocks inside the same file will be combined into a single split.
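For example (a sketch, assuming the old mapred API that sc.textFile uses, a placeholder path, and 128 MB HDFS blocks), you can raise that minimum before reading to get splits that span multiple blocks:

// Ask for a 256 MB minimum split size, i.e. larger than a 128 MB block.
sc.hadoopConfiguration.setLong("mapred.min.split.size", 256L * 1024 * 1024)
// The new-API name should behave the same way via Hadoop's deprecation mapping:
// sc.hadoopConfiguration.setLong("mapreduce.input.fileinputformat.split.minsize", 256L * 1024 * 1024)

val rdd = sc.textFile("hdfs:///some/path")
println(rdd.partitions.length)  // expect roughly fileSize / 256 MB, i.e. blocks combined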
Detailed Version:

I think you are right in your understanding of the procedure before inputFormat.getSplits.

Inside inputFormat.getSplits, more specifically inside FileInputFormat's getSplits, it is mapred.min.split.size or mapreduce.input.fileinputformat.split.minsize that ultimately determines the split size. (I'm not sure which one is effective in Spark; I prefer to believe the former.)
Let's see the code (FileInputFormat from Hadoop 2.4.0):
long goalSize = totalSize / (numSplits == 0 ? 1 : numSplits);
long minSize = Math.max(job.getLong(org.apache.hadoop.mapreduce.lib.input.
  FileInputFormat.SPLIT_MINSIZE, 1), minSplitSize);

// generate splits
ArrayList<FileSplit> splits = new ArrayList<FileSplit>(numSplits);
NetworkTopology clusterMap = new NetworkTopology();
for (FileStatus file: files) {
  Path path = file.getPath();
  long length = file.getLen();
  if (length != 0) {
    FileSystem fs = path.getFileSystem(job);
    BlockLocation[] blkLocations;
    if (file instanceof LocatedFileStatus) {
      blkLocations = ((LocatedFileStatus) file).getBlockLocations();
    } else {
      blkLocations = fs.getFileBlockLocations(file, 0, length);
    }
    if (isSplitable(fs, path)) {
      long blockSize = file.getBlockSize();
      long splitSize = computeSplitSize(goalSize, minSize, blockSize);

      long bytesRemaining = length;
      while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
        String[] splitHosts = getSplitHosts(blkLocations,
            length-bytesRemaining, splitSize, clusterMap);
        splits.add(makeSplit(path, length-bytesRemaining, splitSize,
            splitHosts));
        bytesRemaining -= splitSize;
      }

      if (bytesRemaining != 0) {
        String[] splitHosts = getSplitHosts(blkLocations, length
            - bytesRemaining, bytesRemaining, clusterMap);
        splits.add(makeSplit(path, length - bytesRemaining, bytesRemaining,
            splitHosts));
      }
    } else {
      String[] splitHosts = getSplitHosts(blkLocations, 0, length, clusterMap);
      splits.add(makeSplit(path, 0, length, splitHosts));
    }
  } else {
    // Create empty hosts array for zero length files
    splits.add(makeSplit(path, 0, length, new String[0]));
  }
}
Inside the for loop, makeSplit() is used to generate each split, and splitSize is the effective split size. The computeSplitSize function that produces splitSize:
protected long computeSplitSize(long goalSize, long minSize,
                                long blockSize) {
  return Math.max(minSize, Math.min(goalSize, blockSize));
}
Therefore, if minSplitSize > blockSize, the output splits are actually a combination of several blocks within the same HDFS file; on the other hand, if minSplitSize < blockSize, each split corresponds to one HDFS block.
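A quick numeric check of that formula (a standalone Scala sketch, assuming 128 MB blocks and a goalSize of 1 GB):

// Same logic as Hadoop's computeSplitSize: max(minSize, min(goalSize, blockSize)).
def computeSplitSize(goalSize: Long, minSize: Long, blockSize: Long): Long =
  math.max(minSize, math.min(goalSize, blockSize))

val mb = 1024L * 1024
val goalSize  = 1024 * mb  // totalSize / numSplits, e.g. a 1 GB file with numSplits = 1
val blockSize = 128 * mb

println(computeSplitSize(goalSize, 1, blockSize) / mb)         // minSize = 1      -> 128, one split per block
println(computeSplitSize(goalSize, 256 * mb, blockSize) / mb)  // minSize = 256 MB -> 256, two blocks per split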