I am trying to implement a function for text preprocessing in PySpark. I have an Amazon EMR cluster where I install Python dependencies from a bootstrap script. One of these dependencies is textblob, for which the bootstrap also runs "python -m textblob.download_corpora". After that I can use textblob locally on every machine without any problem.
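For context, the preprocessing step looks roughly like this (simplified; the tokenization details are omitted and the names only loosely match my real code, which lives in package_topics/data_utils.py):

from textblob import TextBlob

def preprocess_text(tokens):
    # join the tokens back into a string and extract noun phrases with textblob
    text = ' '.join(tokens)
    return [noun_phrase for noun_phrase in TextBlob(text).noun_phrases]

This is what gets called from make_tokens inside a Spark transformation.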
But when I try to run it from Spark, I get the following error:
INFO: File "/home/hadoop/spark/python/pyspark/rdd.py", line 1324, in saveAsTextFile
INFO: keyed._jrdd.map(self.ctx._jvm.BytesToString()).saveAsTextFile(path)
INFO: File "/home/hadoop/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py", line 538, in __call__
INFO: File "/home/hadoop/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py", line 300, in get_return_value
INFO: py4j.protocol.Py4JJavaError: An error occurred while calling o54.saveAsTextFile.
INFO: : org.apache.spark.SparkException: Job aborted due to stage failure: Task 8 in stage 1.0 failed 4 times, most recent failure: Lost task 8.3 in stage 1.0 (TID 40, ip-172-31-3-125.ec2.internal): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
INFO: File "/home/hadoop/spark/python/pyspark/worker.py", line 79, in main
INFO: serializer.dump_stream(func(split_index, iterator), outfile)
INFO: File "/home/hadoop/spark/python/pyspark/serializers.py", line 127, in dump_stream
INFO: for obj in iterator:
INFO: File "/home/hadoop/spark/python/pyspark/rdd.py", line 1316, in func
INFO: for x in iterator:
INFO: File "/home/hadoop/pyckage/package_topics/package_topics/preprocessor.py", line 40, in make_tokens
INFO: File "./package_topics.zip/package_topics/data_utils.py", line 76, in preprocess_text
INFO: for noun_phrase in TextBlob(' '.join(tokens)).noun_phrases
INFO: File "/usr/lib/python2.6/site-packages/textblob/decorators.py", line 24, in __get__
INFO: value = obj.__dict__[self.func.__name__] = self.func(obj)
INFO: File "/usr/lib/python2.6/site-packages/textblob/blob.py", line 431, in noun_phrases
INFO: for phrase in self.np_extractor.extract(self.raw)
INFO: File "/usr/lib/python2.6/site-packages/textblob/en/np_extractors.py", line 138, in extract
INFO: self.train()
INFO: File "/usr/lib/python2.6/site-packages/textblob/decorators.py", line 38, in decorated
INFO: raise MissingCorpusError()
INFO: MissingCorpusError:
INFO: Looks like you are missing some required data for this feature.
INFO:
INFO: To download the necessary data, simply run
INFO:
INFO: python -m textblob.download_corpora
INFO:
INFO: or use the NLTK downloader to download the missing data: http://nltk.org/data.html
INFO: If this doesn't fix the problem, file an issue at https://github.com/sloria/TextBlob/issues.
INFO:
INFO:
INFO: org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:124)
INFO: org.apache.spark.api.python.PythonRDD$$anon$1.<init>(PythonRDD.scala:154)
INFO: org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:87)
INFO: org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
INFO: org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
INFO: org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
INFO: org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
INFO: org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
INFO: org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
INFO: org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
INFO: org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
INFO: org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
INFO: org.apache.spark.scheduler.Task.run(Task.scala:54)
INFO: org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)
INFO: java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
INFO: java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
INFO: java.lang.Thread.run(Thread.java:745)
INFO: Driver stacktrace:
INFO: at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1185)
INFO: at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1174)
INFO: at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1173)
INFO: at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
INFO: at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
INFO: at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1173)
INFO: at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:688)
INFO: at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:688)
INFO: at scala.Option.foreach(Option.scala:236)
INFO: at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:688)
INFO: at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1391)
INFO: at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
INFO: at akka.actor.ActorCell.invoke(ActorCell.scala:456)
INFO: at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
INFO: at akka.dispatch.Mailbox.run(Mailbox.scala:219)
INFO: at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
INFO: at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
INFO: at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
INFO: at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
INFO: at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
I am running both the Spark job and the single-node scripts under the same user. Does anybody have an idea what could be wrong?
So the problem was that Spark internally sets HOME to /home on the workers, so textblob/NLTK cannot find the corpora downloaded under the hadoop user's home directory. The hack to make this work from Python is to add the following line before the textblob call:
os.environ['HOME'] = '/home/hadoop'
This is related to the Spark issue https://issues.apache.org/jira/browse/SPARK-4099
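To make sure the workaround actually runs on the executors (not just on the driver), I put it at the top of the function that Spark ships to the workers. A minimal sketch; sc, input_path and output_path are illustrative, and preprocess_text stands in for my real helper in package_topics/data_utils.py:

import os

def make_tokens(line):
    # workaround for SPARK-4099: the worker process gets HOME=/home,
    # so NLTK/textblob cannot find the corpora under ~/nltk_data
    os.environ['HOME'] = '/home/hadoop'
    from package_topics.data_utils import preprocess_text
    return preprocess_text(line.split())

tokens = sc.textFile(input_path).map(make_tokens)
tokens.saveAsTextFile(output_path)

Setting the environment variable inside the mapped function (rather than only in the driver script) guarantees it takes effect in every Python worker process before textblob is used.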