Search code examples
pythonapache-sparkpysparkapache-kafkaspark-streaming

How to solve ERROR Executor - Exception in task 0.0 in stage 20.0 (TID 20)?


I know similar question has been answered briefly, but i could not add my personal additional doubt there due to lack of minimum reputation...hence i am asking it here

I want to process Twitter data using Apache Spark + Kafka. I created a pattern for this. But when I run it, I get the following error. I searched a lot of places about this error, but I couldn't get the solution I wanted, or it didn't work. The last time I ran Spark with a smaller memory space, thinking memory was insufficient, but I still get the same error. This is my code that I received this error:

from kafka import KafkaConsumer
from pyspark.streaming import StreamingContext
import json
import pandas as pd
from pyspark import SparkConf,SparkContext
from pyspark.streaming.kafka import KafkaUtils

#cd /opt/hadoop-3.2.0-7/hadoop/spark      $sudo ./bin/spark-submit --packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.3.0  /opt/twitterConsumer.py

conf = SparkConf()
conf.setAppName("BDA-Twitter-Spark-Kafka")
sc = SparkContext(conf=conf)
sc.setLogLevel("ERROR")
ssc = StreamingContext(sc,1)

KafkaStream = KafkaUtils.createStream(ssc, "localhost:2181",'tks',{"xmas":1})   # directKafkaStream = KafkaUtils.createDirectStream(ssc, [topic], {"metadata.broker.list": brokers})
KafkaStream.pprint()

print("HERE1")


ssc.start()
ssc.awaitTermination()

ans my error is:

    ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)
java.lang.AbstractMethodError
    at org.apache.spark.internal.Logging$class.initializeLogIfNecessary(Logging.scala:99)
    at org.apache.spark.streaming.kafka.KafkaReceiver.initializeLogIfNecessary(KafkaInputDStream.scala:68)
    at org.apache.spark.internal.Logging$class.log(Logging.scala:46)
    at org.apache.spark.streaming.kafka.KafkaReceiver.log(KafkaInputDStream.scala:68)
    at org.apache.spark.internal.Logging$class.logInfo(Logging.scala:54)
    at org.apache.spark.streaming.kafka.KafkaReceiver.logInfo(KafkaInputDStream.scala:68)
    at org.apache.spark.streaming.kafka.KafkaReceiver.onStart(KafkaInputDStream.scala:90)
    at org.apache.spark.streaming.receiver.ReceiverSupervisor.startReceiver(ReceiverSupervisor.scala:149)
    at org.apache.spark.streaming.receiver.ReceiverSupervisor.start(ReceiverSupervisor.scala:131)
    at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverTrackerEndpoint$$anonfun$9.apply(ReceiverTracker.scala:601)
    at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverTrackerEndpoint$$anonfun$9.apply(ReceiverTracker.scala:591)
    at org.apache.spark.SparkContext$$anonfun$37.apply(SparkContext.scala:2212)
    at org.apache.spark.SparkContext$$anonfun$37.apply(SparkContext.scala:2212)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:121)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
19/12/29 09:57:49 ERROR TaskSetManager: Task 0 in stage 0.0 failed 1 times; aborting job
19/12/29 09:57:49 ERROR ReceiverTracker: Receiver has been stopped. Try to restart it.
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0, localhost, executor driver): java.lang.AbstractMethodError
    at org.apache.spark.internal.Logging$class.initializeLogIfNecessary(Logging.scala:99)
    at org.apache.spark.streaming.kafka.KafkaReceiver.initializeLogIfNecessary(KafkaInputDStream.scala:68)
    at org.apache.spark.internal.Logging$class.log(Logging.scala:46)
    at org.apache.spark.streaming.kafka.KafkaReceiver.log(KafkaInputDStream.scala:68)
    at org.apache.spark.internal.Logging$class.logInfo(Logging.scala:54)
    at org.apache.spark.streaming.kafka.KafkaReceiver.logInfo(KafkaInputDStream.scala:68)
    at org.apache.spark.streaming.kafka.KafkaReceiver.onStart(KafkaInputDStream.scala:90)
    at org.apache.spark.streaming.receiver.ReceiverSupervisor.startReceiver(ReceiverSupervisor.scala:149)
    at org.apache.spark.streaming.receiver.ReceiverSupervisor.start(ReceiverSupervisor.scala:131)
    at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverTrackerEndpoint$$anonfun$9.apply(ReceiverTracker.scala:601)
    at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverTrackerEndpoint$$anonfun$9.apply(ReceiverTracker.scala:591)
    at org.apache.spark.SparkContext$$anonfun$37.apply(SparkContext.scala:2212)
    at org.apache.spark.SparkContext$$anonfun$37.apply(SparkContext.scala:2212)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:121)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1889)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1877)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1876)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1876)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
    at scala.Option.foreach(Option.scala:257)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:926)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2110)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2059)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2048)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
Caused by: java.lang.AbstractMethodError
    at org.apache.spark.internal.Logging$class.initializeLogIfNecessary(Logging.scala:99)
    at org.apache.spark.streaming.kafka.KafkaReceiver.initializeLogIfNecessary(KafkaInputDStream.scala:68)
    at org.apache.spark.internal.Logging$class.log(Logging.scala:46)
    at org.apache.spark.streaming.kafka.KafkaReceiver.log(KafkaInputDStream.scala:68)
    at org.apache.spark.internal.Logging$class.logInfo(Logging.scala:54)
    at org.apache.spark.streaming.kafka.KafkaReceiver.logInfo(KafkaInputDStream.scala:68)
    at org.apache.spark.streaming.kafka.KafkaReceiver.onStart(KafkaInputDStream.scala:90)
    at org.apache.spark.streaming.receiver.ReceiverSupervisor.startReceiver(ReceiverSupervisor.scala:149)
    at org.apache.spark.streaming.receiver.ReceiverSupervisor.start(ReceiverSupervisor.scala:131)
    at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverTrackerEndpoint$$anonfun$9.apply(ReceiverTracker.scala:601)
    at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverTrackerEndpoint$$anonfun$9.apply(ReceiverTracker.scala:591)
    at org.apache.spark.SparkContext$$anonfun$37.apply(SparkContext.scala:2212)
    at org.apache.spark.SparkContext$$anonfun$37.apply(SparkContext.scala:2212)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:121)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

HOW TO MATCH VERSIONs OF ALL THE REQUIRED TOOLS HERE ?


Solution

  • The error you see likely comes from version mismatches

    Hadoop and Spark require Java 8

    You're using "Kafka with Scala 2.12" (Maven: kafka_2.12), so your packages must also use Scala 2.12 (Maven: spark-xyz_2.12), and also must match your Spark version (2.3.1). Your command shows you've pulled the Kafka streaming packages for Scala 2.11 for Spark 2.3.0. Also note that the Spark Streaming packages are deprecated, and you should be using spark-sql-kafka instead, Structured Streaming.

    You can still do real time analysis without Spark & Hadoop