python, apache-spark, pyspark, apache-spark-sql

PySpark : foreachPartition with additional parameters


I'm trying to run my function with spark_df.foreachPartition(), and I want to pass it an additional parameter, but apparently the function only accepts a single argument (the partition iterator).

I tried to work around it with a wrapper like this:

def my_function(partition, index_name):
    return True

def partition_func(partition):
    return my_function(partition, "blabla")

spark_df.foreachPartition(partition_func)

However, I'm getting a serialization error:

_pickle.PicklingError: Could not serialize object: TypeError: Cannot serialize socket object

How can I make this work? I know I could add the value as a column to my Spark DataFrame, but that feels like an ugly workaround — passing it as a function parameter would be much cleaner.


Solution

  • There might be other ways, but one simple approach is to create a broadcast variable (or broadcast a container holding any values you need) and reference it inside your foreachPartition function. Something like this:

    def partition_func_with_var(partition, broadcast_var):
        for row in partition:
            # executors read the shared value through .value
            print(str(broadcast_var.value) + row.desc)

    df = spark.createDataFrame([(1, "one"), (2, "two")], ["id", "desc"])

    # broadcast the extra value once; it is shipped to every executor
    bv = spark.sparkContext.broadcast(" some extra variable ")

    df.foreachPartition(lambda p: partition_func_with_var(p, bv))
    

    Note that "passing a variable" is a slightly loose description here — it is actually a broadcast operation, with all its consequences and limitations (the value is read-only, shipped to the executors once, etc.).
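
    As a side note, if the extra values are plain picklable objects (strings, numbers, small dicts), an ordinary closure or `functools.partial` also works — the pickling error in the question typically means the wrapped function captured something non-serializable (such as a SparkContext or a socket-backed object), not that extra parameters are impossible. A minimal sketch of the pattern, using a plain list to stand in for a partition iterator so it runs without a cluster (the names `my_function` and `index_name` are illustrative):

    ```python
    from functools import partial

    def my_function(partition, index_name):
        # process every row in the partition, using the extra parameter
        return [f"{index_name}:{row}" for row in partition]

    # bind the extra argument up front; the result takes a single argument,
    # which is the shape foreachPartition expects
    partition_func = partial(my_function, index_name="blabla")

    # with Spark this would be: spark_df.foreachPartition(partition_func)
    # here a plain list stands in for one partition
    result = partition_func(["a", "b"])
    ```

    The key constraint is that everything the bound function references must survive pickling, which is why the broadcast-variable route above is the safer default for anything larger than small literals.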