
Spark Streaming + Kafka Integration 0.8.2.1


I have problems integrating Spark with Kafka. I'm using spark-streaming-kafka-0-8 and compiling with SBT. This is my code:

import org.apache.spark.SparkConf
import org.apache.spark.streaming._
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka._
object sparkKafka {

    def main(args: Array[String]) {

        val sparkConf = new SparkConf().setAppName("KafkaWordCount").setMaster("local[*]")

        val ssc = new StreamingContext(sparkConf, Seconds(2))

        val kafkaStream = KafkaUtils.createStream(ssc,
            "localhost:2181", "spark stream", Map("customer" -> 2))

        kafkaStream.print()
        ssc.start()
        ssc.awaitTermination()
    }
}

I received this error:

[info] Running sparkKafka
[error] (run-main-0) java.lang.NoClassDefFoundError: scala/Product$class
[error] java.lang.NoClassDefFoundError: scala/Product$class
[error]         at org.apache.spark.SparkConf$DeprecatedConfig.<init>(SparkConf.scala:723)
[error]         at org.apache.spark.SparkConf$.<init>(SparkConf.scala:571)
[error]         at org.apache.spark.SparkConf$.<clinit>(SparkConf.scala)
[error]         at org.apache.spark.SparkConf.set(SparkConf.scala:92)
[error]         at org.apache.spark.SparkConf.set(SparkConf.scala:81)
[error]         at org.apache.spark.SparkConf.setAppName(SparkConf.scala:118)
[error]         at sparkKafka$.main(sparkKafka.scala:15)
[error]         at sparkKafka.main(sparkKafka.scala)
[error]         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
[error]         at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
[error]         at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
[error]         at java.lang.reflect.Method.invoke(Method.java:498)
[error] Caused by: java.lang.ClassNotFoundException: scala.Product$class
[error]         at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
[error]         at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
[error]         at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
[error]         at org.apache.spark.SparkConf$DeprecatedConfig.<init>(SparkConf.scala:723)
[error]         at org.apache.spark.SparkConf$.<init>(SparkConf.scala:571)
[error]         at org.apache.spark.SparkConf$.<clinit>(SparkConf.scala)
[error]         at org.apache.spark.SparkConf.set(SparkConf.scala:92)
[error]         at org.apache.spark.SparkConf.set(SparkConf.scala:81)
[error]         at org.apache.spark.SparkConf.setAppName(SparkConf.scala:118)
[error]         at sparkKafka$.main(sparkKafka.scala:15)
[error]         at sparkKafka.main(sparkKafka.scala)
[error]         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
[error]         at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
[error]         at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
[error]         at java.lang.reflect.Method.invoke(Method.java:498)
[error] Nonzero exit code: 1
[error] (Compile / run) Nonzero exit code: 1
[error] Total time: 6 s, completed Jan 14, 2019 2:19:15 PM.

This is my build.sbt file:

libraryDependencies += "org.apache.spark" % "spark-core_2.11" % "2.2.0"
libraryDependencies += "org.apache.spark" % "spark-streaming_2.11" % "2.2.0"
libraryDependencies += "org.apache.spark" % "spark-streaming-kafka-0-10_2.11" % "2.2.0"
libraryDependencies += "org.apache.spark" % "spark-streaming-kafka-0-8_2.11" % "2.2.0"

How can I integrate Spark Streaming with Kafka? I have a problem even with spark-streaming-kafka-0-10...

Thanks


Solution

  • This is a version mismatch between Scala and Spark: the scala/Product$class error means your project is being compiled with Scala 2.12, while the Spark 2.2.0 artifacts in your build.sbt are built for Scala 2.11. First, make sure your project is using Scala 2.11 (see the build.sbt sketch after these notes).

    If you are using Kafka 0.10 or higher (which, if you've set up Kafka recently and are only running it locally, you likely are), then you shouldn't be using the kafka-0-8 package at all.

    Do not mix spark-streaming-kafka-0-8 with spark-streaming-kafka-0-10 in the same build; pick exactly one.

    So, if you want to use 0-10, as answered previously, the package to import is org.apache.spark.streaming.kafka010, not org.apache.spark.streaming.kafka.

    Also, note that 0-8 connects through Zookeeper (localhost:2181, for example), while 0-10 talks to the Kafka brokers directly; a sketch of the ported code follows the build file below.
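Putting the version notes together, a build.sbt along these lines should resolve the NoClassDefFoundError (a minimal sketch: the Scala patch version 2.11.12 is an assumption, and only the 0-10 connector is kept):

// Spark 2.2.0 has no Scala 2.12 build, so pin the project to Scala 2.11
scalaVersion := "2.11.12"

val sparkVersion = "2.2.0"

libraryDependencies ++= Seq(
  // %% appends the Scala binary suffix (_2.11) automatically
  "org.apache.spark" %% "spark-core" % sparkVersion,
  "org.apache.spark" %% "spark-streaming" % sparkVersion,
  // exactly one Kafka connector: 0-10, for Kafka 0.10+ brokers
  "org.apache.spark" %% "spark-streaming-kafka-0-10" % sparkVersion
)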
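And the streaming job ported to the 0-10 direct API would look roughly like this (a sketch adapted from the question's code and the Spark documentation; the topic name customer, the group id spark-stream, and the localhost:9092 broker address are assumptions about your setup):

import org.apache.spark.SparkConf
import org.apache.spark.streaming._
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

object sparkKafka {

    def main(args: Array[String]) {

        val sparkConf = new SparkConf().setAppName("KafkaWordCount").setMaster("local[*]")
        val ssc = new StreamingContext(sparkConf, Seconds(2))

        // 0-10 talks to the Kafka brokers directly, not to Zookeeper
        val kafkaParams = Map[String, Object](
            "bootstrap.servers" -> "localhost:9092",   // assumed broker address
            "key.deserializer" -> classOf[StringDeserializer],
            "value.deserializer" -> classOf[StringDeserializer],
            "group.id" -> "spark-stream"
        )

        val kafkaStream = KafkaUtils.createDirectStream[String, String](
            ssc,
            PreferConsistent,
            Subscribe[String, String](Array("customer"), kafkaParams)
        )

        // print the message values, mirroring kafkaStream.print() in the question
        kafkaStream.map(record => record.value).print()
        ssc.start()
        ssc.awaitTermination()
    }
}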