Tags: apache-spark, rdd, partitioning

Why does Spark create empty partitions, and how does default partitioning work?


I am creating an RDD from a text file and specifying the number of partitions, but I get a different number of partitions than the one I specified.

scala> val people = sc.textFile("file:///home/pvikash/data/test.txt", 0) 
people: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[72] at textFile at <console>:27 
scala> people.getNumPartitions 
res47: Int = 1 

scala> val people = sc.textFile("file:///home/pvikash/data/test.txt", 1) 
people: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[50] at textFile at <console>:27 
scala> people.getNumPartitions 
res36: Int = 1 

scala> val people = sc.textFile("file:///home/pvikash/data/test.txt", 2) 
people: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[52] at textFile at <console>:27 
scala> people.getNumPartitions 
res37: Int = 2 

scala> val people = sc.textFile("file:///home/pvikash/data/test.txt", 3) 
people: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[54] at textFile at <console>:27 
scala> people.getNumPartitions 
res38: Int = 3 

scala> val people = sc.textFile("file:///home/pvikash/data/test.txt", 4) 
people: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[56] at textFile at <console>:27 
scala> people.getNumPartitions 
res39: Int = 4 

scala> val people = sc.textFile("file:///home/pvikash/data/test.txt", 5) 
people: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[58] at textFile at <console>:27 
scala> people.getNumPartitions 
res40: Int = 6 

scala> val people = sc.textFile("file:///home/pvikash/data/test.txt", 6) 
people: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[60] at textFile at <console>:27 
scala> people.getNumPartitions 
res41: Int = 7 

scala> val people = sc.textFile("file:///home/pvikash/data/test.txt", 7) 
people: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[62] at textFile at <console>:27 
scala> people.getNumPartitions 
res42: Int = 8 

scala> val people = sc.textFile("file:///home/pvikash/data/test.txt", 8) 
people: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[64] at textFile at <console>:27 
scala> people.getNumPartitions 
res43: Int = 9 

scala> val people = sc.textFile("file:///home/pvikash/data/test.txt", 9) 
people: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[66] at textFile at <console>:27 
scala> people.getNumPartitions 
res44: Int = 11 

scala> val people = sc.textFile("file:///home/pvikash/data/test.txt", 10) 
people: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[68] at textFile at <console>:27 
scala> people.getNumPartitions 
res45: Int = 11 

scala> val people = sc.textFile("file:///home/pvikash/data/test.txt", 11) 
people: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[70] at textFile at <console>:27 
scala> people.getNumPartitions 
res46: Int = 13

The contents of the file /home/pvikash/data/test.txt are:

This is a test file. 
Will be used for rdd partition.

I am trying to understand why the number of partitions changes here, and why Spark creates empty partitions when the data is small enough to fit into a single partition.
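One way to see which of those partitions actually hold data is to group each partition's elements into an array with glom and count them. The snippet below is just for illustration and reuses the same file with minPartitions = 5 from the session above:

val people = sc.textFile("file:///home/pvikash/data/test.txt", 5)
// glom() turns each partition into an Array of its elements,
// so a length of 0 marks an empty partition
people.glom().map(_.length).collect().zipWithIndex.foreach {
  case (count, idx) => println(s"partition $idx -> $count record(s)")
}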

Any explanation would be appreciated.


Solution

  • In Spark, the textFile function calls hadoopFile (passing a TextInputFormat and forwarding the minPartitions argument).

    If you check the signature of hadoopFile, it looks like this:

    def hadoopFile[K, V](path: String,
                     inputFormatClass: Class[_ <: InputFormat[K, V]],
                     keyClass: Class[K],
                     valueClass: Class[V],
                     minPartitions: Int = defaultMinPartitions): RDD[(K, V)] = {
    

    So the number of partitions you specify is the minimum number of partitions the RDD will have. The size of each partition, however, is determined by a different function, computeSplitSize, in the file input format.

    So when you set the parallelism you are guaranteed to get at least that many partitions; the exact number can be bigger than what you asked for.
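    To make that concrete, below is a rough sketch of the arithmetic done by Hadoop's old-API FileInputFormat.getSplits (which textFile relies on through hadoopFile). It is a paraphrase, not the actual Hadoop code, and it assumes test.txt is about 52 bytes, roughly what the two lines shown in the question add up to:

    import scala.math.{max, min}

    // Paraphrase of org.apache.hadoop.mapred.FileInputFormat.getSplits,
    // reduced to the arithmetic that decides how many splits a file gets.
    val SPLIT_SLOP = 1.1 // the last split may be up to 10% larger than splitSize

    def computeSplitSize(goalSize: Long, minSize: Long, blockSize: Long): Long =
      max(minSize, min(goalSize, blockSize))

    def countSplits(totalSize: Long, numSplits: Int,
                    minSize: Long = 1L, blockSize: Long = 128L * 1024 * 1024): Int = {
      val goalSize  = totalSize / max(numSplits, 1)          // target bytes per split
      val splitSize = computeSplitSize(goalSize, minSize, blockSize)
      var remaining = totalSize
      var splits    = 0
      while (remaining.toDouble / splitSize > SPLIT_SLOP) {  // cut full-size splits
        splits += 1
        remaining -= splitSize
      }
      if (remaining != 0) splits += 1                        // leftover bytes form one last split
      splits
    }

    // Assuming a ~52-byte file, this reproduces the counts seen in the question,
    // e.g. minPartitions = 5 -> 6 splits, minPartitions = 11 -> 13 splits.
    (1 to 11).foreach(n => println(s"minPartitions = $n -> ${countSplits(52L, n)} splits"))

    Splits are cut at byte offsets rather than line boundaries. As far as I understand, a split that starts in the middle of a line contributes no records, because that line is fully read by the reader of the previous split, and that is why a small file like this one ends up with several empty partitions.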

    There is a nice blog post on this topic.