
Apache Spark: broadcast join behaviour: filtering of joined tables and temp tables


I need to join two tables in Spark, but instead of joining the two tables completely, I first filter out part of the second table:

spark.sql("select * from a join b on a.key=b.key where b.value='xxx' ")

I want to use broadcast join in this case.

Spark has a parameter that defines the maximum table size for a broadcast join, spark.sql.autoBroadcastJoinThreshold:

Configures the maximum size in bytes for a table that will be broadcast to all worker nodes when performing a join. By setting this value to -1 broadcasting can be disabled. Note that currently statistics are only supported for Hive Metastore tables where the command ANALYZE TABLE COMPUTE STATISTICS noscan has been run. http://spark.apache.org/docs/2.4.0/sql-performance-tuning.html
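
For reference, this threshold can be inspected and changed at runtime through the session config; a minimal sketch, assuming spark is the active SparkSession (the 10 MB value below is only an example):

# Current threshold in bytes; -1 disables automatic broadcast joins
print(spark.conf.get("spark.sql.autoBroadcastJoinThreshold"))

# Example only: allow tables up to 10 MB to be auto-broadcast
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 10 * 1024 * 1024)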

I have the following questions about this setup:

  1. Which table size will Spark compare with autoBroadcastJoinThreshold's value: the FULL size, or the size AFTER applying the where clause?
  2. I am assuming that Spark will apply the where clause BEFORE broadcasting, correct?
  3. The doc says I need to run Hive's ANALYZE TABLE command beforehand. How will it work when I am using a temp view as a table? As far as I understand, I cannot run ANALYZE TABLE against a Spark temp view created via dataFrame.createOrReplaceTempView("b"). Can I broadcast the contents of a temp view?
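
For reference, the temp views are registered roughly like this; df_a and df_b are placeholder names for existing DataFrames:

# Assumed setup: df_a and df_b stand in for whatever DataFrames hold tables a and b
df_a.createOrReplaceTempView("a")
df_b.createOrReplaceTempView("b")   # no ANALYZE TABLE possible against this temp view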

Solution

  • I went ahead and did some small experiments to answer your 1st question.

    Question 1:

    • Created a dataframe a with 3 rows: [key, df_a_column]
    • Created a dataframe b with 10 rows: [key, value]
    • Ran: spark.sql("SELECT * FROM a JOIN b ON a.key = b.key").explain()
    == Physical Plan ==
    *(1) BroadcastHashJoin [key#122], [key#111], Inner, BuildLeft, false
    :- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)),false), [id=#168]
    :  +- LocalTableScan [key#122, df_a_column#123]
    +- *(1) LocalTableScan [key#111, value#112]
    

    As expected, the smaller df a with 3 rows is broadcast.

    • Ran: spark.sql("SELECT * FROM a JOIN b ON a.key = b.key where b.value=\"bat\"").explain()
    == Physical Plan ==
    *(1) BroadcastHashJoin [key#122], [key#111], Inner, BuildRight, false
    :- *(1) LocalTableScan [key#122, df_a_column#123]
    +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)),false), [id=#152]
       +- LocalTableScan [key#111, value#112]
    

    Here you can notice that dataframe b is broadcast, meaning Spark evaluates the size AFTER applying the where clause when choosing which side to broadcast.
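
    For reference, here is a sketch of how an experiment like this could be set up; the column names follow the plans above, but the actual values are made up:

    # Hypothetical reproduction of the experiment (values are assumptions)
    a_df = spark.createDataFrame(
        [(1, "a1"), (2, "a2"), (3, "a3")],
        ["key", "df_a_column"])
    b_df = spark.createDataFrame(
        [(i, "bat" if i % 2 == 0 else "cat") for i in range(10)],
        ["key", "value"])
    a_df.createOrReplaceTempView("a")
    b_df.createOrReplaceTempView("b")

    spark.sql("SELECT * FROM a JOIN b ON a.key = b.key").explain()
    spark.sql("SELECT * FROM a JOIN b ON a.key = b.key WHERE b.value = 'bat'").explain()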

    Question 2:

    Yes, you are right. It's evident from the previous output that the where clause is applied first.
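
    The same thing can be made explicit with the DataFrame API by filtering b before the join; a minimal sketch, reusing a_df and b_df from the earlier sketch:

    from pyspark.sql import functions as F

    # Filtering b before the join matches what Spark does with the WHERE clause above
    a_df.join(b_df.filter(F.col("value") == "bat"), "key").explain()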

    Question 3: No, you cannot run ANALYZE TABLE against a temp view, but you can still broadcast a temp view's contents by giving Spark a hint, even in SQL.

    Example: spark.sql("SELECT /*+ BROADCAST(b) */ * FROM a JOIN b ON a.key = b.key")

    And if you look at the explain output now:

    == Physical Plan ==
    *(1) BroadcastHashJoin [key#122], [key#111], Inner, BuildRight, false
    :- *(1) LocalTableScan [key#122, df_a_column#123]
    +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)),false), [id=#184]
       +- LocalTableScan [key#111, value#112]
    

    Now, as you can see, dataframe b is broadcast even though it has 10 rows. In question 1, without the hint, a was broadcast.

    Note: the broadcast hint in Spark SQL is available since version 2.2.
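
    The same hint can also be given through the DataFrame API with the broadcast() function; a minimal sketch, again using a_df and b_df from the earlier sketch:

    from pyspark.sql.functions import broadcast

    # Force b_df to be broadcast regardless of the size estimate
    a_df.join(broadcast(b_df), "key").explain()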


    Tips to understand the physical plan:

    • Identify each dataframe from the column list in LocalTableScan [ list of columns ].
    • The dataframe under the BroadcastExchange subtree is the one being broadcast.
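
    Putting both tips together, a small sketch of how you would check which side gets broadcast:

    joined = spark.sql("SELECT /*+ BROADCAST(b) */ * FROM a JOIN b ON a.key = b.key")
    # Each LocalTableScan's column list identifies the dataframe;
    # the scan sitting under BroadcastExchange is the side being broadcast.
    joined.explain()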