
Spark SQL bucketing optimization does not seem to take effect


I am using Spark 3.x, and I wrote the following simple test to learn the Spark SQL bucketing feature:

import org.apache.spark.sql.SparkSession

test("bucket join 1") {
  val spark = SparkSession.builder()
    .master("local")
    .enableHiveSupport()
    .appName("test join 1")
    .config("spark.sql.codegen.wholeStage", "false")
    .getOrCreate()
  import spark.implicits._

  // 101 rows; column b cycles through "A".."F"
  val data1 = (0 to 100).map(i => (i, ('A' + i % 6).toChar.toString))
  val t1 = "t_" + System.currentTimeMillis()
  data1.toDF("a", "b").write.bucketBy(2, "b").saveAsTable(t1)

  // 6 rows: ("A", "A") .. ("F", "F")
  val data2 = (0 to 5).map(i => (('A' + i % 6).toChar.toString, ('A' + i % 6).toChar.toString))
  val t2 = "t_" + System.currentTimeMillis()
  data2.toDF("a", "b").write.bucketBy(2, "b").saveAsTable(t2)

  // Both tables are bucketed into 2 buckets on the join key b
  val df = spark.sql(
    s"""
    select t1.a, t1.b, t2.a, t2.b from $t1 t1 join $t2 t2 on t1.b = t2.b
    """)

  df.explain(true)
  df.show(truncate = false)

  spark.sql(s"describe extended $t1").show(truncate = false)
  spark.sql(s"describe extended $t2").show(truncate = false)
}


When I run the code above, it prints the physical plan below. Because the plan still contains an exchange step (BroadcastExchange), I think bucketing is not taking effect here.

The output of the describe commands shows that both tables do have the bucket spec defined:

Bucket Columns: b
Num Buckets: 2

I'm not sure what I have missed: I expected the shuffle to be avoided.

BroadcastHashJoin [b#27], [b#29], Inner, BuildRight
:- Project [a#26, b#27]
:  +- Filter isnotnull(b#27)
:     +- FileScan parquet default.t_1700986792232[a#26,b#27] Batched: false, DataFilters: [isnotnull(b#27)], Format: Parquet, Location: InMemoryFileIndex[file:/D://spark-warehouse/t_17009867..., PartitionFilters: [], PushedFilters: [IsNotNull(b)], ReadSchema: struct<a:int,b:string>, SelectedBucketsCount: 2 out of 2
+- BroadcastExchange HashedRelationBroadcastMode(List(input[1, string, true])), [id=#35]
   +- Project [a#28, b#29]
      +- Filter isnotnull(b#29)
         +- FileScan parquet default.t_1700986813106[a#28,b#29] Batched: false, DataFilters: [isnotnull(b#29)], Format: Parquet, Location: InMemoryFileIndex[file:/D:/spark-warehouse/t_17009868..., PartitionFilters: [], PushedFilters: [IsNotNull(b)], ReadSchema: struct<a:string,b:string>, SelectedBucketsCount: 2 out of 2
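One way to check whether a join side is small enough to be broadcast is to compare the optimizer's size estimate for each table with spark.sql.autoBroadcastJoinThreshold. The sketch below assumes a local Spark 3.x session with Hive support, as in the question, and uses the hypothetical table names bucket_demo_t1 and bucket_demo_t2 in place of the generated t_<millis> names. It reads the threshold through spark.sessionState.conf, which is an unstable, internal-facing API, so treat it as a diagnostic aid rather than application code.

```scala
import org.apache.spark.sql.SparkSession

// Same local, Hive-enabled setup as in the question.
val spark = SparkSession.builder()
  .master("local")
  .enableHiveSupport()
  .appName("broadcast threshold check")
  .getOrCreate()
import spark.implicits._

// Hypothetical table names; same data and bucketing as in the question.
val left = "bucket_demo_t1"
val right = "bucket_demo_t2"
(0 to 100).map(i => (i, ('A' + i % 6).toChar.toString))
  .toDF("a", "b").write.mode("overwrite").bucketBy(2, "b").saveAsTable(left)
(0 to 5).map(i => (('A' + i).toChar.toString, ('A' + i).toChar.toString))
  .toDF("a", "b").write.mode("overwrite").bucketBy(2, "b").saveAsTable(right)

// Size limit in bytes below which a join side may be broadcast (10 MB unless overridden).
val threshold: Long = spark.sessionState.conf.autoBroadcastJoinThreshold

// Optimizer's size estimate per table; without ANALYZE statistics this is
// typically derived from the size of the table's files on disk.
val sizes: Map[String, BigInt] = Seq(left, right).map { name =>
  name -> spark.table(name).queryExecution.optimizedPlan.stats.sizeInBytes
}.toMap

sizes.foreach { case (name, size) =>
  println(s"$name: ~$size bytes, under broadcast threshold: ${size <= threshold}")
}
```

For tables as small as these, both estimates should come out far below the threshold, which is what makes a broadcast hash join eligible.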


Solution

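  • With this little data, Catalyst decides that a broadcast hash join is the better plan: each table's estimated size is below spark.sql.autoBroadcastJoinThreshold (10 MB by default), so the smaller side is broadcast and the other side is not shuffled at all. The BroadcastExchange in your plan is that broadcast, not a shuffle. Bucketing only pays off for joins that would otherwise shuffle both sides, such as a sort-merge join, and the planner's choice of join strategy is based on these size estimates.

    1. After spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1), which disables size-based broadcast joins, bucketing is used.
    2. With 1M rows in both DataFrames, bucketing is used as well.

    As the plan below shows, there is no Exchange on either side of the SortMergeJoin, and both scans report Bucketed: true.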

    == Physical Plan ==
    AdaptiveSparkPlan isFinalPlan=false
    +- SortMergeJoin [b#573], [b#575], Inner
       :- Sort [b#573 ASC NULLS FIRST], false, 0
       :  +- Filter isnotnull(b#573)
       :     +- FileScan parquet spark_catalog.default.t_1701024653715[a#572,b#573] Batched: false, Bucketed: true, DataFilters: [isnotnull(b#573)], Format: Parquet, Location: InMemoryFileIndex(1 paths)[dbfs:/user/hive/warehouse/t_1701024653715], PartitionFilters: [], PushedFilters: [IsNotNull(b)], ReadSchema: struct<a:int,b:string>, SelectedBucketsCount: 2 out of 2
       +- Sort [b#575 ASC NULLS FIRST], false, 0
          +- Filter isnotnull(b#575)
             +- FileScan parquet spark_catalog.default.t_1701024668124[a#574,b#575] Batched: false, Bucketed: true, DataFilters: [isnotnull(b#575)], Format: Parquet, Location: InMemoryFileIndex(1 paths)[dbfs:/user/hive/warehouse/t_1701024668124], PartitionFilters: [], PushedFilters: [IsNotNull(b)], ReadSchema: struct<a:string,b:string>, SelectedBucketsCount: 2 out of 2
    

    If you are on Delta Lake, you may also wish to consider the Delta format with Z-ordering as an alternative to bucketing.
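A minimal sketch of option 1 that also checks the plan programmatically instead of reading explain output. It assumes a local Spark 3.x session with Hive support as in the question; the table names bucket_demo_t1 and bucket_demo_t2 are hypothetical. It also switches off adaptive query execution (spark.sql.adaptive.enabled) purely as an assumption of this sketch, so that executedPlan is the plain physical plan and can be searched with collect; with AQE on, the plan is wrapped in an AdaptiveSparkPlan node, as in the output above.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, ShuffleExchangeExec}
import org.apache.spark.sql.execution.joins.SortMergeJoinExec

val spark = SparkSession.builder()
  .master("local")
  .enableHiveSupport()
  .appName("bucketed join check")
  .getOrCreate()
import spark.implicits._

// Option 1 from the answer: disable size-based broadcast joins.
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
// Assumption for this sketch: AQE off, so executedPlan can be searched with collect.
spark.conf.set("spark.sql.adaptive.enabled", "false")

// Hypothetical table names; same data and bucketing as in the question.
(0 to 100).map(i => (i, ('A' + i % 6).toChar.toString))
  .toDF("a", "b").write.mode("overwrite").bucketBy(2, "b").saveAsTable("bucket_demo_t1")
(0 to 5).map(i => (('A' + i).toChar.toString, ('A' + i).toChar.toString))
  .toDF("a", "b").write.mode("overwrite").bucketBy(2, "b").saveAsTable("bucket_demo_t2")

val df = spark.sql(
  """select t1.a, t1.b, t2.a, t2.b
    |from bucket_demo_t1 t1 join bucket_demo_t2 t2 on t1.b = t2.b""".stripMargin)

// Walk the physical plan and collect the operators of interest.
val plan = df.queryExecution.executedPlan
val sortMergeJoins = plan.collect { case j: SortMergeJoinExec => j }
val shuffles = plan.collect { case e: ShuffleExchangeExec => e }
val broadcasts = plan.collect { case e: BroadcastExchangeExec => e }

println(s"SortMergeJoin: ${sortMergeJoins.size}, shuffles: ${shuffles.size}, broadcasts: ${broadcasts.size}")
df.show(truncate = false)
```

If bucketing is taking effect, the join should be a single SortMergeJoin with no shuffle and no broadcast exchange, matching the plan shown in the answer.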