
PySpark Delta Table - generate symlink [java.lang.NoSuchMethodError]


I have the following situation:

  • Delta table located in S3
  • I want to query this table via Athena
  • Spark version 3.1.1 and Hadoop 3.2.0

To do this, I need to follow the docs: instructions and S3 setup

I am using a MacBook Pro, with environment variables configured in my ~/.zshrc for this small POC:

export PYSPARK_PYTHON=<poetry_python_path>/bin/python3
export PYSPARK_DRIVER_PYTHON=<poetry_python_path>/bin/python3
export JAVA_HOME="/Library/Java/JavaVirtualMachines/adoptopenjdk-8.jdk/Contents/Home"
export SPARK_HOME=<poetry_python_path>/site-packages/pyspark
export PYARROW_IGNORE_TIMEZONE=1
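
A quick sanity check (a minimal sketch, nothing specific to this setup) that the Poetry interpreter and the bundled Spark install line up:

import os

import pyspark

# The pip/Poetry-installed PySpark should match SPARK_HOME above.
print(os.environ.get("SPARK_HOME"))
print(os.environ.get("JAVA_HOME"))
print(pyspark.__version__)  # expected: 3.1.1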

I set up a small PySpark project, where I create my spark_session:

from pyspark.sql import SparkSession
import findspark
import boto3


def create_session() -> SparkSession:
    findspark.init()

    spark_session = SparkSession.builder.appName("delta_session") \
        .master("local[*]") \
        .getOrCreate()

    sparkContext = spark_session.sparkContext

    # boto3.setup_default_session() returns None, so passing its result as
    # botocore_session was a no-op; creating the profile session directly
    # is equivalent.
    boto_session = boto3.Session(profile_name="dev", region_name="eu-west-1")
    credentials = boto_session.get_credentials()

    print(
        f"Hadoop version = {sparkContext._jvm.org.apache.hadoop.util.VersionInfo.getVersion()}"
    )

    hadoopConfiguration = sparkContext._jsc.hadoopConfiguration()
    hadoopConfiguration.set(
        "fs.s3a.aws.credentials.provider",
        "com.amazonaws.auth.profile.ProfileCredentialsProvider",
    )
    # Only applies to s3:// URIs; the reads below use s3a://.
    hadoopConfiguration.set("fs.s3.impl", "org.apache.hadoop.fs.s3native.NativeS3FileSystem")
    # s3a reads fs.s3a.access.key / fs.s3a.secret.key; the awsAccessKeyId /
    # awsSecretAccessKey names belong to the old s3n connector.
    hadoopConfiguration.set("fs.s3a.access.key", credentials.access_key)
    hadoopConfiguration.set("fs.s3a.secret.key", credentials.secret_key)
    hadoopConfiguration.set("fs.s3a.endpoint", "s3.amazonaws.com")

    return spark_session
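
The same S3 settings can also be passed at build time through spark.hadoop.*-prefixed options, which avoids mutating the Hadoop configuration after the session already exists. A minimal sketch of the equivalent wiring (same values as above):

from pyspark.sql import SparkSession

# spark.hadoop.* options are copied into the Hadoop configuration when the
# session is created, so no post-hoc hadoopConfiguration().set(...) is needed.
spark_session = (
    SparkSession.builder.appName("delta_session")
    .master("local[*]")
    .config(
        "spark.hadoop.fs.s3a.aws.credentials.provider",
        "com.amazonaws.auth.profile.ProfileCredentialsProvider",
    )
    .config("spark.hadoop.fs.s3a.endpoint", "s3.amazonaws.com")
    .getOrCreate()
)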

I then run:

spark_session = create_session()

from delta.tables import DeltaTable

delta_table = DeltaTable.forPath(spark_session, "s3a://<my-path-to-delta-table>")

# This works
df = delta_table.toDF()
df.show(10)  # show() prints the rows itself and returns None

# This fails
delta_table.generate("symlink_format_manifest")

  1. I am able to retrieve the Delta files and create a DataFrame; all looks good.

  2. I then try to call delta_table.generate and I get this error:

Traceback (most recent call last):
  File "/run.py", line 33, in <module>
    delta_table.generate("symlink_format_manifest")
  File "/private/var/folders/c8/sj3rz_k14cs58nqwr3m9zsxc0000gq/T/spark-ba2ce53e-c9f8-49d4-98d5-21d9581b05f4/userFiles-b6d820f0-4e96-4e27-8808-a14b9e93928a/io.delta_delta-core_2.12-0.7.0.jar/delta/tables.py", line 74, in generate
  File "<poetry_python_path>/site-packages/pyspark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 1305, in __call__
  File "<poetry_python_path>/site-packages/pyspark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 111, in deco
  File "<poetry_python_path>/site-packages/pyspark/python/lib/py4j-0.10.9-src.zip/py4j/protocol.py", line 328, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o34.generate.
: java.lang.NoSuchMethodError: org.apache.spark.sql.catalyst.expressions.ScalaUDF$.apply$default$6()Z
    at org.apache.spark.sql.delta.hooks.GenerateSymlinkManifestImpl.$anonfun$generatePartitionPathExpression$1(GenerateSymlinkManifest.scala:350)
    at scala.collection.TraversableLike.$anonfun$flatMap$1(TraversableLike.scala:245)
    at scala.collection.immutable.List.foreach(List.scala:392)
    at scala.collection.TraversableLike.flatMap(TraversableLike.scala:245)
    at scala.collection.TraversableLike.flatMap$(TraversableLike.scala:242)
    at scala.collection.immutable.List.flatMap(List.scala:355)
    at org.apache.spark.sql.delta.hooks.GenerateSymlinkManifestImpl.generatePartitionPathExpression(GenerateSymlinkManifest.scala:349)
    at org.apache.spark.sql.delta.hooks.GenerateSymlinkManifestImpl.generatePartitionPathExpression$(GenerateSymlinkManifest.scala:345)
    at org.apache.spark.sql.delta.hooks.GenerateSymlinkManifest$.generatePartitionPathExpression(GenerateSymlinkManifest.scala:41)
    at org.apache.spark.sql.delta.hooks.GenerateSymlinkManifestImpl.withRelativePartitionDir(GenerateSymlinkManifest.scala:338)
    at org.apache.spark.sql.delta.hooks.GenerateSymlinkManifestImpl.writeManifestFiles(GenerateSymlinkManifest.scala:262)
    at org.apache.spark.sql.delta.hooks.GenerateSymlinkManifestImpl.$anonfun$generateFullManifest$1(GenerateSymlinkManifest.scala:180)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at org.apache.spark.sql.delta.util.DeltaProgressReporter.withJobDescription(DeltaProgressReporter.scala:53)
    at org.apache.spark.sql.delta.util.DeltaProgressReporter.withStatusCode(DeltaProgressReporter.scala:32)
    at org.apache.spark.sql.delta.util.DeltaProgressReporter.withStatusCode$(DeltaProgressReporter.scala:27)
    at org.apache.spark.sql.delta.hooks.GenerateSymlinkManifest$.withStatusCode(GenerateSymlinkManifest.scala:41)
    at org.apache.spark.sql.delta.hooks.GenerateSymlinkManifestImpl.$anonfun$recordManifestGeneration$1(GenerateSymlinkManifest.scala:365)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at com.databricks.spark.util.DatabricksLogging.recordOperation(DatabricksLogging.scala:77)
    at com.databricks.spark.util.DatabricksLogging.recordOperation$(DatabricksLogging.scala:67)
    at org.apache.spark.sql.delta.hooks.GenerateSymlinkManifest$.recordOperation(GenerateSymlinkManifest.scala:41)
    at org.apache.spark.sql.delta.metering.DeltaLogging.recordDeltaOperation(DeltaLogging.scala:103)
    at org.apache.spark.sql.delta.metering.DeltaLogging.recordDeltaOperation$(DeltaLogging.scala:89)
    at org.apache.spark.sql.delta.hooks.GenerateSymlinkManifest$.recordDeltaOperation(GenerateSymlinkManifest.scala:41)
    at org.apache.spark.sql.delta.hooks.GenerateSymlinkManifestImpl.recordManifestGeneration(GenerateSymlinkManifest.scala:364)
    at org.apache.spark.sql.delta.hooks.GenerateSymlinkManifestImpl.generateFullManifest(GenerateSymlinkManifest.scala:167)
    at org.apache.spark.sql.delta.hooks.GenerateSymlinkManifestImpl.generateFullManifest$(GenerateSymlinkManifest.scala:165)
    at org.apache.spark.sql.delta.hooks.GenerateSymlinkManifest$.generateFullManifest(GenerateSymlinkManifest.scala:41)
    at org.apache.spark.sql.delta.commands.DeltaGenerateCommand$.$anonfun$modeNameToGenerationFunc$1(DeltaGenerateCommand.scala:58)
    at org.apache.spark.sql.delta.commands.DeltaGenerateCommand$.$anonfun$modeNameToGenerationFunc$1$adapted(DeltaGenerateCommand.scala:58)
    at org.apache.spark.sql.delta.commands.DeltaGenerateCommand.run(DeltaGenerateCommand.scala:50)
    at io.delta.tables.execution.DeltaTableOperations.executeGenerate(DeltaTableOperations.scala:54)
    at io.delta.tables.execution.DeltaTableOperations.executeGenerate$(DeltaTableOperations.scala:48)
    at io.delta.tables.DeltaTable.executeGenerate(DeltaTable.scala:45)
    at io.delta.tables.DeltaTable.generate(DeltaTable.scala:176)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:748)
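
For completeness, the SQL form of the same command goes through the same DeltaGenerateCommand code path that is visible in the stack trace, so I would expect it to fail identically:

spark_session.sql(
    "GENERATE symlink_format_manifest FOR TABLE delta.`s3a://<my-path-to-delta-table>`"
)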

I call the application with:

    poetry run spark-submit --packages "io.delta:delta-core_2.12:0.8.0,com.amazonaws:aws-java-sdk-pom:1.11.375,org.apache.hadoop:hadoop-aws:3.2.0" --conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" --conf "spark.delta.logStore.class=org.apache.spark.sql.delta.storage.S3SingleDriverLogStore" run.py
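
One thing worth double-checking: the traceback above references io.delta_delta-core_2.12-0.7.0.jar while this command pins 0.8.0, so it helps to confirm which jars the driver actually resolved. A small sketch using Spark's internal listJars (a debugging aid, not stable API):

# Print the jars the running SparkContext actually loaded; a cached delta-core
# 0.7.0 jar would explain a mismatch with the 0.8.0 pin in --packages.
print(spark_session.sparkContext._jsc.sc().listJars())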

What I tried:

  • I have tried running it without Poetry, directly downloading Spark and doing it that way
  • I tried with an older Hadoop version, since they seem to use that here
  • I found this thread, but it did not help me
  • I have also tried io.delta:delta-core_2.12:0.8.0
  • I have verified that Delta versions 0.7.0 and 0.8.0 should support Spark 3.1.1
  • I also tried adding pyarrow and setting it via: spark_session.conf.set("spark.sql.execution.arrow.enabled", "true")
  • I have also tried adding hadoop-common 3.2.0 via --packages org.apache.hadoop:hadoop-common:3.2.0, but that did not help either
  • I also tried running with Spark 3.1.1 and Hadoop 3.2.0 but gave it --packages "io.delta:delta-core_2.12:0.7.0,com.amazonaws:aws-java-sdk-pom:1.11.375,org.apache.hadoop:hadoop-aws:2.7.7" instead, which gave me this error (see the note after this list):

py4j.protocol.Py4JJavaError: An error occurred while calling z:io.delta.tables.DeltaTable.forPath.
: java.lang.NumberFormatException: For input string: "100M"

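(A note on that last error: hadoop-aws 2.7.x cannot parse unit-suffixed values like "100M", which newer Hadoop/Spark defaults use for fs.s3a.multipart.size, so mixing it into a Spark 3.x build fails at DeltaTable.forPath. A commonly suggested workaround, shown here as a sketch only, is to override the value in plain bytes:)

hadoop_conf = spark_session.sparkContext._jsc.hadoopConfiguration()
# Older hadoop-aws cannot parse "100M"; set the multipart size in plain bytes
# (104857600 bytes = 100 MiB) before touching any s3a:// paths.
hadoop_conf.set("fs.s3a.multipart.size", "104857600")
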
It looks to me like org.apache.spark.sql.catalyst.expressions.ScalaUDF$.apply$default$6()Z cannot be resolved for some reason, and I can't find anything more to install.

My pyproject.toml

[tool.poetry]
name = "..."
version = "1.0.0"
description = "..."
authors = ["..."]

[tool.poetry.dependencies]
python = "3.7.8"
pre-commit = "^2.8.2"
pyspark = {version="3.1.1", optional=true, extras=["sql"]}
findspark = "^1.4.2"
boto3 = "*"
pyarrow = "3.0.0"

[tool.poetry.dev-dependencies]
pytest = "6.1.1"
ipdb = "0.13.3"
pytest-cov = "2.10.1"

Grateful for any pointers from anyone who has encountered the same issue.


UPDATE

Based on the comment from Alex, I resolved the issue with the following combination:

  • Spark version 3.0.2
  • Hadoop version 3.2.0
  • Delta 0.8.0
  • spark-submit --packages "io.delta:delta-core_2.12:0.8.0,com.amazonaws:aws-java-sdk-pom:1.11.375,org.apache.hadoop:hadoop-aws:3.2.0" --conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" --conf "spark.delta.logStore.class=org.apache.spark.sql.delta.storage.S3SingleDriverLogStore" ~/code/dataops-delta-infrastructure/run.py



Solution

  • You need to downgrade Spark to 3.0.2 to use Delta 0.8.0. Unfortunately, Spark 3.1.1 made many changes to the internals that Delta uses under the hood, and this breaks binary compatibility. Most probably, your specific problem is caused by SPARK-32154, which changed the parameters of ScalaUDF (this line).
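
Given how opaque the NoSuchMethodError is, a small guard like the following (a sketch, assuming PySpark is installed via pip/Poetry) fails fast on an incompatible runtime before the JVM blows up:

import pyspark

# delta-core 0.8.x is compiled against Spark 3.0.x; fail fast on anything else.
assert pyspark.__version__.startswith("3.0."), (
    f"delta-core 0.8.0 needs Spark 3.0.x, found {pyspark.__version__}"
)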