Search code examples
performanceapache-sparkrddpartitioning

In what scenarios hash partitioning is preferred over range partitioning in Spark?


I have gone through various articles about hash partitioning. But I still don't get it in what scenarios it is more advantageous than range partitioning. Using sortByKey followed by range partitioning allows data to be distributed evenly across cluster. But that may not be the case in hash partitioning. Consider the following example:

Consider a pair RDD with keys [8, 96, 240, 400, 401, 800] and the desired number of partition is 4.

In this case, hash partitioning distributes the keys as follows among the partitions:

partition 0: [8, 96, 240, 400, 800]
partition 1: [ 401 ]
partition 2: []
partition 3: [] 

(To compute partition : p = key.hashCode() % numPartitions )

The above partition leads to bad performance as the keys are not evenly distributed across all nodes. Since range partition can equally distribute the keys across the cluster, then in what scenarios hash partition proves to be a best fit over range partition?


Solution

  • While weakness of the hashCode is of some concern, especially when working with small integers, it usually can be addressed by adjusting number of partitions based on domain specific knowledge. It is also possible to replace default HashPartitioner with custom Partitioner using more appropriate hashing function. As long as there is no data skew, hash partitioning behaves well enough at scale on average.

    Data skews are completely different problem. If key distribution is significantly skewed, then distribution of the partitioned data, is likely to be skewed, no matter what Partitioner is used. Consider for example following RDD:

    sc.range(0, 1000).map(i => if(i < 9000) 1 else i).map((_, None))
    

    which simply cannot be uniformly partitioned.

    Why not use RangePartitioner by default?

    • It is less general than HashPartioner. While HashPartitioner requires only a proper implementation of ## and == for K, RangePartitioner requires an Ordering[K].
    • Unlike HashPartitioner, it has to approximate data distribution, therefore it requires additional data scan.
    • Because splits are computed based on a particular distribution, it might be unstable when reused across datasets. Consider following example:

      val rdd1 = sc.range(0, 1000).map((_, None))
      val rdd2 = sc.range(1000, 2000).map((_, None))
      
      val rangePartitioner = new RangePartitioner(11, rdd1)
      
      rdd1.partitionBy(rangePartitioner).glom.map(_.length).collect
      
      Array[Int] = Array(88, 91, 99, 91, 87, 92, 83, 93, 91, 86, 99)
      
      rdd2.partitionBy(rangePartitioner).glom.map(_.length).collect
      
      Array[Int] = Array(0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1000)
      

      As you can imagine this has serious implications for operations like joins. At the same time

      val hashPartitioner = new HashPartitioner(11)
      
      rdd1.partitionBy(hashPartitioner).glom.map(_.length).collect
      
      Array[Int] = Array(91, 91, 91, 91, 91, 91, 91, 91, 91, 91, 90)
      
      rdd2.partitionBy(hashPartitioner).glom.map(_.length).collect
      
      Array[Int] = Array(91, 91, 91, 91, 91, 91, 91, 91, 91, 90, 91)
      

    This brings us back to your questions:

    in what scenarios it is more advantageous than range partitioning.

    Hash partitioning is a default approach in many systems because it is relatively agnostic, usually behaves reasonably well, and doesn't require additional information about data distribution. These properties make it preferable, in lack of any a priori knowledge about the data.