Tags: hadoop, hive, hdfs, cloudera, impala

Why isn't a partitioned join (shuffle) always better than a broadcast join?


I've done a lot of research but could not find anything detailed enough. I've read these: 1) http://www.cloudera.com/content/www/en-us/documentation/enterprise/latest/PDF/cloudera-impala.pdf 2) http://www.cidrdb.org/cidr2015/Papers/CIDR15_Paper28.pdf

but I didn't find an answer.

Can somebody explain to me why a partitioned join isn't always better? I mean, if we have two tables, T1 (the big one) and T2 (the small one), and I use the partitioned strategy, both of them are partitioned: subsets of T1 (T1/n-1) are sent to the other nodes, and the same goes for T2. On the other hand, if I choose the broadcast strategy, Impala will send T2*(n-1) of data to the other nodes.

Maybe I didn't understand how the strategies work. If I'm wrong, can somebody please explain? Maybe with a simple drawing? (I've already searched on Google Images.)

Thanks in advance


Solution

  • Partitioning isn't free, and both the probe and build (left and right) sides need to be partitioned to do a partitioned join. Each partitioning requires an exchange plan fragment as a child, and each will incur network transfer. However, if the build side is small, then every node can have a copy of it (i.e. broadcast) and probe the build-side hash table with the unpartitioned left side, without introducing an additional child exchange on the probe side. The broadcast exchange is not free either, because each sender needs to send to N receivers, but when the build side is small that cost is far lower than shuffling the large probe side across the network.

    What is "small enough" to perform a broadcast join? It depends on a number of factors, but the most obvious and important is that the build-side hash table should fit in memory.

    Here's a sample plan where the join strategy is BROADCAST:

    [localhost:21000] > explain select * from alltypes t1 join alltypessmall t2 on t1.id = t2.id;
    Query: explain select * from alltypes t1 join alltypessmall t2 on t1.id = t2.id
    +-----------------------------------------------------------+
    | Explain String                                            |
    +-----------------------------------------------------------+
    | Estimated Per-Host Requirements: Memory=160.01MB VCores=2 |
    |                                                           |
    | 04:EXCHANGE [UNPARTITIONED]                               |
    | |                                                         |
    | 02:HASH JOIN [INNER JOIN, BROADCAST]                      |
    | |  hash predicates: t1.id = t2.id                         |
    | |                                                         |
    | |--03:EXCHANGE [BROADCAST]                                |
    | |  |                                                      |
    | |  01:SCAN HDFS [functional.alltypessmall t2]             |
    | |     partitions=4/4 files=4 size=6.32KB                  |
    | |                                                         |
    | 00:SCAN HDFS [functional.alltypes t1]                     |
    |    partitions=24/24 files=24 size=478.45KB                |
    +-----------------------------------------------------------+
    

    And here is a sample where the join strategy is PARTITIONED:

    Query: explain select * from tpch.lineitem t1 join tpch.lineitem t2 on t1.l_orderkey = t2.l_orderkey
    +-----------------------------------------------------------+
    | Explain String                                            |
    +-----------------------------------------------------------+
    | Estimated Per-Host Requirements: Memory=815.44MB VCores=2 |
    |                                                           |
    | 05:EXCHANGE [UNPARTITIONED]                               |
    | |                                                         |
    | 02:HASH JOIN [INNER JOIN, PARTITIONED]                    |
    | |  hash predicates: t1.l_orderkey = t2.l_orderkey         |
    | |                                                         |
    | |--04:EXCHANGE [HASH(t2.l_orderkey)]                      |
    | |  |                                                      |
    | |  01:SCAN HDFS [tpch.lineitem t2]                        |
    | |     partitions=1/1 files=1 size=718.94MB                |
    | |                                                         |
    | 03:EXCHANGE [HASH(t1.l_orderkey)]                         |
    | |                                                         |
    | 00:SCAN HDFS [tpch.lineitem t1]                           |
    |    partitions=1/1 files=1 size=718.94MB                   |
    +-----------------------------------------------------------+
    Fetched 16 row(s) in 0.03s
    

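    Notice that the latter plan has an extra exchange (id 03) on the probe side. That means there is an additional plan fragment for the scan (id 00), and the rows it scans have to cross the network before they reach the join.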
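
To put rough numbers on the trade-off described in the answer, here is a minimal back-of-the-envelope sketch in Python. It is not Impala's planner logic, only a simplified model that assumes both tables are spread evenly over n nodes and that hashing distributes rows uniformly. The function names (`partitioned_bytes`, `broadcast_bytes`, `cheaper_strategy`) and the 10-node cluster are hypothetical; the table sizes are the ones shown in the two EXPLAIN outputs.

```python
"""Back-of-the-envelope network cost of the two join strategies.

Illustrative model only, not Impala's planner. Assumes every table is
spread evenly over n nodes and that hashing distributes rows uniformly.
"""


def partitioned_bytes(probe_size: float, build_size: float, n: int) -> float:
    # Both sides are hash-partitioned on the join key. A row has a 1/n
    # chance of already sitting on its target node, so (n - 1)/n of BOTH
    # tables crosses the network.
    return (probe_size + build_size) * (n - 1) / n


def broadcast_bytes(build_size: float, n: int) -> float:
    # Each node holds build_size / n and sends it to the other n - 1 nodes,
    # so the cluster ships build_size * (n - 1) in total. The probe side is
    # joined where it is scanned and never moves.
    return build_size * (n - 1)


def cheaper_strategy(probe_size: float, build_size: float, n: int) -> str:
    # Pick whichever strategy moves fewer bytes under this model.
    if broadcast_bytes(build_size, n) < partitioned_bytes(probe_size, build_size, n):
        return "BROADCAST"
    return "PARTITIONED"


if __name__ == "__main__":
    n = 10  # hypothetical cluster size, not taken from the plans
    # (label, probe size, build size, unit) from the EXPLAIN outputs.
    cases = [
        ("alltypes JOIN alltypessmall", 478.45, 6.32, "KB"),
        ("lineitem JOIN lineitem", 718.94, 718.94, "MB"),
    ]
    for label, probe, build, unit in cases:
        b = broadcast_bytes(build, n)
        p = partitioned_bytes(probe, build, n)
        print(f"{label}: broadcast={b:.2f}{unit} "
              f"partitioned={p:.2f}{unit} -> {cheaper_strategy(probe, build, n)}")
```

Under these assumptions, broadcast moves less data whenever T1 > (n-1) * T2, which is why it suits the tiny alltypessmall build side but not the lineitem self-join. The model ignores the memory requirement mentioned in the answer (the build-side hash table has to fit in memory on every node), so treat it as intuition rather than a planning rule.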