scala, apache-spark, spark-streaming, spark-streaming-kafka

AbstractMethodError upon creation of new StreamingContext


I've been having trouble instantiating a new Spark Streaming StreamingContext: whenever I try to create one, an AbstractMethodError is thrown. Stepping through the stack trace, I found that the application dies while the ListenerBus trait is being initialized inside the StreamingListenerBus constructor.

Below is the code I'm trying to execute:

package platform.etl

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object ClickGeneratorStreaming {
  def main(args: Array[String]): Unit = {

    val conf = new SparkConf().setAppName("ClickGeneratorStreaming").setMaster("local[*]")
    val ssc = new StreamingContext(conf, Seconds(10))

  }
}
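
For reference, creating the context is enough to trigger the error on its own; a full job would only add a source and start the context. Here is a minimal sketch of what that would look like, with a hypothetical socket source on port 9999 that is not part of my actual job:

package platform.etl

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object ClickGeneratorStreaming {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("ClickGeneratorStreaming").setMaster("local[*]")
    val ssc = new StreamingContext(conf, Seconds(10))

    // Hypothetical source, purely for illustration: read lines from a local socket.
    val lines = ssc.socketTextStream("localhost", 9999)
    lines.print()

    ssc.start()            // begin processing micro-batches
    ssc.awaitTermination() // block until the job is stopped
  }
}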

And here is the stack trace:

Exception in thread "main" java.lang.AbstractMethodError
    at org.apache.spark.util.ListenerBus$class.$init$(ListenerBus.scala:35)
    at org.apache.spark.streaming.scheduler.StreamingListenerBus.<init>(StreamingListenerBus.scala:30)
    at org.apache.spark.streaming.scheduler.JobScheduler.<init>(JobScheduler.scala:56)
    at org.apache.spark.streaming.StreamingContext.<init>(StreamingContext.scala:183)
    at org.apache.spark.streaming.StreamingContext.<init>(StreamingContext.scala:84)
    at platform.etl.ClickGeneratorStreaming$.main(ClickGeneratorStreaming.scala:10)
    at platform.etl.ClickGeneratorStreaming.main(ClickGeneratorStreaming.scala)

My build.sbt

name := "spark"

version := "0.1"

scalaVersion := "2.11.0"

val sparkVersion = "2.3.0.2.6.5.0-292"
val sparkKafkaVersion = "2.3.0"
val argonautVersion = "6.2"

resolvers += "jitpack" at "https://jitpack.io"
resolvers += "horton" at "http://repo.hortonworks.com/content/repositories/releases"
resolvers += "horton2" at "http://repo.hortonworks.com/content/groups/public"


libraryDependencies += "org.apache.hadoop" % "hadoop-aws" % "2.7.3.2.6.5.0-292" excludeAll ExclusionRule(organization = "javax.servlet")
libraryDependencies += "com.amazonaws" % "aws-java-sdk" % "1.7.4"
libraryDependencies += "com.softwaremill.sttp" %% "core" % "1.2.0-RC2"
libraryDependencies += "com.softwaremill.retry" %% "retry" % "0.3.0"
libraryDependencies += "org.scalatest" %% "scalatest" % "3.0.5" % Test
libraryDependencies += "com.github.scopt" %% "scopt" % "3.7.0"
libraryDependencies += "io.argonaut" %% "argonaut" % argonautVersion
libraryDependencies += "io.argonaut" %% "argonaut-monocle" % argonautVersion
libraryDependencies += "com.github.scopt" %% "scopt" % "3.7.0"
libraryDependencies += "com.github.mrpowers" % "spark-fast-tests" % "v2.3.0_0.11.0" % "test"
libraryDependencies += "org.scalactic" %% "scalactic" % "3.0.5"
libraryDependencies += "org.scalatest" %% "scalatest" % "3.0.5" % "test"
libraryDependencies += "com.datastax.spark" %% "spark-cassandra-connector" % "2.3.0"
libraryDependencies += "org.elasticsearch" % "elasticsearch-spark-20_2.11" % "5.2.2"
libraryDependencies += "com.redislabs" % "spark-redis" % "2.3.1-M2"
libraryDependencies +=  "org.scalaj" %% "scalaj-http" % "2.4.1"
libraryDependencies += "org.apache.spark" %% "spark-core" % sparkVersion
libraryDependencies += "org.apache.spark" %% "spark-sql" % sparkVersion 
libraryDependencies += "org.apache.spark" %% "spark-hive" % sparkVersion
libraryDependencies += "org.apache.spark" %% "spark-sql-kafka-0-10" % sparkVersion
libraryDependencies += "org.apache.spark" % "spark-streaming-kafka-0-10_2.11" % sparkVersion


assemblyMergeStrategy in assembly := {
  case PathList("javax", "servlet", xs @ _*)         => MergeStrategy.first
  case PathList("META-INF", xs @ _*) => MergeStrategy.discard
  case "application.conf"            => MergeStrategy.concat
  case "reference.conf"              => MergeStrategy.concat
  case _ => MergeStrategy.first
}

My plugins.sbt

addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.3")

Solution

  • Found the problem: I forgot to add the spark-streaming dependency to my build.sbt, so sbt resolved spark-streaming transitively through one of my other dependencies, pulling in a version that is binary-incompatible with my Spark version. That kind of mismatch is exactly what an AbstractMethodError signals.

    To fix it, I just added a single line to my build.sbt:

    libraryDependencies += "org.apache.spark" %% "spark-streaming" % sparkVersion
    

    And it works flawlessly now.
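
    As a general guard against this kind of mismatch, every Spark artifact can be pinned to sparkVersion in one place. This is just a sketch of an alternative layout, not my original build file:

    libraryDependencies ++= Seq(
      "spark-core",
      "spark-sql",
      "spark-hive",
      "spark-streaming",
      "spark-sql-kafka-0-10"
    ).map(module => "org.apache.spark" %% module % sparkVersion)

    sbt's built-in evicted task (run sbt evicted) also lists dependencies whose transitive versions were overridden during resolution, which would have surfaced the mismatched spark-streaming version here.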