I have a folder containing 14 files. I run spark-submit with 10 executors on a cluster that uses YARN as the resource manager.
I create my first RDD as this:
JavaPairRDD<String,String> files = sc.wholeTextFiles(folderPath.toString(), 10);
However, files.getNumPartitions() gives me 7 or 8, randomly. I do not use coalesce/repartition anywhere, so my DAG finishes with 7-8 partitions.
As far as I know, the argument we pass is the "minimum" number of partitions, so why does Spark divide my RDD into only 7-8 partitions?
I also ran the same program with 20 partitions, and it gave me 11 partitions.
I have seen a topic here, but it was about getting "more" partitions, which did not help me at all.
Note: In the same program, I also read another folder which has 10 files, and Spark creates 10 partitions successfully. I run the problematic transformation above after this successful job finishes.
File sizes:
1) 25.07 KB
2) 46.61 KB
3) 126.34 KB
4) 158.15 KB
5) 169.21 KB
6) 16.03 KB
7) 67.41 KB
8) 60.84 KB
9) 70.83 KB
10) 87.94 KB
11) 99.29 KB
12) 120.58 KB
13) 170.43 KB
14) 183.87 KB
The files are on HDFS; the block size is 128 MB and the replication factor is 3.
It would have been clearer if we had the size of each file, but the code will not be wrong either way. I am adding this answer based on the Spark code base.
First of all, maxSplitSize is calculated from the total size of the directory and the min partitions passed to wholeTextFiles:
def setMinPartitions(context: JobContext, minPartitions: Int) {
  val files = listStatus(context).asScala
  val totalLen = files.map(file => if (file.isDirectory) 0L else file.getLen).sum
  val maxSplitSize = Math.ceil(totalLen * 1.0 /
    (if (minPartitions == 0) 1 else minPartitions)).toLong
  super.setMaxSplitSize(maxSplitSize)
}
// file: WholeTextFileInputFormat.scala
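Plugging the 14 file sizes from the question into this formula shows where the split size comes from. Here is a standalone sketch (the class name and the KB-to-byte conversion are mine, not Spark's):

```java
public class MaxSplitSizeDemo {
    public static void main(String[] args) {
        // The 14 file sizes from the question, in KB.
        double[] fileSizesKb = {25.07, 46.61, 126.34, 158.15, 169.21, 16.03, 67.41,
                                60.84, 70.83, 87.94, 99.29, 120.58, 170.43, 183.87};
        long totalLen = 0;
        for (double kb : fileSizesKb) {
            totalLen += Math.round(kb * 1024); // convert each file size to bytes
        }
        int minPartitions = 10;
        // Mirrors setMinPartitions: maxSplitSize = ceil(totalLen / minPartitions)
        long maxSplitSize = (long) Math.ceil(totalLen * 1.0 / minPartitions);
        System.out.println("totalLen = " + totalLen + " bytes");
        System.out.println("maxSplitSize = " + maxSplitSize + " bytes");
        // → totalLen = 1436264 bytes, maxSplitSize = 143627 bytes (~140.26 KB)
    }
}
```

So each split is capped at roughly totalLen / 10 ≈ 140 KB; minPartitions controls the split size, not the partition count directly.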
Based on this maxSplitSize, the splits (partitions in Spark) are extracted from the source:
inputFormat.setMinPartitions(jobContext, minPartitions)
val rawSplits = inputFormat.getSplits(jobContext).toArray // the number of splits is decided here
val result = new Array[Partition](rawSplits.size)
for (i <- 0 until rawSplits.size) {
  result(i) = new NewHadoopPartition(id, i, rawSplits(i).asInstanceOf[InputSplit with Writable])
}
// file: WholeTextFileRDD.scala
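To see why the final count can fall below minPartitions: whole files are never divided, and a split is emitted once the accumulated size reaches maxSplitSize, so each emitted split generally overshoots the cap. Below is a simplified packing model (my own sketch, not Spark's exact code; the real CombineFileInputFormat also groups blocks by node and rack locality, which is where the 7-vs-8 run-to-run variation comes from):

```java
public class SplitPackingDemo {
    // Simplified model (an assumption, not Spark's exact logic): accumulate
    // whole files in order and emit a split once maxSplitSize is reached.
    static int countSplits(long[] fileSizes, int minPartitions) {
        long totalLen = 0;
        for (long s : fileSizes) totalLen += s;
        long maxSplitSize = (long) Math.ceil(totalLen * 1.0 / minPartitions);
        int splits = 0;
        long accumulated = 0;
        for (long size : fileSizes) {
            accumulated += size;
            if (accumulated >= maxSplitSize) { // cap reached: emit a split
                splits++;
                accumulated = 0;
            }
        }
        if (accumulated > 0) splits++; // leftover files form a final split
        return splits;
    }

    public static void main(String[] args) {
        // The 14 file sizes from the question, converted to bytes.
        long[] fileSizes = {25672, 47729, 129372, 161946, 173271, 16415, 69028,
                            62300, 72530, 90051, 101673, 123474, 174520, 188283};
        System.out.println(countSplits(fileSizes, 10)); // 8 in this file order
        System.out.println(countSplits(fileSizes, 20)); // 11 in this file order
    }
}
```

In the file order listed in the question, this model gives 8 splits for minPartitions = 10 and 11 splits for minPartitions = 20 — close to the observed 7-8, and exactly the observed 11.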
More information is available in the CombineFileInputFormat#getSplits method, which reads the files and prepares the splits. Note that it also groups blocks by node and rack locality, which is why the resulting number of splits can vary between runs.
Note: I referred to Spark partitions as MapReduce splits here, since Spark borrowed its input and output formatters from MapReduce.