apache-spark, pyspark, apache-spark-sql

Relevance of a Spark broadcast variable when filtering a data frame using a list/tuple


Let's say I have a use case where I need to subset a Spark data frame using values from a Python list. I don't understand how it would be different if I use a normal Python list vs. a broadcast variable to achieve this task.

Eg:

Assume, I have a data frame df with two columns

df = spark.createDataFrame([('1','A'),('2','B'),('3','C'),('4','D')],["num","alpha"])
df.show(5)

+---+-----+
|num|alpha|
+---+-----+
|  1|    A|
|  2|    B|
|  3|    C|
|  4|    D|
+---+-----+

I want to filter it based on a list of values ['A','B']. The list can be very small or very big. I can do this in two ways.

Approach 1:

Use a Python list to filter

from pyspark.sql.functions import col

list_filter = ['A','B']
df.filter(col("alpha").isin(list_filter)).show(5)

Approach 2:

Use a Spark broadcast variable to filter

from pyspark.sql.functions import col

broadcast_filter = sc.broadcast(['A','B'])
df.filter(col("alpha").isin(broadcast_filter.value)).show(5)

Result:

The Spark Catalyst optimizer converts both of these queries into the same physical plan:

== Physical Plan ==
*(1) Filter alpha#784 IN (A,B)
+- Scan ExistingRDD[num#783,alpha#784]

In fact, type(broadcast_filter.value) tells us that this is a normal Python list.

Moreover, the Catalyst optimizer embeds the values of this list directly in the == Physical Plan == when creating the DAG. So only the driver program deals with this list; the executors don't even need it, since the values are already part of the query plan.
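
Both versions produce the plan above, which can be printed with .explain():

# both queries print the same == Physical Plan ==
df.filter(col("alpha").isin(list_filter)).explain()
df.filter(col("alpha").isin(broadcast_filter.value)).explain()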

Questions :

  1. Does it really matter whether it is a normal Python list or a broadcast variable, given that the values are embedded directly in the physical plan? IMO, broadcasting will degrade performance compared to using the plain Python list.
  2. Is there any case where a broadcast variable will improve query performance compared to a normal variable, other than with custom UDFs?

Solution

  • Broadcast variables allow the programmer to keep a read-only variable cached on each machine rather than shipping a copy of it with tasks. [...] Spark also attempts to distribute broadcast variables using efficient broadcast algorithms to reduce communication cost.

    In Spark, a Task (aka command) is the smallest individual unit of execution that corresponds to an RDD partition.

    With RDDs, things are clear.

    rdd = sc.parallelize(range(10))
    a_big_list = [1, 5]

    # no broadcast: the list is serialized into the closure of every task
    result = rdd.map(lambda x: (x, x in a_big_list)).collect()

    # broadcast: the list is shipped only once to each machine
    a_big_list_br = sc.broadcast(a_big_list)
    result = rdd.map(lambda x: (x, x in a_big_list_br.value)).collect()
    

    Both approaches work. In the first case, the list is shipped with every task (i.e. for every partition). If the variable and the RDD are big, that can be time consuming. In the second case, the variable is shipped only once to each machine (which computes many tasks), using optimized distribution algorithms, so it should be much faster.

    Now, back to dataframes and your questions:

    1. Short answer: no, it does not matter. Why is that? When you write isin(list), you actually put the entire list inside the Spark SQL query. Catalyst will thus include it in its physical plan (before executing anything), try to optimize that plan, and then run it. Your executors are not going to use that variable directly; instead, the information comes from the query itself. Therefore the broadcast is completely useless in that case. When you do what you do, it is the driver that reads the broadcast variable and feeds it to Catalyst, exactly as it would with a normal variable. Note also that Catalyst is not meant to contain data. Try isin(list) with a large list and a small dataframe (spark.range(10)); you will see that it takes forever for Spark to even start executing your query, because that is the time Catalyst needs to build an unnecessarily large physical plan. If the list is large enough, Catalyst may even crash (see the minimal sketch after the UDF example below).

    2. When you use the variable inside a udf, you are back to the aforementioned RDD situation. Try it (as below) with a large list; you will see that it works much better than when you pass the list inside the physical plan. Broadcasting also has the same impact as it does for RDDs. AFAIK, UDFs are the only way to use broadcast variables in Spark SQL; any other Spark SQL function will put your data in the physical plan, and you don't want that.

    from pyspark.sql.functions import col, udf
    from pyspark.sql.types import BooleanType

    # no broadcast: the list is shipped to each task
    fun = udf(lambda x: x in a_big_list, BooleanType())
    spark.range(10).withColumn("x", fun(col("id"))).show(truncate=False)

    # broadcast: the list is shipped only once to each machine
    fun_br = udf(lambda x: x in a_big_list_br.value, BooleanType())
    spark.range(10).withColumn("x", fun_br(col("id"))).show(truncate=False)
    
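    To illustrate point 1, here is a minimal sketch (the list size is arbitrary and purely for illustration): every element of the isin() list ends up as a literal in the plan, so even planning a query over a tiny dataframe becomes slow.

    from pyspark.sql.functions import col

    a_huge_list = list(range(100000))  # arbitrary size, for illustration only
    # every literal from the list is embedded in the plan Catalyst has to build
    spark.range(10).filter(col("id").isin(a_huge_list)).explain()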

    Finally, note that another approach would be to put your data in a dataframe and use a join, with or without a broadcast hint (pyspark.sql.functions.broadcast in the case of dataframes), as sketched below.
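
    A minimal sketch of that join-based approach, reusing df and the values from the question (the broadcast hint is optional and simply asks Spark to ship the small side to every executor):

    from pyspark.sql.functions import broadcast

    # small dataframe holding the filter values
    filter_df = spark.createDataFrame([('A',), ('B',)], ["alpha"])

    # the inner join keeps only the rows whose alpha value appears in filter_df
    df.join(broadcast(filter_df), on="alpha", how="inner").show()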