I am creating an RDD from a text file by specifying the number of partitions. But it gives me a different number of partitions than the specified one.
scala> val people = sc.textFile("file:///home/pvikash/data/test.txt", 0)
people: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[72] at textFile at <console>:27
scala> people.getNumPartitions
res47: Int = 1
scala> val people = sc.textFile("file:///home/pvikash/data/test.txt", 1)
people: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[50] at textFile at <console>:27
scala> people.getNumPartitions
res36: Int = 1
scala> val people = sc.textFile("file:///home/pvikash/data/test.txt", 2)
people: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[52] at textFile at <console>:27
scala> people.getNumPartitions
res37: Int = 2
scala> val people = sc.textFile("file:///home/pvikash/data/test.txt", 3)
people: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[54] at textFile at <console>:27
scala> people.getNumPartitions
res38: Int = 3
scala> val people = sc.textFile("file:///home/pvikash/data/test.txt", 4)
people: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[56] at textFile at <console>:27
scala> people.getNumPartitions
res39: Int = 4
scala> val people = sc.textFile("file:///home/pvikash/data/test.txt", 5)
people: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[58] at textFile at <console>:27
scala> people.getNumPartitions
res40: Int = 6
scala> val people = sc.textFile("file:///home/pvikash/data/test.txt", 6)
people: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[60] at textFile at <console>:27
scala> people.getNumPartitions
res41: Int = 7
scala> val people = sc.textFile("file:///home/pvikash/data/test.txt", 7)
people: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[62] at textFile at <console>:27
scala> people.getNumPartitions
res42: Int = 8
scala> val people = sc.textFile("file:///home/pvikash/data/test.txt", 8)
people: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[64] at textFile at <console>:27
scala> people.getNumPartitions
res43: Int = 9
scala> val people = sc.textFile("file:///home/pvikash/data/test.txt", 9)
people: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[66] at textFile at <console>:27
scala> people.getNumPartitions
res44: Int = 11
scala> val people = sc.textFile("file:///home/pvikash/data/test.txt", 10)
people: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[68] at textFile at <console>:27
scala> people.getNumPartitions
res45: Int = 11
scala> val people = sc.textFile("file:///home/pvikash/data/test.txt", 11)
people: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[70] at textFile at <console>:27
scala> people.getNumPartitions
res46: Int = 13
Contents of the file /home/pvikash/data/test.txt are:
This is a test file.
Will be used for rdd partition.
I am trying to understand why the number of partitions changes here and, when the data is small enough to fit into one partition, why Spark creates empty partitions.
Any explanation would be appreciated.
In Spark, the textFile function calls the hadoopFile function.
The signature of hadoopFile looks like this:
def hadoopFile[K, V](path: String,
    inputFormatClass: Class[_ <: InputFormat[K, V]],
    keyClass: Class[K],
    valueClass: Class[V],
    minPartitions: Int = defaultMinPartitions): RDD[(K, V)] = {
So the number you specify is the minimum number of partitions the RDD will have. The size of each partition, however, is determined by a different function, computeSplitSize, in the file input format.
So when you set the parallelism for a splittable input such as this plain text file, you get at least that many partitions; however, the exact number may be larger than the one you specified.
There is a nice blog related to this.
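The partition counts in the question can be reproduced with a small sketch of the split arithmetic. This is not Spark or Hadoop code; it mirrors the logic of the old-API FileInputFormat.getSplits as I understand it, and the names SplitSketch and countSplits are made up for illustration. It assumes the test file is 52 bytes long (inferred from the two lines shown, with a single newline between them), a minimum split size of 1 byte, and a block size far larger than the file (32 MB is used here as a stand-in for the local file system).

```scala
object SplitSketch {
  // A trailing chunk up to 10% larger than splitSize is kept as one split.
  val SplitSlop = 1.1

  // splitSize = max(minSize, min(goalSize, blockSize))
  def computeSplitSize(goalSize: Long, minSize: Long, blockSize: Long): Long =
    math.max(minSize, math.min(goalSize, blockSize))

  // Number of splits (and hence partitions) for one splittable file.
  // minSize and blockSize defaults are assumptions, not values read from a cluster.
  def countSplits(totalSize: Long,
                  minPartitions: Int,
                  minSize: Long = 1L,
                  blockSize: Long = 32L * 1024 * 1024): Int = {
    if (totalSize == 0L) {
      1 // an empty file still yields a single zero-length split
    } else {
      // A requested count of 0 is treated as 1; integer division rounds down.
      val goalSize = totalSize / (if (minPartitions == 0) 1 else minPartitions)
      val splitSize = computeSplitSize(goalSize, minSize, blockSize)
      var remaining = totalSize
      var splits = 0
      // Cut full-size splits while more than 1.1 * splitSize bytes remain.
      while (remaining.toDouble / splitSize > SplitSlop) {
        splits += 1
        remaining -= splitSize
      }
      // Whatever is left over becomes one final split.
      if (remaining != 0L) splits += 1
      splits
    }
  }
}
```

For example, SplitSketch.countSplits(52L, 5) gives 6: the goal size is 52 / 5 = 10 bytes, five 10-byte splits are cut, and the remaining 2 bytes form a sixth split. Under the same assumptions, requesting 9 or 10 partitions both give 11, and requesting 11 gives 13, which matches the REPL output in the question. Because splits are byte ranges and each line is read by the split in which it starts, most of these tiny splits contain no line start and end up as empty partitions.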