apache-spark, pyspark, delta

Delta Lake not working after upgrading to Spark 3.5.0 and Delta 3.1.0


I have a Docker project for working with Spark locally, with the following setup:

  • Ubuntu 20.04 (WSL)
  • openjdk:17.0.2
  • Scala 2.12
  • Spark 3.4.0
  • Delta Lake 2.4.0
  • JupyterLab

Everything works fine, but after upgrading Spark to 3.5.0 and Delta to 3.1.0 (the Delta release that works with Spark 3.5), I get the error below whenever I create or query a Delta table. Code to create the Spark session:

from delta import configure_spark_with_delta_pip
from pyspark import SparkConf
from pyspark.sql import SparkSession

spark_conf = SparkConf()
spark_conf.setAll(
    [
        ("spark.master", "spark://spark-master:7077"),
        ("spark.app.name", "spark_app"),
        ("spark.driver.memory", "4g"),
        ("spark.submit.deployMode", "client"),
        ("spark.ui.showConsoleProgress", "true"),
        ("spark.eventLog.enabled", "false"),
        ("spark.logConf", "false"),
        # Local Delta jar
        (
            "spark.jars",
            "/usr/lib/delta-core_2.12-3.1.0.jar",
        ),
        ("spark.driver.extraJavaOptions", "-Djava.net.useSystemProxies=true"),
        # Delta SQL extension and catalog
        ("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension"),
        (
            "spark.sql.catalog.spark_catalog",
            "org.apache.spark.sql.delta.catalog.DeltaCatalog",
        ),
        # Embedded Derby metastore for the Hive catalog
        (
            "javax.jdo.option.ConnectionURL",
            "jdbc:derby:;databaseName=/tmp/metastore_db;create=true",
        ),
        ("spark.sql.catalogImplementation", "hive"),
    ]
)
builder = SparkSession.builder.config(conf=spark_conf)
spark_session = configure_spark_with_delta_pip(builder).getOrCreate()

Code to query the Delta table:

df = spark_session.sql("SELECT * FROM delta_table")
df.show()

Error:

Py4JJavaError: An error occurred while calling o57.sql.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1.0 failed 4 times, most recent failure: Lost task 0.3 in stage 1.0 (TID 4) (172.20.0.4 executor 0): java.lang.ClassCastException: cannot assign instance of java.lang.invoke.SerializedLambda to field org.apache.spark.sql.catalyst.expressions.ScalaUDF.f of type scala.Function1 in instance of org.apache.spark.sql.catalyst.expressions.ScalaUDF
    at java.base/java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2227)
    at java.base/java.io.ObjectStreamClass$FieldReflector.checkObjectFieldValueTypes(ObjectStreamClass.java:2191)
    at java.base/java.io.ObjectStreamClass.checkObjFieldValueTypes(ObjectStreamClass.java:1478)
    at java.base/java.io.ObjectInputStream$FieldValues.defaultCheckFieldValues(ObjectInputStream.java:2690)
    at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2497)
    ... (snipped)
    at java.base/java.io.ObjectInputStream.readObject(ObjectInputStream.java:472)
    at scala.collection.immutable.List$SerializationProxy.readObject(List.scala:527)
    at jdk.internal.reflect.GeneratedMethodAccessor4.invoke(Unknown Source)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:568)
    ... (snipped)
    at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1744)
    at java.base/java.io.ObjectInputStream.readObject(ObjectInputStream.java:514)
    at java.base/java.io.ObjectInputStream.readObject(ObjectInputStream.java:472)
    at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:87)
    at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:129)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:90)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:54)
    at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
    at org.apache.spark.scheduler.Task.run(Task.scala:141)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
    at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
    at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
    at java.base/java.lang.Thread.run(Thread.java:833)

Is this a compatibility issue?
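Each Delta release is built against one Spark minor version. Per the compatibility table in the Delta Lake documentation, Delta 2.4.x targets Spark 3.4.x and Delta 3.0.x/3.1.x target Spark 3.5.x, so both the old and the new combinations in this setup are supported pairs. The sketch below encodes only those rows; the table and the helper names (is_compatible, delta_artifact) are hypothetical, not part of any library.

```python
# Spark minor version each Delta Lake minor release is built for.
# Assumption: only the rows relevant to this question, copied from the
# Delta Lake compatibility table; extend as needed.
DELTA_TO_SPARK = {
    "2.4": "3.4",
    "3.0": "3.5",
    "3.1": "3.5",
}


def major_minor(version: str) -> str:
    """Return the 'major.minor' prefix of a version such as '3.5.0'."""
    return ".".join(version.split(".")[:2])


def is_compatible(spark_version: str, delta_version: str) -> bool:
    """True if the Delta release targets the given Spark minor version.

    Delta versions missing from the table are treated as incompatible
    instead of being guessed.
    """
    expected = DELTA_TO_SPARK.get(major_minor(delta_version))
    return expected == major_minor(spark_version)


def delta_artifact(delta_version: str, scala_version: str = "2.12") -> str:
    """Maven coordinate of the Delta Spark jar for a given Delta version.

    Delta 3.0 renamed the artifact from delta-core to delta-spark.
    """
    name = "delta-spark" if int(delta_version.split(".")[0]) >= 3 else "delta-core"
    return f"io.delta:{name}_{scala_version}:{delta_version}"


if __name__ == "__main__":
    print(is_compatible("3.4.0", "2.4.0"))  # True: old setup
    print(is_compatible("3.5.0", "3.1.0"))  # True: new setup
    print(delta_artifact("3.1.0"))  # io.delta:delta-spark_2.12:3.1.0
```

Since the version pair itself checks out, the suspect is the jar the session loads rather than the versions chosen.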


Solution

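The problem is this part of your configuration:

spark_conf.setAll(
    [
        (
            "spark.jars",
            "/usr/lib/delta-core_2.12-3.1.0.jar",
        ),
        ...
    ]
)

As of Delta 3.0 (search for "Delta Spark" in the release notes), the Spark jars are different: the delta-core artifact was renamed to delta-spark, so delta-core stops at 2.4.x and there is no delta-core 3.1.0 jar. You'll find the full list of artifacts there.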
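A minimal sketch of corrected Delta settings, assuming Scala 2.12 and Delta 3.1.0 as in the question. The helper name delta_spark_conf and its jar_dir parameter are hypothetical; the config keys and class names are the ones the question already uses, and the local jar file names follow Maven's artifact-version.jar naming for io.delta:delta-spark_2.12 and io.delta:delta-storage (delta-spark depends on delta-storage, so a local setup needs both files).

```python
def delta_spark_conf(delta_version="3.1.0", scala_version="2.12", jar_dir=None):
    """Build (key, value) pairs for SparkConf.setAll() that load Delta 3.x.

    Delta 3.x ships its Spark integration as delta-spark (formerly
    delta-core), which depends on the separate delta-storage jar.
    """
    if jar_dir is None:
        # Let Spark resolve delta-spark and its dependencies from Maven.
        jars = (
            "spark.jars.packages",
            f"io.delta:delta-spark_{scala_version}:{delta_version}",
        )
    else:
        # Local jars: both files must exist under jar_dir (hypothetical path).
        base = jar_dir.rstrip("/")
        jars = (
            "spark.jars",
            f"{base}/delta-spark_{scala_version}-{delta_version}.jar,"
            f"{base}/delta-storage-{delta_version}.jar",
        )
    return [
        jars,
        ("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension"),
        (
            "spark.sql.catalog.spark_catalog",
            "org.apache.spark.sql.delta.catalog.DeltaCatalog",
        ),
    ]


if __name__ == "__main__":
    for key, value in delta_spark_conf(jar_dir="/usr/lib"):
        print(key, "=", value)
```

These pairs can replace the matching entries in the spark_conf.setAll([...]) list. If the session is still created through configure_spark_with_delta_pip, that helper should already add the delta-spark package matching the installed pip version, so in that case the stale delta-core spark.jars entry can simply be dropped.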