
Unpickling and encoding a string using rdd.map in PySpark


I need to port code from PySpark 1.3 to 2.3 (still on Python 2.7 only), and I have the following map transformation on the RDD:

import cPickle as pickle
import base64

path = "my_filename"

my_rdd = "rdd with data"  # placeholder: actually a pyspark.rdd.PipelinedRDD

# saving RDD to a file but first encoding everything
my_rdd.map(lambda line: base64.b64encode(pickle.dumps(line))).saveAsTextFile(path)

# another my_rdd.map doing the opposite of the above, fails with the same error
my_rdd = sc.textFile(path).map(lambda line: pickle.loads(base64.b64decode(line)))

When this part is run, I get the following error:

   raise pickle.PicklingError(msg)
PicklingError: Could not serialize object: Exception: It appears that you are attempting to broadcast an RDD or reference an RDD from an action or transformation. RDD transformations and actions can only be invoked by the driver, not inside of other transformations; for example, rdd1.map(lambda x: rdd2.values.count() * x) is invalid because the values transformation and count action cannot be performed inside of the rdd1.map transformation. For more information, see SPARK-5063.

It looks like actions like this are no longer allowed inside the map function. Any suggestion on how to rewrite this part?

UPDATE:

Weirdly enough, even just doing:

my_rdd.saveAsTextFile(path)

also fails with the same error.
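The pickle/base64 round-trip itself can be checked outside Spark, which suggests the failure lies in serializing the closure rather than in the encoding logic. A minimal sketch (plain Python, no Spark required; function names are illustrative):

```python
import base64
import pickle  # `cPickle` on Python 2.7

def encode_line(obj):
    # Pickle the record, then base64-encode it so it fits on one safe text line.
    return base64.b64encode(pickle.dumps(obj)).decode("ascii")

def decode_line(line):
    # Reverse: base64-decode the line, then unpickle the original record.
    return pickle.loads(base64.b64decode(line))

record = {"id": 1, "values": [1, 2, 3]}
encoded = encode_line(record)
assert decode_line(encoded) == record
```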


Solution

  • Bottom line: the problem was somewhere deep in the functions performing the transformations. In this case it was easier to rewrite them than to debug.
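The SPARK-5063 message means that the function shipped to the executors closed over something holding a reference to an RDD or the SparkContext. Spark pickles the whole closure, so one captured unserializable object breaks every downstream action, which is consistent with even a bare `saveAsTextFile` failing. A plain-Python analogue of the failure mode (a `threading.Lock` stands in for the unpicklable SparkContext; the `Transformer` class is purely illustrative):

```python
import pickle
import threading

class Transformer(object):
    """Stand-in for a helper buried deep in the transformation functions."""
    def __init__(self):
        # The lock plays the role of SparkContext here: it cannot be pickled.
        self.lock = threading.Lock()

    def transform(self, x):
        return x * 2

helper = Transformer()

# Spark serializes a mapper's closure before shipping it to executors;
# everything the closure captures must therefore be picklable.
try:
    pickle.dumps(helper)
    picklable = True
except TypeError:
    picklable = False

assert not picklable  # capturing `helper` in a map() would sink the whole job
```

The usual fix is to ensure the mapped function captures only plain, picklable data, e.g. by extracting any needed values into local variables on the driver before calling `map`.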