I am using Scala to write a DataFrame in Delta format to HDFS, but I am getting an error and cannot figure out what is causing it. Please help me with this.
Below is the code I am using to write the Delta table to my local HDFS:
val columns = Array("id", "first", "last", "year")
val test_df = sc.parallelize(Seq(
  (1, "John", "Doe", 1986),
  (2, "Ive", "Fish", 1990),
  (4, "John", "Wayne", 1995)
)).toDF(columns: _*)
test_df.write.format("delta").mode("overwrite").save("hdfs://localhost:9000/user/test/")
Dependencies being used are listed in the reproduction steps below.
Error Msg:
java.lang.NoClassDefFoundError: org/apache/spark/sql/execution/datasources/FileFormatWriter$Empty2Null
at org.apache.spark.sql.delta.DeltaLog.startTransaction(DeltaLog.scala:197)
at org.apache.spark.sql.delta.DeltaLog.withNewTransaction(DeltaLog.scala:210)
at org.apache.spark.sql.delta.commands.WriteIntoDelta.run(WriteIntoDelta.scala:78)
at org.apache.spark.sql.delta.sources.DeltaDataSource.createRelation(DeltaDataSource.scala:156)
at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:47)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:75)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:73)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:84)
at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:98)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:118)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:195)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:103)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:827)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:65)
at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:98)
at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:94)
at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:512)
at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:104)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:512)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:31)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:31)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:31)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:488)
at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:94)
at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:81)
at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:79)
at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:133)
at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:856)
at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:387)
at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:303)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:239)
at EventStream$.update_cta(EventStream.scala:25)
at EventStream$.$anonfun$update_profiles_on_s3$1(EventStream.scala:68)
at EventStream$.$anonfun$update_profiles_on_s3$1$adapted(EventStream.scala:68)
at org.apache.spark.sql.execution.streaming.sources.ForeachBatchSink.addBatch(ForeachBatchSink.scala:34)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$17(MicroBatchExecution.scala:729)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:118)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:195)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:103)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:827)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:65)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$16(MicroBatchExecution.scala:726)
at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:411)
at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:409)
at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:67)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runBatch(MicroBatchExecution.scala:726)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$2(MicroBatchExecution.scala:284)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:411)
at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:409)
at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:67)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$1(MicroBatchExecution.scala:247)
at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:67)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:237)
at org.apache.spark.sql.execution.streaming.StreamExecution.$anonfun$runStream$1(StreamExecution.scala:306)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:827)
at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:284)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:207)
Caused by: java.lang.ClassNotFoundException: org.apache.spark.sql.execution.datasources.FileFormatWriter$Empty2Null
at java.net.URLClassLoader.findClass(URLClassLoader.java:387)
at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
... 61 more
Spark Version
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 3.4.0
      /_/
Using Scala version 2.12.17 (OpenJDK 64-Bit Server VM, Java 1.8.0_362)
To reproduce this issue, simply run:
./spark-shell --packages io.delta:delta-core_2.12:1.2.1
val columns = Array("id", "first", "last", "year")
val test_df = sc.parallelize(Seq(
  (1, "John", "Doe", 1986),
  (2, "Ive", "Fish", 1990),
  (4, "John", "Wayne", 1995)
)).toDF(columns: _*)
test_df.write.format("delta").mode("append").save("hdfs://localhost:9000/user/test/")
Or use any other path if HDFS is not configured.
I couldn't reproduce this at first; I managed to reproduce the NoClassDefFoundError after replacing hdfs://... with file:///.... The following code together with the following build.sbt runs for me, except that with the hdfs:// path it throws java.net.ConnectException: Call From .../127.0.1.1 to localhost:9000 failed on connection exception: java.net.ConnectException; For more details see: http://wiki.apache.org/hadoop/ConnectionRefused (because I didn't configure a Hadoop service). Please prepare a step-by-step reproduction.
build.sbt
ThisBuild / scalaVersion := "2.12.17"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-sql" % "3.4.0",
  "io.delta" %% "delta-core" % "1.2.1",
  "org.apache.hadoop" % "hadoop-client" % "3.3.5",
  "org.apache.hadoop" % "hadoop-client-api" % "3.3.5",
  "org.apache.hadoop" % "hadoop-client-runtime" % "3.3.5"
)
import org.apache.spark.sql.SparkSession

object App {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder
      .master("local")
      .appName("Spark app")
      .getOrCreate()
    val sc = spark.sparkContext
    import spark.implicits._

    val columns = Array("id", "first", "last", "year")
    val test_df = sc.parallelize(Seq(
      (1, "John", "Doe", 1986),
      (2, "Ive", "Fish", 1990),
      (4, "John", "Wayne", 1995)
    )).toDF(columns: _*)

    test_df.write.format("delta")
      .mode("overwrite")
      .save("hdfs://localhost:9000/user/test/")
  }
}
The class org.apache.spark.sql.execution.datasources.FileFormatWriter.Empty2Null exists in spark-sql 3.3.2 and earlier; in 3.4.0 it is absent. You should look at your classpath in order to figure out which of your dependencies is still built against Spark 3.3.2 or earlier, even though you think you're using Spark 3.4.0.
How do you build your project? With sbt? You can run sbt dependencyTree (note that this doesn't show provided dependencies); see the sketch below for enabling it.
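If the dependencyTree task isn't available in your build, on sbt 1.4+ it can be enabled via the bundled plugin (a minimal sketch; for older sbt versions the standalone sbt-dependency-graph plugin provides the same task):

// project/plugins.sbt — enables the full `dependencyTree` task bundled with sbt 1.4+
addDependencyTreePlugin

Then sbt dependencyTree prints the resolved graph, where you can check which spark-sql version each dependency actually pulls in.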
Alternatively, print System.getProperty("java.class.path") (this shows the classpath only as it was at JVM start), or add scalacOptions += "-Ylog-classpath" to build.sbt, or run the following script inside the actual Spark environment that you're using:
import java.net.URLClassLoader

// Walk up the classloader hierarchy and print the URLs each loader serves
var cl = getClass.getClassLoader
while (cl != null) {
  println(s"classloader: ${cl.getClass.getName}")
  cl match {
    case cl: URLClassLoader =>
      println("classloader urls:")
      cl.getURLs.foreach(println)
    case _ =>
      println("not URLClassLoader")
  }
  cl = cl.getParent
}
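As a quicker probe, you can also check directly whether the missing class is visible at runtime and which jar the spark-sql classes are loaded from. This is a sketch to paste into spark-shell; note that the nested class uses $ in its binary name, and that getCodeSource may be null for some classloaders:

import scala.util.Try

// Probe for the nested class that delta-core expects to find at runtime
val probe = Try(Class.forName(
  "org.apache.spark.sql.execution.datasources.FileFormatWriter$Empty2Null"))
println(s"Empty2Null present: ${probe.isSuccess}")

// Print which jar the spark-sql classes actually come from, and the runtime Spark version
val sparkSqlJar = Option(classOf[org.apache.spark.sql.SparkSession]
  .getProtectionDomain.getCodeSource).map(_.getLocation)
println(s"spark-sql loaded from: $sparkSqlJar")
println(s"Spark version: ${org.apache.spark.SPARK_VERSION}")

If the probe fails under Spark 3.4.0 but succeeds under 3.3.2, that confirms the missing-class diagnosis below.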
See also:
Run a scala code jar appear NoSuchMethodError:scala.Predef$.refArrayOps
Why am I getting a NoClassDefFoundError in Java?
What causes and what are the differences between NoClassDefFoundError and ClassNotFoundException?
The root cause seems to be that delta-core 1.2.1 was compiled against spark-sql 3.2.0:
https://repo1.maven.org/maven2/io/delta/delta-core_2.13/1.2.1/delta-core_2.13-1.2.1.pom
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-sql_2.13</artifactId>
  <version>3.2.0</version>
  <scope>provided</scope>
</dependency>
And even the newest delta-core, 2.3.0, is compiled against spark-sql 3.3.2:
https://repo1.maven.org/maven2/io/delta/delta-core_2.13/2.3.0/delta-core_2.13-2.3.0.pom
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-sql_2.13</artifactId>
  <version>3.3.2</version>
  <scope>provided</scope>
</dependency>
But as I said, Empty2Null is absent from spark-sql 3.4.0, so it seems that all existing versions of delta-core may be incompatible with Spark 3.4.0.
If I upgrade delta-core to 2.3.0 in build.sbt, the NoClassDefFoundError changes to
com.google.common.util.concurrent.ExecutionError: java.lang.NoSuchMethodError: org.apache.spark.ErrorInfo.messageFormat()Ljava/lang/String;
Try downgrading Spark to 3.3.2 or earlier. Then your code works properly for me. I replaced "hdfs://..." with a local path "file:///..." and added the configuration
val spark = SparkSession.builder
  .master("local")
  .appName("Spark app")
  .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") // !!!
  .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") // !!!
  .getOrCreate()
as was requested by another exception. And now several files appeared in my local file:///... path.
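For reference, a build.sbt that pairs the versions consistently might look like this (a sketch following the compatibility table below, pinning delta-core 2.3.0 to Spark 3.3.2):

ThisBuild / scalaVersion := "2.12.17"

libraryDependencies ++= Seq(
  // Delta 2.3.x is built against Spark 3.3.x, so keep the two in lockstep
  "org.apache.spark" %% "spark-sql" % "3.3.2",
  "io.delta" %% "delta-core" % "2.3.0"
)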
Actually, this is documented in the Delta Lake docs:
https://docs.delta.io/latest/releases.html
Compatibility with Apache Spark:
Delta Lake version | Apache Spark version |
---|---|
2.3.x | 3.3.x |
2.2.x | 3.3.x |
2.1.x | 3.3.x |
2.0.x | 3.2.x |
1.2.x | 3.2.x |
1.1.x | 3.2.x |
1.0.x | 3.1.x |
0.7.x and 0.8.x | 3.0.x |
Below 0.7.0 | 2.4.2 - 2.4.<latest> |
According to https://github.com/delta-io/delta/issues/1696 ([Feature Request] Support Spark 3.4), the plan is to release Delta 2.4 on Spark 3.4.
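Assuming that release keeps the same artifact coordinates, moving to Spark 3.4 would then presumably be a matter of bumping both versions together (hypothetical until Delta 2.4 actually ships):

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-sql" % "3.4.0",
  "io.delta" %% "delta-core" % "2.4.0" // hypothetical: the Delta 2.4 release planned for Spark 3.4
)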