Search code examples
pythonclassapache-sparkpysparkrdd

using python class methods on RDD


My question may sound somewhat similar to this and this but trying the solutions of these also didn't helped me out.
I have a class tokenizer defined as-

class Tokenizer:
    def __init__(self, preserve_case=False):
        self.preserve_case = preserve_case

    def tokenize(self, s):
        """
        Argument: s -- any string or unicode object
        Value: a tokenize list of strings; conatenating this list returns the original string if preserve_case=False
        """        
        # Try to ensure unicode:
        try:
            s = str(s)
        except UnicodeDecodeError:
            s = s.encode('string_escape')
            s = str(s)
        # Fix HTML character entitites:
        s = self.__html2unicode(s)
        # Tokenize:
        words = word_re.findall(s)
        # Possible alter the case, but avoid changing emoticons like :D into :d:
        if not self.preserve_case:            
            words = map((lambda x : x if emoticon_re.search(x) else x.lower()), words)
        return words
tok=Tokenizer(preserve_case=False)

I have a (key,value) RDD of (user_id, tweets). I want to use the the tweets of the RDD on the function tokenize of class tokenizer. What I did was-

rdd.foreach(lambda x:tok.tokenize(x[1])).take(5)  

and got the error-

'NoneType' object has no attribute 'take'

I also tried-

rdd1.map(lambda x:tok.tokenize(x[1])).take(5)  

and got the error-

Py4JJavaError Traceback (most recent call last) in () ----> 1 rdd1.map(lambda x:tok.tokenize(x1)).take(5)

~/anaconda3/lib/python3.6/site-packages/pyspark/rdd.py in take(self, num) 1358 1359 p = range(partsScanned, min(partsScanned + numPartsToTry, totalParts)) -> 1360 res = self.context.runJob(self, takeUpToNumLeft, p) 1361 1362 items += res

~/anaconda3/lib/python3.6/site-packages/pyspark/context.py in runJob(self, rdd, partitionFunc, partitions, allowLocal) 1067

SparkContext#runJob. 1068 mappedRDD = rdd.mapPartitions(partitionFunc)

-> 1069 sock_info = self._jvm.PythonRDD.runJob(self._jsc.sc(), mappedRDD._jrdd, partitions) 1070 return list(_load_from_socket(sock_info, mappedRDD._jrdd_deserializer))
1071

~/anaconda3/lib/python3.6/site-packages/py4j/java_gateway.py in call(self, *args) 1255 answer = self.gateway_client.send_command(command) 1256 return_value = get_return_value( -> 1257 answer, self.gateway_client, self.target_id, self.name) 1258 1259 for temp_arg in temp_args:

~/anaconda3/lib/python3.6/site-packages/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name) 326 raise Py4JJavaError( 327 "An error occurred while calling {0}{1}{2}.\n". --> 328 format(target_id, ".", name), value) 329 else: 330 raise Py4JError(

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.runJob. : org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 39.0 failed 1 times, most recent failure: Lost task 0.0 in stage 39.0 (TID 101, localhost, executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last): File "/home/kriti/Downloads/spark-2.4.3-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/worker.py", line 377, in main process() File "/home/kriti/Downloads/spark-2.4.3-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/worker.py", line 372, in process serializer.dump_stream(func(split_index, iterator), outfile) File "/home/kriti/Downloads/spark-2.4.3-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/serializers.py", line 397, in dump_stream bytes = self.serializer.dumps(vs) File "/home/kriti/Downloads/spark-2.4.3-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/serializers.py", line 576, in dumps return pickle.dumps(obj, protocol) AttributeError: Can't pickle local object 'Tokenizer.tokenize..'

at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:452) at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:588) at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:571) at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:406) at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37) at scala.collection.Iterator$class.foreach(Iterator.scala:891) at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28) at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59) at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104) at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48) at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310) at org.apache.spark.InterruptibleIterator.to(InterruptibleIterator.scala:28) at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302) at org.apache.spark.InterruptibleIterator.toBuffer(InterruptibleIterator.scala:28) at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289) at org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28) at org.apache.spark.api.python.PythonRDD$$anonfun$3.apply(PythonRDD.scala:153) at org.apache.spark.api.python.PythonRDD$$anonfun$3.apply(PythonRDD.scala:153) at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101) at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90) at org.apache.spark.scheduler.Task.run(Task.scala:121) at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408) at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748)

Driver stacktrace: at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1889) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1877) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1876) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1876) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926) at scala.Option.foreach(Option.scala:257) at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:926) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2110) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2059) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2048) at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49) at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:737) at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061) at org.apache.spark.SparkContext.runJob(SparkContext.scala:2082) at org.apache.spark.SparkContext.runJob(SparkContext.scala:2101) at org.apache.spark.api.python.PythonRDD$.runJob(PythonRDD.scala:153) at org.apache.spark.api.python.PythonRDD.runJob(PythonRDD.scala) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244) at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357) at py4j.Gateway.invoke(Gateway.java:282) at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132) at py4j.commands.CallCommand.execute(CallCommand.java:79) at py4j.GatewayConnection.run(GatewayConnection.java:238) at java.lang.Thread.run(Thread.java:748) Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last): File "/home/kriti/Downloads/spark-2.4.3-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/worker.py", line 377, in main process() File "/home/kriti/Downloads/spark-2.4.3-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/worker.py", line 372, in process serializer.dump_stream(func(split_index, iterator), outfile) File "/home/kriti/Downloads/spark-2.4.3-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/serializers.py", line 397, in dump_stream bytes = self.serializer.dumps(vs) File "/home/kriti/Downloads/spark-2.4.3-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/serializers.py", line 576, in dumps return pickle.dumps(obj, protocol) AttributeError: Can't pickle local object 'Tokenizer.tokenize..'

at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:452) at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:588) at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:571) at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:406) at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37) at scala.collection.Iterator$class.foreach(Iterator.scala:891) at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28) at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59) at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104) at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48) at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310) at org.apache.spark.InterruptibleIterator.to(InterruptibleIterator.scala:28) at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302) at org.apache.spark.InterruptibleIterator.toBuffer(InterruptibleIterator.scala:28) at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289) at org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28) at org.apache.spark.api.python.PythonRDD$$anonfun$3.apply(PythonRDD.scala:153) at org.apache.spark.api.python.PythonRDD$$anonfun$3.apply(PythonRDD.scala:153) at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101) at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90) at org.apache.spark.scheduler.Task.run(Task.scala:121) at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408) at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ... 1 more

Any help would be grateful. Thanks in advance!


Solution

  • rdd.foreach(lambda x:tok.tokenize(x[1])).take(5)

    Here you are trying to access the results from rdd.foreach() which is null.

    rdd1.map(lambda x:tok.tokenize(x[1])).take(5)

    Here you are using a custom object with lambda this throws the next exception:

    AttributeError: Can't pickle local object 'Tokenizer.tokenize..'

    which practically it means that pyspark failes to serialize the Tokenizer.tokenize method. One possible solution is to call tok.tokenize(x[1]) from a function and then pass a reference to that function in map as shown below:

    def tokenize(x):
      return tok.tokenize(x[0])
    
    rdd1.map(tokenize).take(5)
    

    Also in your code you have one more issue. Class Tokenizer is trying to access the undeclared self.__html2unicode(s) method. This will cause the following error:

    AttributeError: 'Tokenizer' object has no attribute '_Tokenizer__html2unicode'
    

    Related topics

    PySpark: PicklingError: Could not serialize object: TypeError: can't pickle CompiledFFI objects

    https://github.com/yahoo/TensorFlowOnSpark/issues/198