
Can pyspark.sql.functions.udf distribute a .py module to the worker nodes?


I use pyspark.sql.functions.udf to define a UDF that uses a class imported from a .py module I wrote myself.

from czech_simple_stemmer import CzechSimpleStemmer #this is my class in my module
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
...some code here...

def clean_one_raw_doc(my_raw_doc):
    ... calls something from CzechSimpleStemmer ...

udf_clean_one_raw_doc = udf(clean_one_raw_doc, StringType())

When I call

df = spark.sql("SELECT * FROM mytable").withColumn("output_text", udf_clean_one_raw_doc("input_text"))

I get a long error message; this is probably the relevant part:

File "/data2/hadoop/yarn/local/usercache/ja063930/appcache/application_1472572954011_132777/container_e23_1472572954011_132777_01_000003/pyspark.zip/pyspark/serializers.py", line 431, in loads
return pickle.loads(obj, encoding=encoding)
ImportError: No module named 'czech_simple_stemmer'

Do I understand correctly that pyspark distributes udf_clean_one_raw_doc to all the worker nodes, but czech_simple_stemmer.py is missing from the nodes' Python installations (it is present only on the edge node where I run the Spark driver)?

And if so, is there a way to tell pyspark to distribute this module as well? I could probably copy czech_simple_stemmer.py manually into every node's Python installation, but 1) I don't have admin access to the nodes, and 2) even if I begged the admin to put it there, I would have to bother him again every time I tune the module.


Solution

  • SparkContext.addPyFile("my_module.py") will do it.
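A minimal sketch of how that could fit into the code from the question. The addPyFile path, the app name, and the placeholder return value are assumptions; the module, table, and column names come from the question:

from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

spark = SparkSession.builder.appName("stemmer-udf").getOrCreate()

# Ship the module to the executors so they can import it when the UDF is unpickled.
# The path is a placeholder for wherever czech_simple_stemmer.py lives on the edge node.
spark.sparkContext.addPyFile("/path/on/edge/node/czech_simple_stemmer.py")

from czech_simple_stemmer import CzechSimpleStemmer

def clean_one_raw_doc(my_raw_doc):
    # ... calls something from CzechSimpleStemmer ...
    return my_raw_doc  # placeholder return so the sketch runs

udf_clean_one_raw_doc = udf(clean_one_raw_doc, StringType())

df = spark.sql("SELECT * FROM mytable") \
          .withColumn("output_text", udf_clean_one_raw_doc("input_text"))

The same effect can be had at submit time with spark-submit --py-files czech_simple_stemmer.py, so the module ships with each application run and nothing needs to be installed on the nodes themselves.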