python, apache-spark, pyspark

Why does df.rdd.getNumPartitions() return a different number of partitions than F.spark_partition_id() reports?


Here's code that simply creates two data frames and joins them. At the end, you can see the two function calls reporting contradictory results on the number of partitions:

from pyspark.sql import SparkSession
from pyspark.sql.functions import rand, spark_partition_id, when
from pyspark.sql.types import IntegerType

spark = SparkSession.builder.appName("test").getOrCreate()
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")
spark.conf.set("spark.sql.adaptive.enabled", "false")

# create a one-column (column name: value) DF with 100k elements,
# then add a "key" column whose value is 1 for ~99% of the rows
# and 0 for ~1% of them
df_skew = spark.createDataFrame([0] * 100000, IntegerType()) \
    .withColumn("key", when(rand() < 0.01, 0).otherwise(1))

# create a DF with 100 elements and 2 columns (column names: value, size)
df_small = spark.createDataFrame([i for i in range(100)], IntegerType()) \
    .withColumn("size", when(rand() < 0.1, "small").otherwise("large")) \
    .repartition(3)

# directly join the 2 data frames; note the partition number reported below
# (spark_partition_id is a built-in function that tells which partition each
# row is in, so we can count the partitions with a groupBy/count)
df_joined = df_skew.join(df_small, "value", "inner") \
    .select(df_skew.value, df_skew.key, df_small.size)

print("partitionId count on df_joined:")
df_joined.withColumn("partitionId", spark_partition_id()) \
    .groupBy("partitionId").count().show()

print("df_joined getNumPartitions() :",
      df_joined.rdd.getNumPartitions())

Output is below. As you can see, getNumPartitions() says there are 200 partitions, yet spark_partition_id() reports only one partition (partitionId 191) on the same data frame. Can anyone explain this? Thank you very much. Spark version is 3.0.

partitionId count on df_joined:
+-----------+------+
|partitionId| count|
+-----------+------+
|        191|100000|
+-----------+------+

df_joined getNumPartitions() : 200

Solution

  • Why is the number of partitions different?

    This is because of the setting "spark.sql.adaptive.enabled". Since you have turned this option off, Spark can no longer optimize (coalesce) the shuffle partitions during the join. By default Spark creates 200 shuffle partitions, which is the default value of the option "spark.sql.shuffle.partitions". So, for example, if you were to set this option to 100, Spark would create 100 partitions after the join, as the sketch below illustrates.
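
    Here is a minimal sketch to confirm that (it reuses the spark, df_skew and df_small names from the question; the value 100 and the df_joined_100 name are just for illustration):

    # lower the shuffle-partition setting before re-running the join
    spark.conf.set("spark.sql.shuffle.partitions", "100")
    df_joined_100 = df_skew.join(df_small, "value", "inner")
    # with AQE disabled, the joined frame now has 100 partitions
    print(df_joined_100.rdd.getNumPartitions())   # 100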

    The most important thing here is that, although the number of partitions is 200, most of the partitions are empty. You can verify this by executing the following code:

    >>> set(map(len, df_joined.rdd.glom().collect()))
    {0, 100000}
    

    As you can see from the above output, all partitions except one are empty. Coming back to the output of grouping by F.spark_partition_id and counting the rows: since all of the data lives in a single partition, the aggregation reports 100000 rows in partition ID 191 and nothing else.
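
    If you want to see every partition, including the empty ones, without collecting all the rows to the driver the way glom().collect() does, a small sketch along these lines should work (it only assumes the df_joined data frame from the question; counts is a hypothetical variable name):

    # count rows per partition at the RDD level; empty partitions show up as 0
    counts = (df_joined.rdd
              .mapPartitionsWithIndex(lambda idx, it: [(idx, sum(1 for _ in it))])
              .collect())
    print(len(counts))                        # 200 entries, one per partition
    print([(i, c) for i, c in counts if c])   # only one partition holds rows

    The groupBy on spark_partition_id can only report partitions that actually contribute rows, which is why a single partition ID shows up even though 200 partitions exist.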