Search code examples
pythonapache-sparkkubernetespysparkuser-defined-functions

PySpark custom UDF ModuleNotFoundError


Running into the following error when use custom UDF

Traceback (most recent call last):
  File "/usr/local/lib/python3.8/dist-packages/pyspark/worker.py", line 603, in main
    func, profiler, deserializer, serializer = read_udfs(pickleSer, infile, eval_type)
  File "/usr/local/lib/python3.8/dist-packages/pyspark/worker.py", line 449, in read_udfs
    udfs.append(read_single_udf(pickleSer, infile, eval_type, runner_conf, udf_index=i))
  File "/usr/local/lib/python3.8/dist-packages/pyspark/worker.py", line 251, in read_single_udf
    f, return_type = read_command(pickleSer, infile)
  File "/usr/local/lib/python3.8/dist-packages/pyspark/worker.py", line 71, in read_command
    command = serializer._read_with_length(file)
  File "/usr/local/lib/python3.8/dist-packages/pyspark/serializers.py", line 160, in _read_with_length
    return self.loads(obj)
  File "/usr/local/lib/python3.8/dist-packages/pyspark/serializers.py", line 430, in loads
    return pickle.loads(obj, encoding=encoding)
ModuleNotFoundError: No module named 'jobs'

The import spark scripts looks something like this

from jobs.lib_a import a
from jobs.udf import udf_function #This is a UDF

The scripts itself is located in jobs/scripts/test_script.py, the entire jobs folder is zipped and then added to spark using pyFiles.

The weird thing is that the other import from jobs module works, only fail for udf.

I have tried approach in this post, creating a separate zip file called udf.zip, putting udf at top level and then add it to spark via pyFiles, but still run into ModuleNotFoundError when I try to import udf.

I have also tried sys.path.append(<the udf path>)

The only approach works is when I copy the udf_function into the spark script test_script.py. This wouldn't work in reality as the udf_function can be shared by other spark script.

The underlying system is: Python 3.8 Spark 3.2 Spark is running in kubernetes


Solution

  • I was able to make it work.

    Some more context is that we are leveraging spark on k8s operator, so we pass in the zip file via pyFiles https://github.com/GoogleCloudPlatform/spark-on-k8s-operator

    This work the same as the following

    spark_session.sparkContext.addPyFile('/opt/spark/pyfiles/python.zip')
    

    if we set it up in spark script

    Initially, we pass in as

    pyFiles:
    - local:///opt/spark/pyfiles/spinner-python.zip
    

    But local:/// point to the working directory, we have to change it to the following with extra slash(/) to point to absolute.

    pyFiles:
    - local:////opt/spark/pyfiles/spinner-python.zip
    

    When Pyspark serializes a UDF, it sends a copy of the UDF code to all the worker nodes, we have to point PyFiles to absolute path instead of relative path