Let's say I have a use case where I need to subset a Spark data frame using values from a Python list. I don't understand how it would be different if I use a normal Python list vs. a broadcast variable to achieve this task.
E.g., assume I have a data frame df with two columns:
df = spark.createDataFrame([('1','A'),('2','B'),('3','C'),('4','D')],["num","alpha"])
df.show(5)
+---+-----+
|num|alpha|
+---+-----+
| 1| A|
| 2| B|
| 3| C|
| 4| D|
+---+-----+
I want to filter it based on a list of values ['A','B']. The list can be very small or very big.
I can do this in two ways.
Approach 1:
Use a Python list to filter
from pyspark.sql.functions import col
list_filter = ['A','B']
df.filter(col("alpha").isin(list_filter)).show(5)
Approach 2:
Use a Spark broadcast variable to filter
from pyspark.sql.functions import col
broadcast_filter = sc.broadcast(['A','B'])
df.filter(col("alpha").isin(broadcast_filter.value)).show(5)
Result:
The Spark Catalyst optimizer converts both of these queries into the same physical plan:
== Physical Plan ==
*(1) Filter alpha#784 IN (A,B)
+- Scan ExistingRDD[num#783,alpha#784]
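For reference, a quick way to see this plan (a sketch, assuming the df, list_filter and broadcast_filter defined above) is to call explain() on both queries:
from pyspark.sql.functions import col
# both print the same physical plan
df.filter(col("alpha").isin(list_filter)).explain()
df.filter(col("alpha").isin(broadcast_filter.value)).explain()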
In fact, type(broadcast_filter.value) tells us that it is a normal Python list.
Moreover, the Catalyst optimizer parses the values of this list directly into the == Physical Plan == and creates the DAG. So only the driver program deals with this list; the executors don't even need it, since the values are already embedded in the query plan.
Questions:
Broadcast variables allow the programmer to keep a read-only variable cached on each machine rather than shipping a copy of it with tasks. [...] Spark also attempts to distribute broadcast variables using efficient broadcast algorithms to reduce communication cost.
In Spark, a Task (aka command) is the smallest individual unit of execution that corresponds to a RDD partition.
With RDDs, things are clear.
rdd = sc.parallelize(range(10))
a_big_list = [1, 5]

# no broadcast: the list is captured in the closure and shipped with every task
result = rdd.map(lambda x: (x, x in a_big_list)).collect()

# broadcast: the list is shipped only once per machine
a_big_list_br = sc.broadcast(a_big_list)
result = rdd.map(lambda x: (x, x in a_big_list_br.value)).collect()
Both approaches work. In the first case, the list is shipped to every task (i.e. every partition). If the variable and the RDD are big, that might be time consuming. In the second case, the variable is only shipped once to every machine (which may run many tasks), using optimized distribution algorithms, so it should be much faster.
Now, back to dataframes and your questions:
Short answer: no, it does not matter. Why is that? When you write isin(list), you actually put the entire list inside the SparkSQL query. Catalyst will thus include it in its physical plan (before executing anything), try to optimize that plan, and then run it. Your executors are not going to use that variable directly; instead, the information comes from the query itself. Therefore the broadcast is completely useless in that case. In your case, it is the driver that reads the broadcast variable and feeds its value to Catalyst, exactly as it would with a normal variable. Note also that Catalyst is not meant to contain data. Try isin(list) with a large list and a small dataframe (spark.range(10)). You will see that it takes forever for Spark to even start executing your query; that is the time Catalyst needs to build an unnecessarily large physical plan. If the list is large enough, Catalyst may even crash.
When you use the variable inside a UDF, you are back to the aforementioned RDD situation. Try it (as below) with a large list; you will see that it works much better than when you pass the list inside the physical plan. Also, broadcasting will have the same impact as for RDDs. AFAIK, UDFs are the only way to use broadcast variables in SparkSQL. Any other SparkSQL function will put your data in the physical plan, and you don't want that.
from pyspark.sql.functions import udf

# no broadcast, list shipped to each task inside the udf closure
fun = udf(lambda x: x in a_big_list)
spark.range(10).withColumn("x", fun("id")).show()

# broadcast, list shipped only once to each machine
fun_br = udf(lambda x: x in a_big_list_br.value)
spark.range(10).withColumn("x", fun_br("id")).show()
Finally, note that another approach would be to put your data in a dataframe and use join, with or without a broadcast (pyspark.sql.functions.broadcast in the case of dataframes).
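For completeness, a sketch of that join-based approach, reusing the small df and the filter values from the question:
from pyspark.sql.functions import broadcast

# put the filter values into their own dataframe
filter_df = spark.createDataFrame([('A',), ('B',)], ["alpha"])

# regular join (Spark may still choose a broadcast join on its own)
df.join(filter_df, on="alpha").show()

# explicit broadcast hint for the small side
df.join(broadcast(filter_df), on="alpha").show()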