apache-spark, join, pyspark, apache-spark-sql

Why does a broadcast join collect data to the driver in order to shuffle it?


I understand the concept of the broadcast join optimization: when one side of the join is small, it is cheaper to ship only that small side to every executor than to shuffle both sides.
But why isn't it possible to do this using only the executors? Why does the driver need to be involved?
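
To make the trade-off concrete, here is a rough back-of-the-envelope sketch in plain Python. It only counts rows moved over the network and ignores serialization, skew and compression. The function names and the sample sizes are made up for illustration; they are not Spark APIs or measurements.

```python
def shuffle_join_rows_moved(large_rows, small_rows):
    # Upper bound for a shuffle join: every row of both sides
    # may be repartitioned by the join key.
    return large_rows + small_rows


def broadcast_join_rows_moved(small_rows, num_executors):
    # Broadcast join: each executor receives a full copy of the small side;
    # the large side stays where it is.
    return small_rows * num_executors


# Hypothetical sizes, chosen only to illustrate the comparison.
large_rows = 1_000_000_000
small_rows = 100_000
num_executors = 50

shuffle_cost = shuffle_join_rows_moved(large_rows, small_rows)
broadcast_cost = broadcast_join_rows_moved(small_rows, num_executors)

print(shuffle_cost, broadcast_cost)
```

Under these assumed numbers the broadcast moves far fewer rows, which is why it pays off only while the small side really is small relative to the large one.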

If each executor held a hash table mapping the records between the executors, I think it should work.

In Spark's current broadcast implementation, the data is first collected to the driver and then distributed to the executors. That collect to the driver is a bottleneck I would like to avoid.
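
The collect-then-broadcast flow described above can be sketched in plain Python, with no Spark involved. This is a simplified simulation under stated assumptions, not Spark's actual code: partitions are modelled as lists of `(key, value)` tuples, and the helper names `collect_to_driver`, `build_hash_table` and `probe_partition` are hypothetical.

```python
from collections import defaultdict


def collect_to_driver(small_partitions):
    # Step 1: the driver gathers every partition of the small side.
    # The whole small side sits in driver memory at this point.
    collected = []
    for partition in small_partitions:
        collected.extend(partition)
    return collected


def build_hash_table(rows):
    # Step 2: build a key -> values lookup table from the collected rows.
    table = defaultdict(list)
    for key, value in rows:
        table[key].append(value)
    return dict(table)


def probe_partition(large_partition, hash_table):
    # Step 3: each executor joins its local partition of the large side
    # against its copy of the table, so the large side is never shuffled.
    joined = []
    for key, left_value in large_partition:
        for right_value in hash_table.get(key, []):
            joined.append((key, left_value, right_value))
    return joined


# Toy data: two partitions per side (inner join on the first tuple element).
small_partitions = [[(1, "a")], [(2, "b"), (3, "c")]]
large_partitions = [[(1, "x"), (2, "y")], [(3, "z"), (4, "w")]]

driver_rows = collect_to_driver(small_partitions)
broadcast_table = build_hash_table(driver_rows)
result = [probe_partition(p, broadcast_table) for p in large_partitions]

print(result)
```

In this sketch `driver_rows` is the step that concentrates the entire small side in one place, which corresponds to the driver-memory bottleneck in question.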

Any ideas on how to achieve a similar optimization without the driver-memory bottleneck?


Solution

  • You are correct: the current implementation collects the data to the driver before sending it out to the executors.

    There is already a JIRA ticket, SPARK-17556, that addresses exactly what you are proposing:

    "Currently in Spark SQL, in order to perform a broadcast join, the driver must collect the result of an RDD and then broadcast it. This introduces some extra latency. It might be possible to broadcast directly from executors."

    I have copied the proposed solution from an attached document to make this answer self-contained:

    "To add a broadcast method to RDD to perform broadcast from executor, we need some support work as follow:

    1. Construct BroadCastId from driver, BroadCastManager will supply a method to do this.
    // Called from driver to create new broadcast id
    def newBroadcastId: Long = nextBroadcastId.getAndIncrement()
    
    2. BroadCastManager could be able to create a broadcast with specified id and a persist tag to infer this broadcast is an executor broadcast, and its data will be backup on the hdfs.

    3. In the TorrentBroadcast.writeBlocks write the block to hdfs, readBlocks read block from local, remote, hdfs by priority.

    4. When construct the Broadcast, we can control whether to upload broadcast data block

    5. BroadCastManager post a api to put broadcast data to block manager