python-2.7 pyspark rdd emr amazon-emr

Amazon EMR PySpark: rdd.distinct().count() failing


I am currently working with an EMR cluster that connects to RDS to load two tables.

The two RDDs created are quite large, but I can perform .take(x) operations on them.

I can also perform more complex operations such as:

info_rdd = somerdd.map(lambda x: (x[1], x[2])).groupByKey().map(some_lambda)
apps_rdd = apps.join(info_rdd).map(lambda x: (x[0], (x[1][0], x[1][1][0], x[1][1][1])))

But the following operation, which counts the number of distinct users imported from RDS, does not work:

unique_users = rdd.distinct().count()

I have tried many configurations in case it was a memory issue, but none of them solved the problem.

These are the errors I am getting now:

Traceback (most recent call last):
File "/home/hadoop/AppEngine/src/server.py", line 56, in <module>
run_server()
File "/home/hadoop/AppEngine/src/server.py", line 53, in run_server
AppServer().run()
File "/home/hadoop/AppEngine/src/server.py", line 45, in run
api = create_app(self.context, self.apps, self.devices)
File "/home/hadoop/AppEngine/src/api.py", line 190, in create_app
engine = AppEngine(spark_context, apps, devices)
File "/home/hadoop/AppEngine/src/engine.py", line 56, in __init__
self.unique_users = self.ratings.distinct().count()
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 1041, in count
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 1032, in sum
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 906, in fold
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 809, in collect
File "/usr/lib/spark/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py", line 1133, in __call__
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 63, in deco

File "/usr/lib/spark/python/lib/py4j-0.10.4-src.zip/py4j/protocol.py", line 319, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in  stage 0.0 failed 4 times, most recent failure: Lost task 0.5 in stage 0.0 (TID 5, ip-172-31-3-140.eu-west-1.compute.internal, executor 13): ExecutorLostFailure (executor 13 exited caused by one of the running tasks) Reason: Executor heartbeat timed out after 164253 ms
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1435)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1423)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1422)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1422)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802)
at scala.Option.foreach(Option.scala:257)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:802)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1650)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1605)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1594)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:628)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1918)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1931)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1944)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1958)
at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:935)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
at org.apache.spark.rdd.RDD.collect(RDD.scala:934)
at org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:453)
at org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:280)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:214)
at java.lang.Thread.run(Thread.java:745)

Solution

  • The solution to the problem was the following:

    I did not have enough memory to perform the task. I changed the core instance type in my cluster to one with more memory available (m4.4xlarge here).

    Then I had to specify parameters to force the memory allocation of my instances in the spark-submit command:

    --driver-memory 2G
    --executor-memory 50G
    

    You can also add these parameters to keep a long task from failing because of the heartbeat or the memory allocation:

    --conf spark.yarn.executor.memoryOverhead=XXX (a large number in MB, such as 1024 or 4096)
    --conf spark.executor.heartbeatInterval=60s
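
As a rough sanity check on the settings above, here is a minimal Python sketch that assembles the spark-submit arguments and estimates how much memory YARN has to grant each executor (executor memory plus overhead). The helper names `yarn_container_mb` and `build_spark_submit`, the script name `server.py`, and the 64 GiB node size are assumptions for illustration, not part of the original answer. The default overhead rule used (10% of executor memory, minimum 384 MB) is the Spark 2.x behaviour as far as I know; YARN's usable share of a node is smaller than its physical RAM, so check `yarn.nodemanager.resource.memory-mb` on your own cluster.

```python
def yarn_container_mb(executor_memory_mb, overhead_mb=None):
    """Approximate memory (MB) YARN must grant for one executor container."""
    if overhead_mb is None:
        # Assumed Spark 2.x default: 10% of executor memory, at least 384 MB
        overhead_mb = max(384, int(executor_memory_mb * 0.10))
    return executor_memory_mb + overhead_mb


def build_spark_submit(script, driver_memory="2G", executor_memory="50G",
                       overhead_mb=4096, heartbeat="60s"):
    """Assemble a spark-submit argument list with the flags from the answer."""
    return [
        "spark-submit",
        "--driver-memory", driver_memory,
        "--executor-memory", executor_memory,
        "--conf", "spark.yarn.executor.memoryOverhead={}".format(overhead_mb),
        "--conf", "spark.executor.heartbeatInterval={}".format(heartbeat),
        script,  # hypothetical entry point
    ]


# Assumption: physical RAM of one m4.4xlarge node (64 GiB);
# the amount YARN can actually hand out is lower than this.
NODE_RAM_MB = 64 * 1024

needed_mb = yarn_container_mb(50 * 1024, overhead_mb=4096)
cmd = build_spark_submit("server.py")

print(" ".join(cmd))
print("container needs {} MB, fits in node RAM: {}".format(
    needed_mb, needed_mb < NODE_RAM_MB))
```

If the computed container size exceeds what YARN can allocate on a node, the executor is never scheduled, so it is worth running this kind of arithmetic before picking --executor-memory.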