In my spark program, I have this code:
val df = spark.readStream
.format("kafka")
.option("subscribe", "raw_weather")
.option("kafka.bootstrap.servers", "<url:port>s of my brokers")
.option("kafka.security.protocol", "SASL_SSL")
.option("kafka.sasl.mechanism" , "PLAIN")
.option("kafka.sasl.jaas.config",
"org.apache.kafka.common.security.plain.PlainLoginModule required username=\"username\" password=\"" + "password" + "\";")
.option("kafka.ssl.protocol", "TLSv1.2")
.option("kafka.ssl.enabled.protocols", "TLSv1.2")
.option("kafka.ssl.endpoint.identification.algorithm", "HTTPS")
.load()
Here is my build.sbt
file:
name := "kafka-streaming"
version := "1.0"
scalaVersion := "2.11.12"
// still want to be able to run in sbt
// https://github.com/sbt/sbt-assembly#-provided-configuration
run in Compile <<= Defaults.runTask(fullClasspath in Compile, mainClass in (Compile, run), runner in (Compile, run))
fork in run := true
javaOptions in run ++= Seq(
"-Dlog4j.debug=true",
"-Dlog4j.configuration=log4j.properties")
assemblyMergeStrategy in assembly := {
case "META-INF/services/org.apache.spark.sql.sources.DataSourceRegister" => MergeStrategy.concat
case PathList("META-INF", _*) => MergeStrategy.discard
case _ => MergeStrategy.first
}
libraryDependencies ++= Seq(
"org.apache.spark" %% "spark-core" % "2.4.0" % "provided",
"org.apache.spark" %% "spark-sql" % "2.4.0" % "provided", //If not then this Exception in thread "streaming-job-executor-0" java.lang.NoClassDefFoundError: org/apache/spark/sql/Dataset
"org.apache.spark" %% "spark-streaming" % "2.4.0" % "provided",
"org.apache.spark" %% "spark-streaming-kafka-0-10" % "2.4.0" % "provided",
"org.apache.spark" %% "spark-sql-kafka-0-10" % "2.4.0" % "provided"
)
If I remove the provided
from libraryDependencies
, I'm able to successfully run the scala code inside IntelliJ IDEA.
Now I do sbt assembly
and then when I try to run this same program inside spark-submit
using the following command:
spark-submit --class com.ibm.kafkasparkintegration.executables.WeatherDataStream hdfs://<some address>:8020/user/clsadmin/consumer-example.jar \
--packages org.apache.spark:spark-core:2.4.0,org.apache.spark:spark-sql:2.4.0,org.apache.spark:spark-sql-kafka-0-10:2.4.0,org.apache.spark:spark-streaming:2.4.0,org.apache.spark:spark-streaming-kafka-0-10:2.4.0
(PS. consumer-example.jar
is the JAR I got after doing sbt assembly
)
I get this error:
Exception in thread "main" java.lang.ClassNotFoundException: Failed to find data source: kafka. Please find packages at http://spark.apache.org/third-party-projects.html
at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:635)
at org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:159)
>> at com.ibm.kafkasparkintegration.executables.WeatherDataStream$.getRawDataFrame(WeatherDataStream.scala:73)
at com.ibm.kafkasparkintegration.executables.WeatherDataStream$.main(WeatherDataStream.scala:23)
at com.ibm.kafkasparkintegration.executables.WeatherDataStream.main(WeatherDataStream.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:906)
at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:197)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:227)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:136)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.lang.ClassNotFoundException: kafka.DefaultSource
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$23$$anonfun$apply$15.apply(DataSource.scala:618)
at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$23$$anonfun$apply$15.apply(DataSource.scala:618)
at scala.util.Try$.apply(Try.scala:192)
at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$23.apply(DataSource.scala:618)
at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$23.apply(DataSource.scala:618)
at scala.util.Try.orElse(Try.scala:84)
at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:618)
... 14 more
From the error log, WeatherDataStream.scala:73
refers to the load()
written in the above code. When this code works in IntelliJ, I don't understand why it doesn't work in spark-submit
?
It is due to this line here:
"org.apache.spark" %% "spark-sql-kafka-0-10" % "2.4.0" % "provided"
"provided" means this dependency is, well.... "provided" by whatever machine(s) you are using to run the compiled jar. To me, it looks like kafka is not being provided by your machine(s), so try removing the "provided" and re-assemble.