It seems I'm unable to write using the delta format from my Spark job, but I'm not sure what I'm missing. I'm using Spark 3.5.3 and Delta Lake 3.2.0.
My error:
Exception in thread "main" org.apache.spark.SparkClassNotFoundException: [DATA_SOURCE_NOT_FOUND] Failed to find the data source: delta. Please find packages at `https://spark.apache.org/third-party-projects.html`.
My build.sbt:
name := "test"
version := "0.1"
scalaVersion := "2.12.18"

logLevel := Level.Warn
assembly / logLevel := Level.Warn
clean / logLevel := Level.Warn

libraryDependencies += "org.apache.spark" %% "spark-core" % "3.5.3" % "provided"
libraryDependencies += "org.apache.spark" %% "spark-sql" % "3.5.3" % "provided"
libraryDependencies += "io.delta" %% "delta-spark" % "3.2.0"

assembly / test := {}
assembly / assemblyJarName := s"${name.value}-${version.value}.jar"
assembly / assemblyMergeStrategy := {
  case PathList("META-INF", _*) => MergeStrategy.discard
  case _                        => MergeStrategy.first
}
and my job:
val spark = SparkSession
  .builder()
  .appName("test")
  .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
  .config(
    "spark.sql.catalog.spark_catalog",
    "org.apache.spark.sql.delta.catalog.DeltaCatalog"
  )
  .getOrCreate()

val df = getData(spark)
val path = "/home/user/testtable"
df.write.format("delta").mode("overwrite").save(path)

spark.stop()
Any thoughts? I'm looking at the Delta Lake quickstart, but I'm not seeing anything I'm missing. I feel like there's something obvious, though.
How do you run your code?
Just in case: if you build a fat jar with sbt assembly and then submit it via spark-submit, don't forget --packages io.delta:delta-spark_2.12:3.2.0.
.config("spark.jars.packages", "io.delta:delta-spark_2.12:3.2.0")
doesn't work. libraryDependencies += "io.delta" %% "delta-spark" % "3.2.0"
in build.sbt
is not enough either.
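If you want the assembled jar to be self-contained instead, one likely culprit (this is a guess based on the build.sbt above, not something I've verified against your project) is the merge strategy: discarding everything under META-INF also throws away META-INF/services/org.apache.spark.sql.sources.DataSourceRegister, the ServiceLoader file Spark uses to resolve the short name delta. A sketch of a merge strategy that keeps those service entries:

assembly / assemblyMergeStrategy := {
  // keep ServiceLoader registration files so format("delta") can be resolved by name
  case PathList("META-INF", "services", _*) => MergeStrategy.concat
  case PathList("META-INF", _*)             => MergeStrategy.discard
  case _                                    => MergeStrategy.first
}

That said, the simplest route is to keep the jar as-is and pass the package at submit time.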
Try
cd /path_to_project/target/scala-2.12
/path_to_spark/spark-3.5.3-bin-hadoop3/bin/spark-submit --packages io.delta:delta-spark_2.12:3.2.0 test-0.1.jar
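Equivalently, if you'd rather not hard-code the Delta settings in the job, the same two configs from your SparkSession builder can be passed on the command line (a sketch of the same submit command, with the paths and jar name as above):

/path_to_spark/spark-3.5.3-bin-hadoop3/bin/spark-submit \
  --packages io.delta:delta-spark_2.12:3.2.0 \
  --conf spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension \
  --conf spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog \
  test-0.1.jar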