unit-testing apache-spark pyspark pytest delta-lake

Error when running Pytest with Delta Lake Tables


I am working in a company VDI, and the company uses its own Artifactory for security reasons. I am currently writing unit tests for a function that deletes entries from a Delta table. At first I got an unresolved-dependencies error, because my Spark session was configured to load jars from Maven. I solved that by loading the jars locally from /opt/spark/jars. Now my code looks like this:

import os
import unittest

from delta.tables import DeltaTable
from pyspark.sql import SparkSession
from pyspark.sql.functions import col


class TestTransformation(unittest.TestCase):
    def test_ksu_deletion(self):
        # Load the Delta and hadoop-aws jars from local disk instead of Maven.
        self.spark = SparkSession.builder\
                        .appName('SPARK_DELETION')\
                        .config("spark.delta.logStore.class", "org.apache.spark.sql.delta.storage.S3SingleDriverLogStore")\
                        .config("spark.jars", "/opt/spark/jars/delta-core_2.12-0.7.0.jar,/opt/spark/jars/hadoop-aws-3.2.0.jar")\
                        .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")\
                        .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")\
                        .getOrCreate()
        os.environ["KSU_DELETION_OBJECT"] = "UNITTEST/"
        deltatable = DeltaTable.forPath(self.spark, "/projects/some/path/snappy.parquet")
        # get_current() is a helper from my own project (not shown here).
        deltatable.delete(col("DATE") < get_current())

However, I am getting the error message:

E     py4j.protocol.Py4JJavaError: An error occurred while calling z:io.delta.tables.DeltaTable.forPath.
E     : java.lang.NoSuchMethodError: org.apache.spark.sql.AnalysisException.<init>(Ljava/lang/String;Lscala/Option;Lscala/Option;Lscala/Option;Lscala/Option;)V

Do you have any idea what causes this? I assume it has to do with the way I am configuring spark.sql.extensions and/or spark.sql.catalog.spark_catalog, but to be honest, I am quite new to Spark. I would greatly appreciate any hint.

Thanks a lot in advance!

Edit: We are using Spark 3.0.2 (Scala 2.12.10). According to https://docs.delta.io/latest/releases.html, this should be compatible. Apart from the SparkSession, I trimmed down the subsequent code to

df = spark.read.parquet("Path/to/file.snappy.parquet")

and now I am getting the error message

java.lang.IncompatibleClassChangeError: class org.apache.spark.sql.catalyst.plans.logical.DeltaDelete has interface org.apache.spark.sql.catalyst.plans.logical.UnaryNode as super class

As I said, I am quite new to (Py)Spark, so please don't hesitate to mention things you consider completely obvious.

Edit 2: I checked the Python path I am exporting in the shell before running the code and I can see the following: [screenshot omitted]. Could this cause any problem? I don't understand why I do not get this error when running the code within pipenv (with spark-submit).


Solution

  • It looks like you're using an incompatible version of the Delta Lake library. 0.7.0 was built for Spark 3.0, but you're running a different Spark version, either lower or higher. Consult the Delta releases page to find the mapping between Delta versions and the required Spark versions.

    If you're using Spark 3.1 or 3.2, consider using the delta-spark Python package, which installs all necessary dependencies, so you only need to import the DeltaTable class.

    Update: Yes, this happens because of conflicting versions. You need to remove the delta-spark and pyspark Python packages and install pyspark==3.0.2 explicitly.

    P.S. Also, look into the pytest-spark package, which can simplify specifying the configuration for all tests. You can find examples of it + Delta here.
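
To make the version check above concrete, here is a minimal sketch that reports which pyspark and delta-spark packages the current interpreter sees, and compares them against a small Delta-to-Spark table. The table is only an illustrative excerpt written from memory of the Delta releases page, and the helper names (minor, is_compatible, installed) are made up for this example, so verify the pairs against the releases page before relying on them.

```python
from importlib.metadata import PackageNotFoundError, version

# Illustrative excerpt of the Delta -> Spark mapping (an assumption to be
# confirmed against the Delta releases page, not an authoritative list).
DELTA_TO_SPARK = {
    "0.7": "3.0",
    "0.8": "3.0",
    "1.0": "3.1",
    "1.1": "3.2",
    "1.2": "3.2",
    "2.0": "3.2",
    "2.1": "3.3",
}


def minor(ver):
    """Reduce a version such as '3.0.2' to its 'major.minor' part, '3.0'."""
    return ".".join(ver.split(".")[:2])


def is_compatible(delta_version, spark_version):
    """True if the table above pairs this Delta release line with this Spark line."""
    return DELTA_TO_SPARK.get(minor(delta_version)) == minor(spark_version)


def installed(package):
    """Return the version of an installed Python package, or None if absent."""
    try:
        return version(package)
    except PackageNotFoundError:
        return None


if __name__ == "__main__":
    pyspark_version = installed("pyspark")
    print("pyspark seen by this interpreter:", pyspark_version)
    print("delta-spark seen by this interpreter:", installed("delta-spark"))
    if pyspark_version is not None:
        # The question loads delta-core_2.12-0.7.0.jar, so check against 0.7.0.
        print("matches delta-core 0.7.0:", is_compatible("0.7.0", pyspark_version))
```

Running this once with the interpreter that pytest uses and once inside pipenv should show whether the two environments pick up different pyspark versions, which would explain why only one of them fails.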