Can I set a different autoBroadcastJoinThreshold value in sparkConf for different SQL queries?


I have large DataFrames: A (200g), B (20m), C (15m), D (10m), and E (12m). I want to join them in the same SparkSession using Spark SQL: A join B in one query, and C join D join E in another. Like so:

val absql = spark.sql("select * from A a inner join B b on a.id=b.id")
absql.write.csv("/path/for/ab")
val cdesql = spark.sql("select * from C c inner join D d on c.id=d.id inner join E e on c.id=e.id")
cdesql.write.csv("/path/for/cde")

Problem:

When I use the default spark.sql.autoBroadcastJoinThreshold=10m:

  • absql takes a long time; the cause is skew in the A-B join.
  • cdesql is normal.

When I set spark.sql.autoBroadcastJoinThreshold=20m:

  • C, D, and E are broadcast, and all of the tasks are executed in the same executor; it still takes a long time.
  • If num-executors is set to 200, the broadcast itself takes a long time.
  • absql is normal.
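
For reference, spark.sql.autoBroadcastJoinThreshold is a runtime SQL conf, so what I am after would look something like the following, with a different threshold in effect for each statement (a minimal sketch; the values match my experiments above):

spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 20L * 1024 * 1024) // 20m for the A-B query, so B can be broadcast
spark.sql("select * from A a inner join B b on a.id=b.id").write.csv("/path/for/ab")
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 10L * 1024 * 1024) // back to the 10m default for the C-D-E query
spark.sql("select * from C c inner join D d on c.id=d.id inner join E e on c.id=e.id").write.csv("/path/for/cde")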

Solution:

  • Instead of changing autoBroadcastJoinThreshold, you can mark the DataFrames to be broadcast. This makes it easy to control exactly which DataFrames are broadcast and which are not.

    In Scala it can look like this:

    import org.apache.spark.sql.functions.broadcast
    val B2 = broadcast(B)
    B2.createOrReplaceTempView("B")
    

    Here DataFrame B has been marked for broadcasting and then registered as a temporary view so that it can be used with Spark SQL.
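
    The broadcast hint is stored in the view's logical plan, so a subsequent SQL query against the view picks it up. A minimal sketch, assuming A is likewise registered as a temp view:

    A.createOrReplaceTempView("A")
    spark.sql("select * from A a inner join B b on a.id=b.id").write.csv("/path/for/ab")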


    Alternatively, this can be done directly with the DataFrame API; the first join can be written as:

    A.join(broadcast(B), Seq("id"), "inner")
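
    The second query can be handled the same way while the threshold stays at its default; a minimal sketch, assuming id is the join column throughout, as in the question:

    C.join(broadcast(D), Seq("id"), "inner")
      .join(broadcast(E), Seq("id"), "inner")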