I have several DataFrames: A (200 GB), B (20 MB), C (15 MB), D (10 MB) and E (12 MB). I want to join them together, A with B, and C with D and E, using Spark SQL in the same SparkSession, like this:
spark.sql("select * from A a inner join B b on a.id = b.id").write.csv("/path/for/ab")
spark.sql("select * from C c inner join D d on c.id = d.id inner join E e on c.id = e.id").write.csv("/path/for/cde")
Problem:
When I use the default spark.sql.autoBroadcastJoinThreshold=10m, B (20 MB) is above the threshold and is not broadcast, so the A-B join falls back to a shuffle join.
When I set spark.sql.autoBroadcastJoinThreshold=20m, B is broadcast, but then C, D and E are broadcast as well; I cannot control which DataFrames get broadcast and which do not.
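The threshold can be changed per session; the value is in bytes, and -1 disables automatic broadcasting:
// assuming spark is the active SparkSession
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 20L * 1024 * 1024)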
Instead of changing autoBroadcastJoinThreshold, you can mark the DataFrames to be broadcast explicitly. This makes it easy to decide exactly which DataFrames should be broadcast and which should not.
In Scala it can look like this:
import org.apache.spark.sql.functions.broadcast
val B2 = broadcast(B)
B2.createOrReplaceTempView("B")
Here DataFrame B has been marked for broadcasting and then registered as a temporary view so it can be used with Spark SQL.
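Putting it together for both of your queries, a minimal sketch (assuming spark is the active SparkSession and A, B, C, D and E are already loaded):
import org.apache.spark.sql.functions.broadcast

// Register the large table as-is; mark only the small ones for broadcasting.
A.createOrReplaceTempView("A")
broadcast(B).createOrReplaceTempView("B") // 20 MB
C.createOrReplaceTempView("C")
broadcast(D).createOrReplaceTempView("D") // 10 MB
broadcast(E).createOrReplaceTempView("E") // 12 MB

// The broadcast hint is kept in each view's logical plan, so these queries
// use broadcast joins for B, D and E regardless of autoBroadcastJoinThreshold.
spark.sql("select * from A a inner join B b on a.id = b.id").write.csv("/path/for/ab")
spark.sql("select * from C c inner join D d on c.id = d.id inner join E e on c.id = e.id").write.csv("/path/for/cde")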
Alternatively, this can be done directly with the DataFrame API; the first join can be written as:
A.join(broadcast(B), Seq("id"), "inner")
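To verify that the hint took effect, inspect the physical plan: a successful broadcast shows up as BroadcastHashJoin instead of SortMergeJoin.
// prints the physical plan to stdout
A.join(broadcast(B), Seq("id"), "inner").explain()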