I'm trying to execute my function using spark_df.foreachPartition(), and I want to pass an additional parameter, but apparently the function only accepts one parameter (the partition). I tried to work around it with something like this:
def my_function(row, index_name):
    return True

def partition_func(row):
    return my_function(row, "blabla")

spark_df.foreachPartition(partition_func)
However, I'm getting a serialization error:
_pickle.PicklingError: Could not serialize object: TypeError: Cannot serialize socket object
How can I make this work? I know I could add the value as a column to my Spark DataFrame, but that seems like an ugly solution; passing it as a function parameter would be much cleaner.
There might be other ways, but one simple approach is to create a broadcast variable (or a container holding any values you need) and then use it inside your foreachPartition function. Something like this:
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

def partition_func_with_var(partition, broadcast_var):
    # broadcast_var.value retrieves the broadcast payload on the executor
    for row in partition:
        print(str(broadcast_var.value) + row.desc)

df = spark.createDataFrame([(1, "one"), (2, "two")], ["id", "desc"])
bv = spark.sparkContext.broadcast(" some extra variable ")
df.foreachPartition(lambda p: partition_func_with_var(p, bv))
Note that "passing a variable" has a little murky meaning here, as it is actually a broadcast operation, with all its consequences and limitations (read-only, sent once, etc.)