pyspark · pickle · google-cloud-dataproc

pickle.PicklingError: Cannot pickle files that are not opened for reading


I'm getting this error while running a PySpark job on Dataproc. What could be the reason?

This is the stack trace of the error:

  File "/usr/lib/python2.7/pickle.py", line 331, in save
  self.save_reduce(obj=obj, *rv)
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/cloudpickle.py", 
  line 553, in save_reduce
  File "/usr/lib/python2.7/pickle.py", line 286, in save
  f(self, obj) # Call unbound method with explicit self
  File "/usr/lib/python2.7/pickle.py", line 649, in save_dict
  self._batch_setitems(obj.iteritems())
  File "/usr/lib/python2.7/pickle.py", line 681, in _batch_setitems
  save(v)
  File "/usr/lib/python2.7/pickle.py", line 286, in save
  f(self, obj) # Call unbound method with explicit self
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/cloudpickle.py", 
  line 582, in save_file
  pickle.PicklingError: Cannot pickle files that are not opened for reading
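
For reference, this exact message comes from cloudpickle's save_file (the last frame of the trace), which refuses to serialize any file object not opened for reading. A minimal sketch of how that can happen, with placeholder names rather than the actual Dataproc job, assuming the Python 2 era PySpark shown in the trace:

    from pyspark import SparkContext

    sc = SparkContext()

    # The dictionary itself is picklable, but the open write handle inside it is not,
    # so pickling the map closure fails on the driver.
    log_file = open("/tmp/debug.log", "w")
    lookup = {"a": 1, "b": 2, "log": log_file}

    rdd = sc.parallelize(["a", "b"])
    # Raises: pickle.PicklingError: Cannot pickle files that are not opened for reading
    rdd.map(lambda k: lookup.get(k)).collect()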

Solution

  • The issue was that I was using a dictionary inside the map function. It was failing because the worker nodes couldn't access the dictionary I was passing into the map function.

    Solution:

    I broadcast the dictionary and then used it inside the map function:

    from pyspark import SparkContext

    sc = SparkContext()
    lookup_bc = sc.broadcast(lookup_dict)  # ship a read-only copy of the dictionary to every executor
    

    Then, inside the function, I read the value like this (a fuller end-to-end sketch follows below):

    data = lookup_bc.value.get(key)
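
    Putting it together, a minimal end-to-end sketch of the broadcast approach (the dictionary contents and the RDD are placeholders, not the actual job):

    from pyspark import SparkContext

    sc = SparkContext()

    # Placeholder lookup table; the real one came from the job.
    lookup_dict = {"a": 1, "b": 2, "c": 3}

    # Broadcast once so every executor gets a read-only copy instead of
    # the closure trying to ship the driver-side object with each task.
    lookup_bc = sc.broadcast(lookup_dict)

    def enrich(key):
        # Read the broadcast value inside the map function.
        return key, lookup_bc.value.get(key)

    rdd = sc.parallelize(["a", "b", "x"])
    print(rdd.map(enrich).collect())  # [('a', 1), ('b', 2), ('x', None)]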