I have a simple Spark application doing structured streaming.
Initially, my build.sbt looks like this:
name := "IngestFromS3ToKafka"
version := "1.0"
scalaVersion := "2.12.17"
libraryDependencies ++= Seq(
"org.apache.spark" %% "spark-core" % "3.3.2" % "provided",
"org.apache.spark" %% "spark-sql" % "3.3.2" % "provided",
"org.apache.spark" %% "spark-sql-kafka-0-10" % "3.3.2" % "provided",
"org.apache.hadoop" % "hadoop-common" % "3.3.5" % "provided",
"org.apache.hadoop" % "hadoop-aws" % "3.3.5" % "provided",
"com.amazonaws" % "aws-java-sdk-bundle" % "1.12.454" % "provided"
)
I succeed sbt assembly
, but later when I run spark-submit ...
, I got error:
Exception in thread "main" org.apache.spark.sql.AnalysisException: Failed to find data source: kafka. Please deploy the application as per the deployment section of "Structured Streaming + Kafka Integration Guide".
at org.apache.spark.sql.errors.QueryCompilationErrors$.failedToFindKafkaDataSourceError(QueryCompilationErrors.scala:1070)
at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:673)
at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSourceV2(DataSource.scala:725)
at org.apache.spark.sql.DataFrameWriter.lookupV2Provider(DataFrameWriter.scala:864)
at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:256)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:247)
at com.hongbomiao.IngestFromS3ToKafka$.main(IngestFromS3ToKafka.scala:40)
at com.hongbomiao.IngestFromS3ToKafka.main(IngestFromS3ToKafka.scala)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:568)
at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:958)
at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:180)
at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:203)
at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:90)
at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1046)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1055)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Then I removed provided
for spark-sql-kafka-0-10
:
name := "IngestFromS3ToKafka"
version := "1.0"
scalaVersion := "2.12.17"
libraryDependencies ++= Seq(
"org.apache.spark" %% "spark-core" % "3.3.2" % "provided",
"org.apache.spark" %% "spark-sql" % "3.3.2" % "provided",
"org.apache.spark" %% "spark-sql-kafka-0-10" % "3.3.2",
"org.apache.hadoop" % "hadoop-common" % "3.3.5" % "provided",
"org.apache.hadoop" % "hadoop-aws" % "3.3.5" % "provided",
"com.amazonaws" % "aws-java-sdk-bundle" % "1.12.454" % "provided"
)
But this time when I run sbt assembly
, I got this error:
[error] 1 error(s) were encountered during the merge:
| => ingest-from-s3-to-kafka / assembly 1s
[error] java.lang.RuntimeException:
[error] Deduplicate found different file contents in the following:
[error] Jar name = spark-sql-kafka-0-10_2.12-3.3.2.jar, jar org = org.apache.spark, entry target = org/apache/spark/unused/UnusedStubClass.class
[error] Jar name = spark-tags_2.12-3.3.2.jar, jar org = org.apache.spark, entry target = org/apache/spark/unused/UnusedStubClass.class
[error] Jar name = spark-token-provider-kafka-0-10_2.12-3.3.2.jar, jar org = org.apache.spark, entry target = org/apache/spark/unused/UnusedStubClass.class
[error] Jar name = unused-1.0.0.jar, jar org = org.spark-project.spark, entry target = org/apache/spark/unused/UnusedStubClass.class
[error] at sbtassembly.Assembly$.merge(Assembly.scala:624)
[error] at sbtassembly.Assembly$.$anonfun$assemble$36(Assembly.scala:330)
[error] at sbtassembly.Assembly$.timed$1(Assembly.scala:228)
[error] at sbtassembly.Assembly$.$anonfun$assemble$35(Assembly.scala:329)
[error] at sbtassembly.Assembly$.$anonfun$cachedAssembly$2(Assembly.scala:523)
[error] at sbt.util.Tracked$.$anonfun$lastOutput$1(Tracked.scala:73)
[error] at sbtassembly.Assembly$.cachedAssembly(Assembly.scala:527)
[error] at sbtassembly.Assembly$.assemble(Assembly.scala:414)
[error] at sbtassembly.Assembly$.$anonfun$assemblyTask$1(Assembly.scala:196)
[error] at scala.Function1.$anonfun$compose$1(Function1.scala:49)
[error] at sbt.internal.util.$tilde$greater.$anonfun$$u2219$1(TypeFunctions.scala:62)
[error] at sbt.std.Transform$$anon$4.work(Transform.scala:68)
[error] at sbt.Execute.$anonfun$submit$2(Execute.scala:282)
[error] at sbt.internal.util.ErrorHandling$.wideConvert(ErrorHandling.scala:23)
[error] at sbt.Execute.work(Execute.scala:291)
[error] at sbt.Execute.$anonfun$submit$1(Execute.scala:282)
[error] at sbt.ConcurrentRestrictions$$anon$4.$anonfun$submitValid$1(ConcurrentRestrictions.scala:265)
[error] at sbt.CompletionService$$anon$2.call(CompletionService.scala:64)
[error] at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
[error] at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
[error] at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
[error] at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
[error] at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
[error] at java.base/java.lang.Thread.run(Thread.java:833)
[error] (assembly)
[error] Deduplicate found different file contents in the following:
[error] Jar name = spark-sql-kafka-0-10_2.12-3.3.2.jar, jar org = org.apache.spark, entry target = org/apache/spark/unused/UnusedStubClass.class
[error] Jar name = spark-tags_2.12-3.3.2.jar, jar org = org.apache.spark, entry target = org/apache/spark/unused/UnusedStubClass.class
[error] Jar name = spark-token-provider-kafka-0-10_2.12-3.3.2.jar, jar org = org.apache.spark, entry target = org/apache/spark/unused/UnusedStubClass.class
[error] Jar name = unused-1.0.0.jar, jar org = org.spark-project.spark, entry target = org/apache/spark/unused/UnusedStubClass.class
[error] Total time: 3 s, completed Apr 24, 2023, 11:13:44 PM
make: *** [sbt-clean-compile-assembly] Error 1
Usually I saw "Deduplicate" error when sbt assembly
multiple dependencies, but this time, there is only one spark-sql-kafka-0-10
(the rest of dependencies are "provided"). Is it because class in the sub dependencies got conflict?
Then I found --packages
in this doc.
And this time spark-submit --packages=org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.2 ...
succeed.
My question is when should add dependency to libraryDependencies
without "provided", and when should use --packages
? Thanks!
Try the following for that specific error
assemblyOption in assembly := (assemblyOption in assembly).value.copy(includeScala = false)
assemblyMergeStrategy in assembly := {
case PathList("org", "apache", "spark", "unused", "UnusedStubClass.class") => MergeStrategy.first
case x =>
val oldStrategy = (assemblyMergeStrategy in assembly).value
oldStrategy(x)
}
when should add dependency to libraryDependencies without "provided", and when should use --packages?
Use --packages
if your Spark cluster has open access to the internet or an internal Maven mirror. This will download any necessary libraries at runtime, allowing you to keep your application JAR smaller, with the tradeoff that you'll need to remember to always add extra cli arguments to spark-submit.
Otherwise, adding non-provided (aka compile time) dependencies as libraries with the assembly plugin will bundle all in one JAR, but you'll need to define merge strategies for duplicate elements
spark-sql-kafka-0-10 is part of the "contrib" path in the Spark source code; thus not considered "provided" as a common runtime dependency. This is dependent on your Spark cluster, however, as you can copy the jar into each executor classpath, then marking as provided would work fine.