Tags: scala, apache-spark, apache-spark-sql, delta-lake

Unable to write DF in delta format on hdfs


I am using Scala to write a DataFrame in Delta format to HDFS, but I am getting an error and cannot figure out what is causing it. Please help me with this.

Below is the code I am using to write a Delta table to my local HDFS.

val columns = Array("id", "first", "last", "year")

val test_df = sc.parallelize(Seq(
  (1, "John", "Doe", 1986),
  (2, "Ive", "Fish", 1990),
  (4, "John", "Wayne", 1995)
)).toDF(columns: _*)

test_df.write.format("delta").mode("overwrite").save("hdfs://localhost:9000/user/test/")

Dependencies being used are:

  1. Spark v3.4.0
  2. Hadoop v3.3.5
  3. Delta Package: io.delta:delta-core_2.12:1.2.1

Error message:

java.lang.NoClassDefFoundError: org/apache/spark/sql/execution/datasources/FileFormatWriter$Empty2Null
    at org.apache.spark.sql.delta.DeltaLog.startTransaction(DeltaLog.scala:197)
    at org.apache.spark.sql.delta.DeltaLog.withNewTransaction(DeltaLog.scala:210)
    at org.apache.spark.sql.delta.commands.WriteIntoDelta.run(WriteIntoDelta.scala:78)
    at org.apache.spark.sql.delta.sources.DeltaDataSource.createRelation(DeltaDataSource.scala:156)
    at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:47)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:75)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:73)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:84)
    at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:98)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:118)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:195)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:103)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:827)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:65)
    at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:98)
    at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:94)
    at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:512)
    at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:104)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:512)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:31)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:31)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:31)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:488)
    at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:94)
    at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:81)
    at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:79)
    at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:133)
    at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:856)
    at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:387)
    at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:303)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:239)
    at EventStream$.update_cta(EventStream.scala:25)
    at EventStream$.$anonfun$update_profiles_on_s3$1(EventStream.scala:68)
    at EventStream$.$anonfun$update_profiles_on_s3$1$adapted(EventStream.scala:68)
    at org.apache.spark.sql.execution.streaming.sources.ForeachBatchSink.addBatch(ForeachBatchSink.scala:34)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$17(MicroBatchExecution.scala:729)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:118)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:195)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:103)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:827)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:65)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$16(MicroBatchExecution.scala:726)
    at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:411)
    at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:409)
    at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:67)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runBatch(MicroBatchExecution.scala:726)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$2(MicroBatchExecution.scala:284)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:411)
    at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:409)
    at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:67)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$1(MicroBatchExecution.scala:247)
    at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:67)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:237)
    at org.apache.spark.sql.execution.streaming.StreamExecution.$anonfun$runStream$1(StreamExecution.scala:306)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:827)
    at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:284)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:207)
Caused by: java.lang.ClassNotFoundException: org.apache.spark.sql.execution.datasources.FileFormatWriter$Empty2Null
    at java.net.URLClassLoader.findClass(URLClassLoader.java:387)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
    ... 61 more

Spark Version

Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 3.4.0
      /_/
         
Using Scala version 2.12.17 (OpenJDK 64-Bit Server VM, Java 1.8.0_362)

To reproduce this issue, simply run:


./spark-shell --packages io.delta:delta-core_2.12:1.2.1

val columns = Array("id", "first", "last", "year")

val test_df = sc.parallelize(Seq(
  (1, "John", "Doe", 1986),
  (2, "Ive", "Fish", 1990),
  (4, "John", "Wayne", 1995)
)).toDF(columns: _*)

test_df.write.format("delta").mode("append").save("hdfs://localhost:9000/user/test/")

or use any other path if HDFS is not configured, e.g. a local file:/// path as in the sketch below.
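
For example, writing to a local path instead of HDFS (the directory /tmp/delta-test below is just a placeholder; any writable path works):

test_df.write.format("delta").mode("append").save("file:///tmp/delta-test")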


Solution

  • At first I couldn't reproduce the NoClassDefFoundError: with the build.sbt below, the code only threw java.net.ConnectException: Call From .../127.0.1.1 to localhost:9000 failed on connection exception: java.net.ConnectException; For more details see: http://wiki.apache.org/hadoop/ConnectionRefused for me, because I hadn't configured a Hadoop service. I managed to reproduce the error after replacing hdfs://... with file:///.... Please prepare a step-by-step reproduction.

    build.sbt

    ThisBuild / scalaVersion := "2.12.17"
    
    libraryDependencies ++= Seq(
      "org.apache.spark"  %% "spark-sql"             % "3.4.0",
      "io.delta"          %% "delta-core"            % "1.2.1",
      "org.apache.hadoop" %  "hadoop-client"         % "3.3.5",
      "org.apache.hadoop" %  "hadoop-client-api"     % "3.3.5",
      "org.apache.hadoop" %  "hadoop-client-runtime" % "3.3.5"
    )
    
    import org.apache.spark.sql.SparkSession
    
    object App {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder
          .master("local")
          .appName("Spark app")
          .getOrCreate()
    
        val sc = spark.sparkContext
        import spark.implicits._
    
        val columns = Array("id", "first", "last", "year")
    
        val test_df = sc.parallelize(Seq(
          (1, "John", "Doe", 1986),
          (2, "Ive", "Fish", 1990),
          (4, "John", "Wayne", 1995)
        )).toDF(columns: _*);
    
        test_df.write.format("delta")
          .mode("overwrite")
          .save("hdfs://localhost:9000/user/test/")
      }
    }
    

    The class org.apache.spark.sql.execution.datasources.FileFormatWriter.Empty2Null exists in spark-sql 3.3.2 and earlier:

    https://github.com/apache/spark/blob/v3.3.2/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala#L59

    In 3.4.0 it is absent:

    https://github.com/apache/spark/blob/v3.4.0/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala

    You should look at your classpath to figure out which dependency is still pulling in Spark 3.3.2 or earlier, even though you think you're using Spark 3.4.0.

    How do you build your project? With sbt?

    • You can do sbt dependencyTree (this doesn't show provided dependencies).

    • Or print System.getProperty("java.class.path") (this shows the classpath only as it was at JVM start; see the snippet after the classloader script below).

    • Or add scalacOptions += "-Ylog-classpath" to build.sbt

    • Or run the following script inside the actual Spark environment you're using:

    import java.net.URLClassLoader

    // Walk up the classloader hierarchy and print the URLs each URLClassLoader sees.
    var cl: ClassLoader = getClass.getClassLoader
    while (cl != null) {
      println(s"classloader: ${cl.getClass.getName}")
      cl match {
        case cl: URLClassLoader =>
          println("classloader urls:")
          cl.getURLs.foreach(println)
        case _ =>
          println("not URLClassLoader")
      }
      cl = cl.getParent
    }
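
    Similarly, for the java.class.path option mentioned above, here is a small diagnostic sketch (no Spark-specific API involved) that prints one classpath entry per line, so you can scan it for spark-sql jars and see which Spark version actually ends up on the classpath:

    import java.io.File

    // Split the classpath on the platform path separator and print each entry.
    System.getProperty("java.class.path")
      .split(File.pathSeparator)
      .foreach(println)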
    

    See also:

    • Run a scala code jar appear NoSuchMethodError: scala.Predef$.refArrayOps

    • Why am I getting a NoClassDefFoundError in Java?

    • What causes and what are the differences between NoClassDefFoundError and ClassNotFoundException?


    The thing seems to be that delta-core 1.2.1 was compiled against spark-sql 3.2.0:

    https://repo1.maven.org/maven2/io/delta/delta-core_2.13/1.2.1/delta-core_2.13-1.2.1.pom

    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-sql_2.13</artifactId>
      <version>3.2.0</version>
      <scope>provided</scope>
    </dependency>
    

    And even the newest delta-core, 2.3.0, is compiled against spark-sql 3.3.2:

    https://repo1.maven.org/maven2/io/delta/delta-core_2.13/2.3.0/delta-core_2.13-2.3.0.pom

    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-sql_2.13</artifactId>
      <version>3.3.2</version>
      <scope>provided</scope>
    </dependency>
    

    But as I said, Empty2Null is absent from spark-sql 3.4.0. So it seems that all existing versions of delta-core may be incompatible with Spark 3.4.0.

    If I upgrade delta-core to 2.3.0 in build.sbt, the NoClassDefFoundError changes to:

    com.google.common.util.concurrent.ExecutionError: java.lang.NoSuchMethodError: org.apache.spark.ErrorInfo.messageFormat()Ljava/lang/String;
    

    Try downgrading Spark to 3.3.2 or earlier.
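
    For example, a minimal build.sbt sketch with a consistent pairing (Spark 3.3.2 with delta-core 2.3.0, per the compatibility table quoted below; adjust the versions to your environment):

    ThisBuild / scalaVersion := "2.12.17"

    libraryDependencies ++= Seq(
      "org.apache.spark" %% "spark-sql"  % "3.3.2", // downgraded from 3.4.0
      "io.delta"         %% "delta-core" % "2.3.0"  // compiled against spark-sql 3.3.2
    )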

    Then your code works properly for me. I replaced "hdfs://..." with the local path "file:///..." and added the configuration

    val spark = SparkSession.builder
      .master("local")
      .appName("Spark app")
      .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") // !!!
      .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") // !!!
      .getOrCreate()
    

    as requested by another exception. After that, several files appeared in my local file:///... path.
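
    To double-check the write, the table can be read back with the Delta reader on the same Spark session configured as above; a minimal sketch (file:///tmp/test is just a placeholder for whatever local path you used):

    val readBack = spark.read.format("delta").load("file:///tmp/test")
    readBack.show()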

    Actually, this is documented in the Delta Lake docs:

    https://docs.delta.io/latest/releases.html

    Compatibility with Apache Spark

    Delta Lake version | Apache Spark version
    2.3.x              | 3.3.x
    2.2.x              | 3.3.x
    2.1.x              | 3.3.x
    2.0.x              | 3.2.x
    1.2.x              | 3.2.x
    1.1.x              | 3.2.x
    1.0.x              | 3.1.x
    0.7.x and 0.8.x    | 3.0.x
    Below 0.7.0        | 2.4.2 - 2.4.

    According to [Feature Request] Support Spark 3.4 (https://github.com/delta-io/delta/issues/1696), the plan is to release Delta 2.4 on Spark 3.4.