I have a dataset that I want to write, sorted, into Parquet files, so that later Spark queries against these files can benefit from predicate pushdown.
Currently I use repartition by column, together with the number of partitions, to move the data into a particular partition. The column identifies the corresponding partition (from 0 to a fixed n). The result is that Scala/Spark generates an unexpected result and creates fewer partitions (some of them are empty). Maybe a hash collision?
To solve the problem, I tried to find the reason and looked for workarounds. I found one: converting the DataFrame to an RDD and using partitionBy with a HashPartitioner. To my surprise, this gave the expected result. But converting a DataFrame to an RDD is not a solution for me, because it takes too many resources.
I have tested this on:
Spark 2.0 on Cloudera CDH 5.9.3
Spark 2.3.1 on emr-5.17.0
Here are my tests with their outputs. Please use spark-shell to run them:
scala> import org.apache.spark.HashPartitioner
scala> val mydataindex = Array(0,1, 2, 3,4)
mydataindex: Array[Int] = Array(0, 1, 2, 3, 4)
scala> val mydata = sc.parallelize(for {
| x <- mydataindex
| y <- Array(123,456,789)
| } yield (x, y), 100)
mydata: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[0] at parallelize at <console>:27
scala> val rddMyData = mydata.partitionBy(new HashPartitioner(5))
rddMyData: org.apache.spark.rdd.RDD[(Int, Int)] = ShuffledRDD[1] at partitionBy at <console>:26
scala> val rddMyDataPartitions = rddMyData.mapPartitionsWithIndex{
| (index, iterator) => {
| val myList = iterator.toList
| myList.map(x => x + " -> " + index).iterator
| }
| }
rddMyDataPartitions: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[2] at mapPartitionsWithIndex at <console>:26
scala> rddMyDataPartitions.take(100)
res1: Array[String] = Array((0,123) -> 0, (0,456) -> 0, (0,789) -> 0, (1,123) -> 1, (1,456) -> 1, (1,789) -> 1, (2,123) -> 2, (2,456) -> 2, (2,789) -> 2, (3,456) -> 3, (3,789) -> 3, (3,123) -> 3, (4,789) -> 4, (4,123) -> 4, (4,456) -> 4)
scala> val dfMyData = mydata.toDF()
dfMyData: org.apache.spark.sql.DataFrame = [_1: int, _2: int]
scala> val dfMyDataRepartitioned = dfMyData.repartition(5,col("_1"))
dfMyDataRepartitioned: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [_1: int, _2: int]
scala> dfMyDataRepartitioned.explain(false)
== Physical Plan ==
Exchange hashpartitioning(_1#3, 5)
+- *(1) SerializeFromObject [assertnotnull(input[0, scala.Tuple2, true])._1 AS _1#3, assertnotnull(input[0, scala.Tuple2, true])._2 AS _2#4]
+- Scan ExternalRDDScan[obj#2]
scala> val dfMyDataRepartitionedPartition = dfMyDataRepartitioned.withColumn("partition_id", spark_partition_id()).groupBy("partition_id").count()
dfMyDataRepartitionedPartition: org.apache.spark.sql.DataFrame = [partition_id: int, count: bigint]
scala> dfMyDataRepartitionedPartition.show()
+------------+-----+
|partition_id|count|
+------------+-----+
|           1|    6|
|           3|    3|
|           4|    3|
|           2|    3|
+------------+-----+
I first thought that HashPartitioner is used when repartitioning a DataFrame, but this doesn't seem to be the case, because it works on RDDs.
Could anyone explain how this "Exchange hashpartitioning" (see the explain output above) works?
Edit (2019-01-16 12:20): This is not a duplicate of How does HashPartitioner work?, because I am interested in the hashing algorithm of repartition by column (plus number of partitions) on an Integer column. The general HashPartitioner works as expected, as you can see in its source code.
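For comparison, here is a plain-Scala sketch of how the RDD `HashPartitioner` picks a partition, assuming it matches Spark's `getPartition` (non-negative `key.hashCode` modulo the number of partitions, with `null` keys going to partition 0). The object name `RddHashPartitioning` is hypothetical. Because `hashCode` of an `Int` is the value itself, keys 0 to 4 with 5 partitions land in partitions 0 to 4, which is consistent with the `res1` output in the transcript.

```scala
// Hypothetical helper mirroring the RDD HashPartitioner's partition choice
// (assumption: non-negative modulo of key.hashCode, null keys go to 0).
object RddHashPartitioning {
  // Like `x % mod`, but shifted into the range [0, mod) for negative x.
  def nonNegativeMod(x: Int, mod: Int): Int = {
    val rawMod = x % mod
    rawMod + (if (rawMod < 0) mod else 0)
  }

  // For an Int key, hashCode is the value itself, so small
  // non-negative keys map straight to partition `key % numPartitions`.
  def partitionFor(key: Any, numPartitions: Int): Int = key match {
    case null => 0
    case _    => nonNegativeMod(key.hashCode, numPartitions)
  }
}
```

For example, mapping `RddHashPartitioning.partitionFor(_, 5)` over `0 to 4` returns 0, 1, 2, 3, 4 in order: one key per partition, no collisions.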
There is nothing unexpected here. As explained in How does HashPartitioner work?, Spark uses hash(key) modulo the number of partitions, and a non-uniform distribution, especially on small datasets, is not unexpected.
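A back-of-the-envelope check of why empty partitions are likely with only five keys, assuming the hash behaves like a uniformly random assignment of keys to partitions (a real hash is deterministic, so this only describes the odds for an arbitrary set of keys). With 5 keys and 5 partitions, the chance that every key gets its own partition is 5!/5^5 = 120/3125 ≈ 3.8%, and the expected number of empty partitions is 5 · (4/5)^5 ≈ 1.64. The object name `CollisionOdds` is hypothetical.

```scala
// Back-of-the-envelope odds, assuming each key is hashed to one of
// `partitions` buckets uniformly at random (a simplified model, not Spark code).
object CollisionOdds {
  // Probability that `keys` distinct keys all land in different partitions:
  // (p/p) * ((p-1)/p) * ((p-2)/p) * ... for `keys` factors.
  def probAllDistinct(keys: Int, partitions: Int): Double =
    (0 until keys).map(i => (partitions - i).toDouble / partitions).product

  // Expected number of partitions that receive no key at all:
  // p * (1 - 1/p)^keys
  def expectedEmpty(keys: Int, partitions: Int): Double =
    partitions * math.pow(1.0 - 1.0 / partitions, keys)
}
```

Under this model, a layout with no empty partition, as in the RDD case above, is the exception rather than the rule.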
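The difference between Dataset and RDD is expected as well, as the two use different hashing functions (see the same linked question): the RDD HashPartitioner uses key.hashCode, which for an Int is the value itself, while the Dataset's hashpartitioning uses a Murmur3 hash of the column.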
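A plain-Scala sketch of the Dataset side, assuming `Exchange hashpartitioning(_1, 5)` computes `pmod(murmur3(_1, seed = 42), 5)` and that the re-implemented `hashInt` below faithfully mirrors Spark's internal `Murmur3_x86_32.hashInt` for a single `Int` column. The object name `DatasetHashPartitioning` is hypothetical. Unlike `hashCode`, Murmur3 scrambles the bits of the key, so consecutive integers do not map to consecutive partitions, and two of five keys can easily share one.

```scala
// Hypothetical re-implementation of the single-Int-column case of
// Spark SQL hash partitioning: Murmur3 (x86, 32-bit) with seed 42, then pmod.
object DatasetHashPartitioning {
  private val C1: Int = 0xcc9e2d51L.toInt
  private val C2: Int = 0x1b873593L.toInt
  private val M: Int  = 0xe6546b64L.toInt

  // Scramble one 4-byte block of input.
  private def mixK1(k: Int): Int = Integer.rotateLeft(k * C1, 15) * C2

  // Fold the scrambled block into the running hash.
  private def mixH1(h: Int, k1: Int): Int = Integer.rotateLeft(h ^ k1, 13) * 5 + M

  // Final avalanche step; `length` is the input size in bytes (4 for an Int).
  private def fmix(h: Int, length: Int): Int = {
    var h1 = h ^ length
    h1 ^= h1 >>> 16
    h1 *= 0x85ebca6bL.toInt
    h1 ^= h1 >>> 13
    h1 *= 0xc2b2ae35L.toInt
    h1 ^= h1 >>> 16
    h1
  }

  def hashInt(input: Int, seed: Int): Int = fmix(mixH1(seed, mixK1(input)), 4)

  // Positive modulo: the result is always in [0, n).
  def pmod(a: Int, n: Int): Int = {
    val r = a % n
    if (r < 0) (r + n) % n else r
  }

  // Assumption: 42 is the default seed of Spark SQL's Murmur3 hash.
  def partitionFor(key: Int, numPartitions: Int): Int =
    pmod(hashInt(key, 42), numPartitions)
}
```

To cross-check against Spark itself, something like `dfMyData.withColumn("expected", expr("pmod(hash(_1), 5)"))` in spark-shell should agree with `spark_partition_id()` on `dfMyDataRepartitioned`, since the SQL `hash` function is also Murmur3 with seed 42.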
The result is that scala/spark is generating an unexpected result and creating less partitions
is not a correct observation. The number of partitions created is exactly the number requested:
scala> dfMyDataRepartitioned.rdd.getNumPartitions
res8: Int = 5
but the empty ones don't show up in the aggregation, because no rows carry their partition_id.
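Because `groupBy("partition_id").count()` only produces rows for values that actually occur, partition 0 simply has no row. A minimal plain-Scala illustration, using the counts from the `show()` output in the question as sample data (the object name `PartitionCounts` is hypothetical): grouping drops the empty partition, and filling in every index from 0 until the number of partitions brings it back with a count of zero. In spark-shell, something like `dfMyDataRepartitioned.rdd.mapPartitionsWithIndex((i, it) => Iterator(i -> it.size)).collect()` should list all five partitions, including the empty one.

```scala
// Hypothetical illustration of why empty partitions vanish from a grouped count.
object PartitionCounts {
  // Sample spark_partition_id values taken from the show() output in the
  // question: six rows in partition 1, three each in partitions 2, 3 and 4.
  val observedIds: Seq[Int] =
    Seq.fill(6)(1) ++ Seq.fill(3)(3) ++ Seq.fill(3)(4) ++ Seq.fill(3)(2)

  // Like groupBy(...).count(): only ids that occur in the data get an entry.
  def groupedCounts(ids: Seq[Int]): Map[Int, Int] =
    ids.groupBy(identity).map { case (id, rows) => id -> rows.size }

  // Report every partition index 0 until numPartitions, defaulting to zero.
  def allCounts(ids: Seq[Int], numPartitions: Int): Seq[(Int, Int)] = {
    val counts = groupedCounts(ids)
    (0 until numPartitions).map(p => p -> counts.getOrElse(p, 0))
  }
}
```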