I am having problems integrating Spark with Kafka. I am using spark-streaming-kafka-0-8 and compiling with SBT. This is my code:
import org.apache.spark.SparkConf
import org.apache.spark.streaming._
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka._

object sparkKafka {
  def main(args: Array[String]) {
    val sparkConf = new SparkConf().setAppName("KafkaWordCount").setMaster("local[*]")
    val ssc = new StreamingContext(sparkConf, Seconds(2))
    val kafkaStream = KafkaUtils.createStream(ssc,
      "localhost:2181", "spark stream", Map("customer" -> 2))
    kafkaStream.print()
    ssc.start()
    ssc.awaitTermination()
  }
}
I received this error:
[info] Running sparkKafka
[error] (run-main-0) java.lang.NoClassDefFoundError: scala/Product$class
[error] java.lang.NoClassDefFoundError: scala/Product$class
[error] at org.apache.spark.SparkConf$DeprecatedConfig.<init>(SparkConf.scala:723)
[error] at org.apache.spark.SparkConf$.<init>(SparkConf.scala:571)
[error] at org.apache.spark.SparkConf$.<clinit>(SparkConf.scala)
[error] at org.apache.spark.SparkConf.set(SparkConf.scala:92)
[error] at org.apache.spark.SparkConf.set(SparkConf.scala:81)
[error] at org.apache.spark.SparkConf.setAppName(SparkConf.scala:118)
[error] at sparkKafka$.main(sparkKafka.scala:15)
[error] at sparkKafka.main(sparkKafka.scala)
[error] at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
[error] at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
[error] at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
[error] at java.lang.reflect.Method.invoke(Method.java:498)
[error] Caused by: java.lang.ClassNotFoundException: scala.Product$class
[error] at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
[error] at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
[error] at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
[error] at org.apache.spark.SparkConf$DeprecatedConfig.<init>(SparkConf.scala:723)
[error] at org.apache.spark.SparkConf$.<init>(SparkConf.scala:571)
[error] at org.apache.spark.SparkConf$.<clinit>(SparkConf.scala)
[error] at org.apache.spark.SparkConf.set(SparkConf.scala:92)
[error] at org.apache.spark.SparkConf.set(SparkConf.scala:81)
[error] at org.apache.spark.SparkConf.setAppName(SparkConf.scala:118)
[error] at sparkKafka$.main(sparkKafka.scala:15)
[error] at sparkKafka.main(sparkKafka.scala)
[error] at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
[error] at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
[error] at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
[error] at java.lang.reflect.Method.invoke(Method.java:498)
[error] Nonzero exit code: 1
[error] (Compile / run) Nonzero exit code: 1
[error] Total time: 6 s, completed Jan 14, 2019 2:19:15 PM
This is my build.sbt file:
libraryDependencies += "org.apache.spark" % "spark-core_2.11" % "2.2.0"
libraryDependencies += "org.apache.spark" % "spark-streaming_2.11" % "2.2.0"
libraryDependencies += "org.apache.spark" % "spark-streaming-kafka-0-10_2.11" % "2.2.0"
libraryDependencies += "org.apache.spark" % "spark-streaming-kafka-0-8_2.11" % "2.2.0"
How can I integrate Spark Streaming with Kafka? I have the same problem even with spark-streaming-kafka-0-10....
Thanks
This is a Scala/Spark version mismatch. The NoClassDefFoundError: scala/Product$class means your _2.11 Spark dependencies are being run on a different Scala version (most likely 2.12, which removed that class). Make sure first that your project is actually using Scala 2.11 by setting scalaVersion in build.sbt.
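For example, a build.sbt along these lines should work (same versions as in your question; the scalaVersion line is the key addition, and %% picks the matching _2.11 artifacts for you):

// Pin the project to Scala 2.11 so it matches the _2.11 Spark artifacts
scalaVersion := "2.11.12"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core"      % "2.2.0",
  "org.apache.spark" %% "spark-streaming" % "2.2.0",
  // keep only ONE of the Kafka integrations; 0-10 is shown here (see below)
  "org.apache.spark" %% "spark-streaming-kafka-0-10" % "2.2.0"
)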
If you are using Kafka 0.10 or higher (which, if you've set up Kafka recently and are only running it locally, you likely are), then you shouldn't be using the kafka-0-8 package.
Do not mix spark-streaming-kafka-0-8 with spark-streaming-kafka-0-10. So, if you want to use 0-10, as answered previously, the package to import is org.apache.spark.streaming.kafka010, not org.apache.spark.streaming.kafka.
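For example, a rough sketch of the 0-10 direct stream (the broker address localhost:9092, the group id, and the topic name are assumptions; swap in your own values):

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

object sparkKafka {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("KafkaWordCount").setMaster("local[*]")
    val ssc = new StreamingContext(sparkConf, Seconds(2))

    // 0-10 talks to the Kafka brokers directly, not to ZooKeeper
    val kafkaParams = Map[String, Object](
      "bootstrap.servers"  -> "localhost:9092",   // assumed broker address
      "key.deserializer"   -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id"           -> "spark-stream",
      "auto.offset.reset"  -> "latest",
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )

    val stream = KafkaUtils.createDirectStream[String, String](
      ssc,
      PreferConsistent,
      Subscribe[String, String](Array("customer"), kafkaParams)
    )

    stream.map(record => (record.key, record.value)).print()

    ssc.start()
    ssc.awaitTermination()
  }
}

Note that createStream from the 0-8 API is replaced by createDirectStream plus a ConsumerStrategy in 0-10.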
Also, note that the 0-8 integration does use ZooKeeper (localhost:2181, for example), while 0-10 does not; it connects to the Kafka brokers directly.