Tags: python, apache-spark, pyspark, databricks, azure-synapse

TypeError: cannot pickle '_thread.RLock' object error


I am using PySpark in Synapse to create a DataFrame and add a computed column using a user-defined function. If anything goes wrong inside the function, I need to log that information. However, I get the error TypeError: cannot pickle '_thread.RLock' object when logger.error is called in the except block of the function.

Part of my code looks like this:

    # log4j logger obtained through the JVM gateway
    logger = sc._jvm.org.apache.log4j.LogManager.getLogger("com.test.test")

    def log_test(arg):
        try:
            return 1/arg
        except Exception as e:
            logger.error("Divided by zero error occurred")

    udf_log_test = udf(lambda col: log_test(col) if col != 0 else 0, IntegerType())

    df = df.withColumn('TestColumn', udf_log_test(lit(0)))

Error Traceback

Traceback (most recent call last):
      File "/databricks/spark/python/pyspark/serializers.py", line 527, in dumps
        return cloudpickle.dumps(obj, pickle_protocol)
      File "/databricks/spark/python/pyspark/cloudpickle/cloudpickle_fast.py", line 73, in dumps
        cp.dump(obj)
      File "/databricks/spark/python/pyspark/cloudpickle/cloudpickle_fast.py", line 602, in dump
        return Pickler.dump(self, obj)
    TypeError: cannot pickle '_thread.RLock' object
    PicklingError: Could not serialize object: TypeError: cannot pickle '_thread.RLock' object

TypeError                                 Traceback (most recent call last)
File /databricks/spark/python/pyspark/serializers.py:527, in CloudPickleSerializer.dumps(self, obj)
    526 try:
--> 527     return cloudpickle.dumps(obj, pickle_protocol)
    528 except pickle.PickleError:

File /databricks/spark/python/pyspark/cloudpickle/cloudpickle_fast.py:73, in dumps(obj, protocol, buffer_callback)
     70 cp = CloudPickler(
     71     file, protocol=protocol, buffer_callback=buffer_callback
     72 )
---> 73 cp.dump(obj)
     74 return file.getvalue()

File /databricks/spark/python/pyspark/cloudpickle/cloudpickle_fast.py:602, in CloudPickler.dump(self, obj)
    601 try:
--> 602     return Pickler.dump(self, obj)
    603 except RuntimeError as e:

TypeError: cannot pickle '_thread.RLock' object

During handling of the above exception, another exception occurred:

PicklingError                             Traceback (most recent call last)
File <command-109665656057519>:1
----> 1 df = df.withColumn('TestColumn', udf_log_test(lit(0)))

File /databricks/spark/python/pyspark/sql/udf.py:322, in UserDefinedFunction._wrapped.<locals>.wrapper(*args)
    320 @functools.wraps(self.func, assigned=assignments)
    321 def wrapper(*args: "ColumnOrName") -> Column:
--> 322     return self(*args)

File /databricks/spark/python/pyspark/sql/udf.py:299, in UserDefinedFunction.__call__(self, *cols)
    297         sc.profiler_collector.add_profiler(id, memory_profiler)
    298 else:
--> 299     judf = self._judf
    300     jPythonUDF = judf.apply(_to_seq(sc, cols, _to_java_column))
    301 return Column(jPythonUDF)

File /databricks/spark/python/pyspark/sql/udf.py:219, in UserDefinedFunction._judf(self)
    212 @property
    213 def _judf(self) -> JavaObject:
    214     # It is possible that concurrent access, to newly created UDF,
    215     # will initialize multiple UserDefinedPythonFunctions.
    216     # This is unlikely, doesn't affect correctness,
    217     # and should have a minimal performance impact.
    218     if self._judf_placeholder is None:
--> 219         self._judf_placeholder = self._create_judf(self.func)
    220     return self._judf_placeholder

File /databricks/spark/python/pyspark/sql/udf.py:228, in UserDefinedFunction._create_judf(self, func)
    225 spark = SparkSession._getActiveSessionOrCreate()
    226 sc = spark.sparkContext
--> 228 wrapped_func = _wrap_function(sc, func, self.returnType)
    229 jdt = spark._jsparkSession.parseDataType(self.returnType.json())
    230 assert sc._jvm is not None

File /databricks/spark/python/pyspark/sql/udf.py:51, in _wrap_function(sc, func, returnType)
     47 def _wrap_function(
     48     sc: SparkContext, func: Callable[..., Any], returnType: "DataTypeOrString"
     49 ) -> JavaObject:
     50     command = (func, returnType)
---> 51     pickled_command, broadcast_vars, env, includes = _prepare_for_python_RDD(sc, command)
     52     assert sc._jvm is not None
     53     return sc._jvm.SimplePythonFunction(
     54         bytearray(pickled_command),
     55         env,
   (...)
     60         sc._javaAccumulator,
     61     )

File /databricks/spark/python/pyspark/rdd.py:5248, in _prepare_for_python_RDD(sc, command)
   5245 def _prepare_for_python_RDD(sc: "SparkContext", command: Any) -> Tuple[bytes, Any, Any, Any]:
   5246     # the serialized command will be compressed by broadcast
   5247     ser = CloudPickleSerializer()
-> 5248     pickled_command = ser.dumps(command)
   5249     assert sc._jvm is not None
   5250     if len(pickled_command) > sc._jvm.PythonUtils.getBroadcastThreshold(sc._jsc):  # Default 1M
   5251         # The broadcast will have same life cycle as created PythonRDD

File /databricks/spark/python/pyspark/serializers.py:537, in CloudPickleSerializer.dumps(self, obj)
    535     msg = "Could not serialize object: %s: %s" % (e.__class__.__name__, emsg)
    536 print_exec(sys.stderr)
--> 537 raise pickle.PicklingError(msg)

PicklingError: Could not serialize object: TypeError: cannot pickle '_thread.RLock' object

Solution

  • Here the udf automatically serializes the function and ships it to the worker nodes, and in that process it also tries to serialize the _thread.RLock object held by the log4j logger obtained through sc._jvm. As @DarkKnight and @Axel mention, _thread.RLock is not serializable; a minimal reproduction is sketched below.

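    The failure can be reproduced without a DataFrame at all. A minimal sketch (assuming an active SparkSession named spark, as in a Synapse or Databricks notebook): cloudpickling any function that references the JVM logger fails exactly as in the traceback above, because the py4j object it captures holds a _thread.RLock internally.

    from pyspark.serializers import CloudPickleSerializer

    sc = spark.sparkContext
    jvm_logger = sc._jvm.org.apache.log4j.LogManager.getLogger("com.test.test")

    def capture_jvm_logger(arg):
        # The closure references jvm_logger, a py4j JavaObject that cannot
        # be pickled because of the lock it carries.
        jvm_logger.error("some message")
        return arg

    try:
        CloudPickleSerializer().dumps(capture_jvm_logger)
    except Exception as e:
        # PicklingError: Could not serialize object: TypeError:
        # cannot pickle '_thread.RLock' object
        print(type(e).__name__, e)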

    So, the possible solution is to use Python's built-in logging module.


    Try the code below for logging:

    import logging

    from pyspark.sql.functions import lit, udf
    from pyspark.sql.types import IntegerType

    # Standard-library logger instead of the JVM log4j logger, so the UDF
    # closure no longer captures an unpicklable py4j object.
    logger = logging.getLogger("com.test.test")


    def log_test(arg):
        try:
            return 1/arg
        except Exception as e:
            logger.error("Divided by zero error occurred")


    udf_log_test = udf(lambda col: log_test(col) if col != 0 else 0, IntegerType())

    df.withColumn('TestColumn', udf_log_test(lit(0))).show()


    Output: (screenshot of the resulting DataFrame with the added TestColumn)
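    A quick way to confirm the fix (a sketch): the rewritten log_test now cloudpickles without error, because on Python 3.7+ a standard-library Logger is pickled simply by name and recreated with logging.getLogger on the worker, so no _thread.RLock ends up in the serialized closure. Also note that messages logged inside the UDF are emitted by the Python workers on the executors, so they appear in the executor logs rather than in the notebook cell output.

    from pyspark.serializers import CloudPickleSerializer

    # Mirrors what _wrap_function pickles: the (function, return type) command.
    payload = CloudPickleSerializer().dumps((log_test, IntegerType()))
    print(len(payload), "bytes")  # succeeds, no PicklingError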