
Can we leverage Spark's CBO (Cost Based Optimizer) with native parquet or in-memory dataframe?

Say I want to inner-join three tables, A, B and C, where C is very small.

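// Dummy example with in-memory tables; the same issue occurs with tables loaded through the Parquet reader
import org.apache.spark.sql.functions.expr
import spark.implicits._ // already imported automatically in spark-shell

val A = (1 to 1000000).toDF("A")
val B = (1 to 1000000).toDF("B")
val C = (1 to 10).toDF("C")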

And I have no control over the order in which the joins are given to me:

val CASE1 = A.join(B, expr("A=B"), "inner").join(C, expr("A=C"), "inner")
val CASE2 = A.join(C, expr("A=C"), "inner").join(B, expr("A=B"), "inner")

Running both shows that CASE1 runs 30-40% slower than CASE2.
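A minimal sketch of how the two orderings can be timed in spark-shell or a local session. It assumes `SparkSession.time` (available since Spark 2.1), which prints the wall-clock time of the enclosed action, and repeats the question's setup with automatic broadcast joins disabled. A single cold run is noisy, so repeat each measurement a few times before comparing.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.expr

// Local session for the sketch; in spark-shell this returns the existing `spark`.
val spark = SparkSession.builder().master("local[*]").appName("join-order-timing").getOrCreate()
import spark.implicits._

// Match the question's setup: no automatic broadcast joins.
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)

val A = (1 to 1000000).toDF("A")
val B = (1 to 1000000).toDF("B")
val C = (1 to 10).toDF("C")

val CASE1 = A.join(B, expr("A=B"), "inner").join(C, expr("A=C"), "inner")
val CASE2 = A.join(C, expr("A=C"), "inner").join(B, expr("A=B"), "inner")

// spark.time prints "Time taken: ... ms" and returns the action's result.
val n1 = spark.time(CASE1.count())
val n2 = spark.time(CASE2.count())
```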

So the question is: how can I leverage Spark's CBO to automatically rewrite CASE1 as CASE2 for an in-memory table or a table loaded with Spark's Parquet reader?

I have tried:

spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
spark.conf.set("spark.sql.cbo.enabled", "true")

but this throws:

org.apache.spark.sql.catalyst.analysis.NoSuchTableException: Table or view 'a' not found in database 'default'

Is there any other way to activate the CBO without saving the tables in Hive?
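For reference, a minimal sketch of the route the CBO does support: persist the inputs as catalog tables, collect statistics with `ANALYZE TABLE ... COMPUTE STATISTICS FOR COLUMNS`, and enable both `spark.sql.cbo.enabled` and `spark.sql.cbo.joinReorder.enabled`. Assumptions: Spark 2.2 or later, a local session using Spark's built-in catalog rather than Hive (table metadata then lives only for the session), a throwaway warehouse directory, and the hypothetical table names `a_tbl`, `b_tbl` and `c_tbl`. Whether the optimizer actually swaps the joins depends on its cost estimates, so check the `explain` output rather than assume it.

```scala
import java.nio.file.Files
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.expr

// Throwaway warehouse dir (a static conf: only honoured if no session exists yet).
val warehouse = Files.createTempDirectory("cbo-warehouse").toString

val spark = SparkSession.builder()
  .master("local[*]")
  .appName("cbo-join-reorder")
  .config("spark.sql.warehouse.dir", warehouse)
  .config("spark.sql.cbo.enabled", "true")
  .config("spark.sql.cbo.joinReorder.enabled", "true")
  .getOrCreate()
import spark.implicits._

// Persist the inputs as catalog tables (hypothetical names).
(1 to 1000000).toDF("A").write.mode("overwrite").saveAsTable("a_tbl")
(1 to 1000000).toDF("B").write.mode("overwrite").saveAsTable("b_tbl")
(1 to 10).toDF("C").write.mode("overwrite").saveAsTable("c_tbl")

// Row counts and column statistics stored in the catalog: what the CBO reads.
Seq("a_tbl" -> "A", "b_tbl" -> "B", "c_tbl" -> "C").foreach { case (table, column) =>
  spark.sql(s"ANALYZE TABLE $table COMPUTE STATISTICS FOR COLUMNS $column")
}

val a = spark.table("a_tbl")
val b = spark.table("b_tbl")
val c = spark.table("c_tbl")

// Written in CASE1's order; with statistics available the optimizer may reorder it.
val case1 = a.join(b, expr("A=B"), "inner").join(c, expr("A=C"), "inner")
case1.explain(true)
```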


  1. Even with spark.conf.set("spark.sql.cbo.enabled", "true"), no cost estimates are shown in the Spark Web UI.
  2. CASE1.explain and CASE2.explain show different physical plans (CASE1 first, CASE2 second):


== Physical Plan ==
*(5) SortMergeJoin [A#3], [C#13], Inner
:- *(3) SortMergeJoin [A#3], [B#8], Inner
:  :- *(1) Sort [A#3 ASC NULLS FIRST], false, 0
:  :  +- Exchange hashpartitioning(A#3, 200)
:  :     +- LocalTableScan [A#3]
:  +- *(2) Sort [B#8 ASC NULLS FIRST], false, 0
:     +- Exchange hashpartitioning(B#8, 200)
:        +- LocalTableScan [B#8]
+- *(4) Sort [C#13 ASC NULLS FIRST], false, 0
   +- Exchange hashpartitioning(C#13, 200)
      +- LocalTableScan [C#13]


== Physical Plan ==
*(5) SortMergeJoin [A#3], [B#8], Inner
:- *(3) SortMergeJoin [A#3], [C#13], Inner
:  :- *(1) Sort [A#3 ASC NULLS FIRST], false, 0
:  :  +- Exchange hashpartitioning(A#3, 200)
:  :     +- LocalTableScan [A#3]
:  +- *(2) Sort [C#13 ASC NULLS FIRST], false, 0
:     +- Exchange hashpartitioning(C#13, 200)
:        +- LocalTableScan [C#13]
+- *(4) Sort [B#8 ASC NULLS FIRST], false, 0
   +- Exchange hashpartitioning(B#8, 200)
      +- LocalTableScan [B#8]
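The estimates the optimizer works with can be printed directly instead of looked for in the Web UI. A minimal sketch, assuming Spark 2.3 or later, where `queryExecution.optimizedPlan.stats` takes no arguments (a developer API, not a stable public one). For in-memory inputs like these (the `LocalTableScan` nodes above), there is no catalog entry with column statistics, so the estimate is typically little more than a size in bytes. On Spark 3.0+, `explain("cost")` prints the same statistics per plan node.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.expr

val spark = SparkSession.builder().master("local[*]").appName("inspect-stats").getOrCreate()
import spark.implicits._
spark.conf.set("spark.sql.cbo.enabled", "true")

val A = (1 to 1000000).toDF("A")
val B = (1 to 1000000).toDF("B")
val C = (1 to 10).toDF("C")
val CASE1 = A.join(B, expr("A=B"), "inner").join(C, expr("A=C"), "inner")

// Statistics attached to the optimized logical plans.
val aStats = A.queryExecution.optimizedPlan.stats
val cStats = C.queryExecution.optimizedPlan.stats
val joinStats = CASE1.queryExecution.optimizedPlan.stats
println(s"A: $aStats")
println(s"C: $cStats")
println(s"CASE1: $joinStats")

// Spark 3.0+ only: per-node statistics in the plan output.
// CASE1.explain("cost")
```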


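  • No. The short answer is that this is not possible: the CBO takes its table and column statistics from the catalog (they are collected with ANALYZE TABLE), so in-memory DataFrames and files read directly with the Parquet reader have no such statistics for it to use.

    This provides an excellent overview of what is possible, including the point about persisted data stores.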
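Since the CBO cannot help here, one workaround is to impose the order yourself before building the query. A minimal sketch with a hypothetical helper, `joinSmallestFirst`, that sorts the tables joined onto a base DataFrame by the optimizer's size estimate (`queryExecution.optimizedPlan.stats.sizeInBytes`, a Spark 2.3+ developer API) and joins the smallest first. It assumes every join condition only references the base table and the table being joined, as `A=B` and `A=C` do; for inner joins of that shape the result does not depend on the order, whereas a condition that refers to a third table could end up referencing a column that is not yet in scope.

```scala
import org.apache.spark.sql.{Column, DataFrame, SparkSession}
import org.apache.spark.sql.functions.expr

val spark = SparkSession.builder().master("local[*]").appName("manual-join-order").getOrCreate()
import spark.implicits._

// Hypothetical helper: the optimizer's size estimate for a DataFrame.
def estimatedSize(df: DataFrame): BigInt =
  df.queryExecution.optimizedPlan.stats.sizeInBytes

// Hypothetical helper: inner-join `base` with each (table, condition) pair,
// smallest estimated table first. Assumes each condition only references
// `base` and its own table.
def joinSmallestFirst(base: DataFrame, others: Seq[(DataFrame, Column)]): DataFrame =
  others
    .sortBy { case (df, _) => estimatedSize(df) }
    .foldLeft(base) { case (acc, (df, cond)) => acc.join(df, cond, "inner") }

val A = (1 to 1000000).toDF("A")
val B = (1 to 1000000).toDF("B")
val C = (1 to 10).toDF("C")

// Given in CASE1's order, but C is joined before B.
val reordered = joinSmallestFirst(A, Seq((B, expr("A=B")), (C, expr("A=C"))))
reordered.explain()
```

Estimating sizes from the plan avoids running a `count()` per table; the trade-off is that the estimate is only as good as what Spark knows about each source.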