Search code examples
scalaapache-sparkspark-streamingspark-structured-streamingspark-streaming-kafka

spark-submit error Caused by: java.lang.ClassNotFoundException: kafka.DefaultSource


In my spark program, I have this code:

val df = spark.readStream
        .format("kafka")
        .option("subscribe", "raw_weather")
        .option("kafka.bootstrap.servers", "<url:port>s of my brokers")
        .option("kafka.security.protocol", "SASL_SSL")
        .option("kafka.sasl.mechanism" , "PLAIN")
        .option("kafka.sasl.jaas.config", 
"org.apache.kafka.common.security.plain.PlainLoginModule required username=\"username\" password=\"" + "password" + "\";")
        .option("kafka.ssl.protocol", "TLSv1.2")
        .option("kafka.ssl.enabled.protocols", "TLSv1.2")
        .option("kafka.ssl.endpoint.identification.algorithm", "HTTPS")
        .load()

Here is my build.sbt file:

    name := "kafka-streaming"
    version := "1.0"

    scalaVersion := "2.11.12"

    // still want to be able to run in sbt
    // https://github.com/sbt/sbt-assembly#-provided-configuration
    run in Compile <<= Defaults.runTask(fullClasspath in Compile, mainClass in (Compile, run), runner in (Compile, run))

    fork in run := true
    javaOptions in run ++= Seq(
        "-Dlog4j.debug=true",
        "-Dlog4j.configuration=log4j.properties")

    assemblyMergeStrategy in assembly := {
        case "META-INF/services/org.apache.spark.sql.sources.DataSourceRegister" => MergeStrategy.concat
        case PathList("META-INF", _*) => MergeStrategy.discard
        case _ => MergeStrategy.first
    }

    libraryDependencies ++= Seq(
        "org.apache.spark" %% "spark-core" % "2.4.0" % "provided",
        "org.apache.spark" %% "spark-sql" % "2.4.0" % "provided", //If not then this Exception in thread "streaming-job-executor-0" java.lang.NoClassDefFoundError: org/apache/spark/sql/Dataset
        "org.apache.spark" %% "spark-streaming" % "2.4.0" % "provided",
        "org.apache.spark" %% "spark-streaming-kafka-0-10" % "2.4.0" % "provided",
        "org.apache.spark" %% "spark-sql-kafka-0-10" % "2.4.0" % "provided"
    )

If I remove the provided from libraryDependencies, I'm able to successfully run the scala code inside IntelliJ IDEA.

Now I do sbt assembly and then when I try to run this same program inside spark-submit using the following command:

spark-submit --class com.ibm.kafkasparkintegration.executables.WeatherDataStream hdfs://<some address>:8020/user/clsadmin/consumer-example.jar \
             --packages org.apache.spark:spark-core:2.4.0,org.apache.spark:spark-sql:2.4.0,org.apache.spark:spark-sql-kafka-0-10:2.4.0,org.apache.spark:spark-streaming:2.4.0,org.apache.spark:spark-streaming-kafka-0-10:2.4.0

(PS. consumer-example.jar is the JAR I got after doing sbt assembly)

I get this error:

Exception in thread "main" java.lang.ClassNotFoundException: Failed to find data source: kafka. Please find packages at http://spark.apache.org/third-party-projects.html
    at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:635)
    at org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:159)
    >> at com.ibm.kafkasparkintegration.executables.WeatherDataStream$.getRawDataFrame(WeatherDataStream.scala:73) 
    at com.ibm.kafkasparkintegration.executables.WeatherDataStream$.main(WeatherDataStream.scala:23)
    at com.ibm.kafkasparkintegration.executables.WeatherDataStream.main(WeatherDataStream.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
    at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:906)
    at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:197)
    at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:227)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:136)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.lang.ClassNotFoundException: kafka.DefaultSource
    at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$23$$anonfun$apply$15.apply(DataSource.scala:618)
    at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$23$$anonfun$apply$15.apply(DataSource.scala:618)
    at scala.util.Try$.apply(Try.scala:192)
    at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$23.apply(DataSource.scala:618)
    at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$23.apply(DataSource.scala:618)
    at scala.util.Try.orElse(Try.scala:84)
    at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:618)
    ... 14 more

From the error log, WeatherDataStream.scala:73 refers to the load() written in the above code. When this code works in IntelliJ, I don't understand why it doesn't work in spark-submit?


Solution

  • It is due to this line here:

    "org.apache.spark" %% "spark-sql-kafka-0-10" % "2.4.0" % "provided"
    

    "provided" means this dependency is, well.... "provided" by whatever machine(s) you are using to run the compiled jar. To me, it looks like kafka is not being provided by your machine(s), so try removing the "provided" and re-assemble.