apache-spark · rdd

Why does all the data end up in one partition after reduceByKey?


I have this simple Spark program. I am wondering why all the data ends up in one partition.

val l = List((30002,30000), (50006,50000), (80006,80000), 
             (4,0), (60012,60000), (70006,70000), 
             (40006,40000), (30012,30000), (30000,30000),
             (60018,60000), (30020,30000), (20010,20000), 
             (20014,20000), (90008,90000), (14,0), (90012,90000),
             (50010,50000), (100008,100000), (80012,80000),
             (20000,20000), (30010,30000), (20012,20000), 
             (90016,90000), (18,0), (12,0), (70016,70000), 
             (20,0), (80020,80000), (100016,100000), (70014,70000),
             (60002,60000), (40000,40000), (60006,60000), 
             (80000,80000), (50008,50000), (60008,60000), 
             (10002,10000), (30014,30000), (70002,70000),
             (40010,40000), (100010,100000), (40002,40000),
             (20004,20000), 
             (10018,10000), (50018,50000), (70004,70000),
             (90004,90000), (100004,100000), (20016,20000))

val l_rdd = sc.parallelize(l, 2)

// print each item and index of the partition it belongs to
l_rdd.mapPartitionsWithIndex((index, iter) => {
   iter.toList.map(x => (index, x)).iterator
}).collect.foreach(println)

// reduce on the second element of the list.
// alternatively you can use aggregateByKey  
val l_reduced = l_rdd.map(x => {
                    (x._2, List(x._1))
                  }).reduceByKey((a, b) => {b ::: a})

// print the reduced results along with its partition index
l_reduced.mapPartitionsWithIndex((index, iter) => {
      iter.toList.map(x => (index, x._1, x._2.size)).iterator
}).collect.foreach(println)

When you run this, you will see that the data (l_rdd) is distributed across two partitions. After the reduce, the resultant RDD (l_reduced) also has two partitions, but all the data is in one partition (index 0) and the other is empty. This happens even when the data is huge (a few GBs). Shouldn't l_reduced also be distributed across two partitions?


Solution

  • val l_reduced = l_rdd.map(x => {
                        (x._2, List(x._1))
                      }).reduceByKey((a, b) => {b ::: a})
    

    With reference to the above snippet, you are partitioning by the second field of the RDD. Every value in the second field is even (each one is 0 or a multiple of 10,000).

    When the default HashPartitioner is used, the partition number for a record is decided by the following function:

      def getPartition(key: Any): Int = key match {
        case null => 0
        case _ => Utils.nonNegativeMod(key.hashCode, numPartitions)
      }
    

    And Utils.nonNegativeMod is defined as follows:

      def nonNegativeMod(x: Int, mod: Int): Int = {
        val rawMod = x % mod
        rawMod + (if (rawMod < 0) mod else 0)
      }
    

    Let us see what happens when we apply the above two pieces of logic to your input:

    scala> l.map(_._2.hashCode % 2) // numPartitions = 2
    res10: List[Int] = List(0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0)
    

    Therefore, all of your records end up in partition 0.
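    Because an `Int`'s `hashCode` in Scala is the value itself, you can reproduce this placement locally without Spark. A minimal sketch (`nonNegativeMod` copied from the snippet above; the keys are the distinct second-field values from the question's input):

    ```scala
    object PartitionCheck {
      // Copied from Spark's Utils.nonNegativeMod shown above.
      def nonNegativeMod(x: Int, mod: Int): Int = {
        val rawMod = x % mod
        rawMod + (if (rawMod < 0) mod else 0)
      }

      def main(args: Array[String]): Unit = {
        val keys = List(0, 10000, 20000, 30000, 40000, 50000,
                        60000, 70000, 80000, 90000, 100000)
        // For an Int, hashCode is the value itself, and every key here
        // is even, so with numPartitions = 2 each one maps to partition 0.
        keys.foreach(k => println(s"$k -> partition ${nonNegativeMod(k.hashCode, 2)}"))
      }
    }
    ```

    Nothing here is random: the same even keys will deterministically hash to partition 0 on every run, which is why larger data sets show the same skew.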

    You can solve this problem by repartitioning:

    val l_reduced = l_rdd.map(x => {
                        (x._2, List(x._1))
                      }).reduceByKey((a, b) => {b ::: a}).repartition(2)
    

    which gives:

    (0,100000,4)
    (0,10000,2)
    (0,0,5)
    (0,20000,6)
    (0,60000,5)
    (0,80000,4)
    (1,50000,4)
    (1,30000,6)
    (1,90000,4)
    (1,70000,5)
    (1,40000,4)
    

    Alternatively, you can create a custom partitioner.
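    For instance, here is a sketch with a partitioning scheme of my own choosing (not from the original answer): since every key is a multiple of 10,000, dividing by 10,000 before taking the modulus makes consecutive keys alternate between partitions.

    ```scala
    // Hypothetical scheme: divide the key by 10,000 first so consecutive
    // key values (0, 10000, 20000, ...) alternate between partitions.
    def partitionFor(key: Int, numPartitions: Int): Int =
      (key / 10000) % numPartitions

    // Wrapping this in a Spark Partitioner and passing it to reduceByKey
    // (requires spark-core on the classpath):
    //
    //   import org.apache.spark.Partitioner
    //
    //   class TenThousandsPartitioner(override val numPartitions: Int) extends Partitioner {
    //     def getPartition(key: Any): Int = key match {
    //       case k: Int => (k / 10000) % numPartitions
    //       case _      => 0
    //     }
    //   }
    //
    //   val l_reduced = l_rdd.map(x => (x._2, List(x._1)))
    //                        .reduceByKey(new TenThousandsPartitioner(2), (a, b) => b ::: a)

    List(0, 10000, 20000, 30000).foreach(k =>
      println(s"$k -> partition ${partitionFor(k, 2)}"))
    ```

    Unlike repartition, passing the partitioner to reduceByKey distributes the keys during the shuffle itself, avoiding a second shuffle after the reduce.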