I am running into the following error when using a custom UDF:
Traceback (most recent call last):
File "/usr/local/lib/python3.8/dist-packages/pyspark/worker.py", line 603, in main
func, profiler, deserializer, serializer = read_udfs(pickleSer, infile, eval_type)
File "/usr/local/lib/python3.8/dist-packages/pyspark/worker.py", line 449, in read_udfs
udfs.append(read_single_udf(pickleSer, infile, eval_type, runner_conf, udf_index=i))
File "/usr/local/lib/python3.8/dist-packages/pyspark/worker.py", line 251, in read_single_udf
f, return_type = read_command(pickleSer, infile)
File "/usr/local/lib/python3.8/dist-packages/pyspark/worker.py", line 71, in read_command
command = serializer._read_with_length(file)
File "/usr/local/lib/python3.8/dist-packages/pyspark/serializers.py", line 160, in _read_with_length
return self.loads(obj)
File "/usr/local/lib/python3.8/dist-packages/pyspark/serializers.py", line 430, in loads
return pickle.loads(obj, encoding=encoding)
ModuleNotFoundError: No module named 'jobs'
The imports in the Spark script look something like this:
from jobs.lib_a import a
from jobs.udf import udf_function #This is a UDF
The script itself is located at jobs/scripts/test_script.py. The entire jobs folder is zipped and then added to Spark using pyFiles.
The weird thing is that the other import from the jobs module works; only the import of the UDF fails.
I have tried the approach in this post: creating a separate zip file called udf.zip, putting udf at the top level, and then adding it to Spark via pyFiles. I still run into ModuleNotFoundError when I try to import udf.
I have also tried sys.path.append(<the udf path>), with the same result.
The only approach that works is copying udf_function into the Spark script test_script.py. That is not viable in practice, because udf_function needs to be shared with other Spark scripts.
The underlying system is: Python 3.8, Spark 3.2, with Spark running on Kubernetes.
I was able to make it work.
Some more context: we are using the Spark on k8s operator, so we pass in the zip file via pyFiles: https://github.com/GoogleCloudPlatform/spark-on-k8s-operator
This works the same as the following, if we were to set it up in the Spark script:
spark_session.sparkContext.addPyFile('/opt/spark/pyfiles/python.zip')
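For context, shipping a zip this way makes the package importable by putting the archive on the Python path. Below is a minimal, stdlib-only sketch of that mechanism, not Spark itself. It assumes the zip has the jobs/ directory at its top level; the file contents and the body of udf_function are placeholders I made up for illustration.

```python
import importlib
import os
import sys
import tempfile
import zipfile

# Build a zip whose top-level entry is the package directory itself
# (jobs/__init__.py, jobs/udf.py), not the contents of that directory.
# The module contents here are hypothetical placeholders.
workdir = tempfile.mkdtemp()
zip_path = os.path.join(workdir, "python.zip")  # an absolute path
with zipfile.ZipFile(zip_path, "w") as zf:
    zf.writestr("jobs/__init__.py", "")
    zf.writestr("jobs/udf.py", "def udf_function(x):\n    return x * 2\n")

# Putting the archive on sys.path is what makes `import jobs.udf` resolvable.
sys.path.insert(0, zip_path)
importlib.invalidate_caches()

from jobs.udf import udf_function

result = udf_function(21)
print(result)
```

If the archive path does not resolve to a real file in the process doing the import, Python silently skips that sys.path entry and the import fails with ModuleNotFoundError, which matches the symptom in the question.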
Initially, we passed it in as
pyFiles:
- local:///opt/spark/pyfiles/spinner-python.zip
But in our setup local:/// pointed to the working directory, so we had to change it to the following, with an extra slash (/), to make it point to the absolute path.
pyFiles:
- local:////opt/spark/pyfiles/spinner-python.zip
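When PySpark serializes a UDF, it sends the pickled UDF to all the worker nodes, and each worker has to unpickle it. That only works if the module the UDF was imported from (here, jobs) can also be imported on the worker, which is why the traceback ends in pickle.loads. So we have to point pyFiles to an absolute path instead of a relative path.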
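To illustrate why the error surfaces in pickle.loads, here is a small sketch using only the standard library. It is a simulation under assumptions, not Spark code: stdlib pickle stands in for PySpark's serializer (which, as far as I understand, handles functions from imported modules the same way, by reference), a subprocess stands in for the worker, and the jobs/udf.py contents are placeholders.

```python
import os
import pickle
import subprocess
import sys
import tempfile

# Hypothetical stand-in for the zipped package: jobs/udf.py in a temp dir.
pkg_root = tempfile.mkdtemp()
os.makedirs(os.path.join(pkg_root, "jobs"))
open(os.path.join(pkg_root, "jobs", "__init__.py"), "w").close()
with open(os.path.join(pkg_root, "jobs", "udf.py"), "w") as fh:
    fh.write("def udf_function(x):\n    return x * 2\n")

# "Driver": the package is importable here, so pickling succeeds.
sys.path.insert(0, pkg_root)
from jobs.udf import udf_function

# The payload holds a reference (module name + function name), not the code.
payload = pickle.dumps(udf_function)

# "Worker": a fresh interpreter that unpickles the payload read from stdin.
worker_code = (
    "import pickle, sys\n"
    "f = pickle.loads(sys.stdin.buffer.read())\n"
    "print(f(21))\n"
)
empty_dir = tempfile.mkdtemp()  # working directory that contains no jobs/


def run_worker(pythonpath):
    """Run the simulated worker with the given PYTHONPATH."""
    env = dict(os.environ, PYTHONPATH=pythonpath)
    return subprocess.run(
        [sys.executable, "-c", worker_code],
        input=payload, capture_output=True, env=env, cwd=empty_dir,
    )


missing = run_worker("")        # package not reachable from the worker
present = run_worker(pkg_root)  # package reachable via an absolute path

print(missing.stderr.decode().strip().splitlines()[-1])
print(present.stdout.decode().strip())
```

This would also be consistent with the other observations in the question: `from jobs.lib_a import a` only has to succeed on the driver, and a UDF defined directly in test_script.py belongs to the main script, which PySpark's serializer can ship by value rather than by reference.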