I am using PySpark in Synapse to create a DataFrame and add a computed column using a user-defined function. If there is any issue in the function, I need to log that information. However, I get the error TypeError: cannot pickle '_thread.RLock' object when logger.error is used in the except block of the function.
Part of my code looks like this:
logger = sc._jvm.org.apache.log4j.LogManager.getLogger("com.test.test")

def log_test(arg):
    try:
        return 1/arg
    except Exception as e:
        logger.error("Divided by zero error occurred")

udf_log_test = udf(lambda col: log_test(col) if col != 0 else 0, IntegerType())
df = df.withColumn('TestColumn', udf_log_test(lit(0)))
Error Traceback
Traceback (most recent call last):
File "/databricks/spark/python/pyspark/serializers.py", line 527, in dumps
return cloudpickle.dumps(obj, pickle_protocol)
File "/databricks/spark/python/pyspark/cloudpickle/cloudpickle_fast.py", line 73, in dumps
cp.dump(obj)
File "/databricks/spark/python/pyspark/cloudpickle/cloudpickle_fast.py", line 602, in dump
return Pickler.dump(self, obj)
TypeError: cannot pickle '_thread.RLock' object
PicklingError: Could not serialize object: TypeError: cannot pickle '_thread.RLock' object
TypeError Traceback (most recent call last)
File /databricks/spark/python/pyspark/serializers.py:527, in CloudPickleSerializer.dumps(self, obj)
526 try:
--> 527 return cloudpickle.dumps(obj, pickle_protocol)
528 except pickle.PickleError:
File /databricks/spark/python/pyspark/cloudpickle/cloudpickle_fast.py:73, in dumps(obj, protocol, buffer_callback)
70 cp = CloudPickler(
71 file, protocol=protocol, buffer_callback=buffer_callback
72 )
---> 73 cp.dump(obj)
74 return file.getvalue()
File /databricks/spark/python/pyspark/cloudpickle/cloudpickle_fast.py:602, in CloudPickler.dump(self, obj)
601 try:
--> 602 return Pickler.dump(self, obj)
603 except RuntimeError as e:
TypeError: cannot pickle '_thread.RLock' object
During handling of the above exception, another exception occurred:
PicklingError Traceback (most recent call last)
File <command-109665656057519>:1
----> 1 df = df.withColumn('TestColumn', udf_log_test(lit(0)))
File /databricks/spark/python/pyspark/sql/udf.py:322, in UserDefinedFunction._wrapped.<locals>.wrapper(*args)
320 @functools.wraps(self.func, assigned=assignments)
321 def wrapper(*args: "ColumnOrName") -> Column:
--> 322 return self(*args)
File /databricks/spark/python/pyspark/sql/udf.py:299, in UserDefinedFunction.__call__(self, *cols)
297 sc.profiler_collector.add_profiler(id, memory_profiler)
298 else:
--> 299 judf = self._judf
300 jPythonUDF = judf.apply(_to_seq(sc, cols, _to_java_column))
301 return Column(jPythonUDF)
File /databricks/spark/python/pyspark/sql/udf.py:219, in UserDefinedFunction._judf(self)
212 @property
213 def _judf(self) -> JavaObject:
214 # It is possible that concurrent access, to newly created UDF,
215 # will initialize multiple UserDefinedPythonFunctions.
216 # This is unlikely, doesn't affect correctness,
217 # and should have a minimal performance impact.
218 if self._judf_placeholder is None:
--> 219 self._judf_placeholder = self._create_judf(self.func)
220 return self._judf_placeholder
File /databricks/spark/python/pyspark/sql/udf.py:228, in UserDefinedFunction._create_judf(self, func)
225 spark = SparkSession._getActiveSessionOrCreate()
226 sc = spark.sparkContext
--> 228 wrapped_func = _wrap_function(sc, func, self.returnType)
229 jdt = spark._jsparkSession.parseDataType(self.returnType.json())
230 assert sc._jvm is not None
File /databricks/spark/python/pyspark/sql/udf.py:51, in _wrap_function(sc, func, returnType)
47 def _wrap_function(
48 sc: SparkContext, func: Callable[..., Any], returnType: "DataTypeOrString"
49 ) -> JavaObject:
50 command = (func, returnType)
---> 51 pickled_command, broadcast_vars, env, includes = _prepare_for_python_RDD(sc, command)
52 assert sc._jvm is not None
53 return sc._jvm.SimplePythonFunction(
54 bytearray(pickled_command),
55 env,
(...)
60 sc._javaAccumulator,
61 )
File /databricks/spark/python/pyspark/rdd.py:5248, in _prepare_for_python_RDD(sc, command)
5245 def _prepare_for_python_RDD(sc: "SparkContext", command: Any) -> Tuple[bytes, Any, Any, Any]:
5246 # the serialized command will be compressed by broadcast
5247 ser = CloudPickleSerializer()
-> 5248 pickled_command = ser.dumps(command)
5249 assert sc._jvm is not None
5250 if len(pickled_command) > sc._jvm.PythonUtils.getBroadcastThreshold(sc._jsc): # Default 1M
5251 # The broadcast will have same life cycle as created PythonRDD
File /databricks/spark/python/pyspark/serializers.py:537, in CloudPickleSerializer.dumps(self, obj)
535 msg = "Could not serialize object: %s: %s" % (e.__class__.__name__, emsg)
536 print_exec(sys.stderr)
--> 537 raise pickle.PicklingError(msg)
PicklingError: Could not serialize object: TypeError: cannot pickle '_thread.RLock' object
Here the UDF automatically serializes the function and sends it to the worker nodes; in that process it also tries to serialize the _thread.RLock object held by the JVM logger. As @DarkKnight and @Axel mention, _thread.RLock is not serializable.
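You can reproduce the failure outside of withColumn by pickling the function yourself. This is a minimal sketch, assuming the logger and log_test from the question are already defined; cloudpickle is the serializer PySpark uses for UDF commands:

from pyspark import cloudpickle

# log_test closes over `logger`, a py4j JavaObject whose gateway holds a
# _thread.RLock, so cloudpickle cannot serialize the closure.
try:
    cloudpickle.dumps(log_test)
except TypeError as e:
    print(e)  # cannot pickle '_thread.RLock' object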
So, a possible solution is to use Python's built-in logging module. Try the code below for logging:
import logging

from pyspark.sql.functions import lit, udf
from pyspark.sql.types import IntegerType

logger = logging.getLogger("com.test.test")

def log_test(arg):
    try:
        return 1/arg
    except Exception as e:
        logger.error("Divided by zero error occurred")

udf_log_test = udf(lambda col: log_test(col) if col != 0 else 0, IntegerType())
df.withColumn('TestColumn', udf_log_test(lit(0))).show()
Output:
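Note that because the UDF body executes on the worker nodes, messages logged this way typically show up in the executor (worker) logs rather than in the driver or notebook output.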