I am working in a company VDI, and the company uses its own Artifactory for security reasons. I am currently writing unit tests for a function that deletes entries from a Delta table. At first I got an unresolved-dependencies error, because my Spark session was configured to load JARs from Maven. I was able to solve this by loading the JARs locally from /opt/spark/jars. Now my code looks like this:
    import os
    import unittest

    from delta.tables import DeltaTable
    from pyspark.sql import SparkSession
    from pyspark.sql.functions import col

    class TestTransformation(unittest.TestCase):
        @classmethod
        def test_ksu_deletion(self):
            self.spark = SparkSession.builder\
                .appName('SPARK_DELETION')\
                .config("spark.delta.logStore.class", "org.apache.spark.sql.delta.storage.S3SingleDriverLogStore")\
                .config("spark.jars", "/opt/spark/jars/delta-core_2.12-0.7.0.jar, /opt/spark/jars/hadoop-aws-3.2.0.jar")\
                .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")\
                .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")\
                .getOrCreate()
            os.environ["KSU_DELETION_OBJECT"] = "UNITTEST/"
            deltatable = DeltaTable.forPath(self.spark, "/projects/some/path/snappy.parquet")
            deltatable.delete(col("DATE") < get_current())
However, I am getting the error message:
E py4j.protocol.Py4JJavaError: An error occurred while calling z:io.delta.tables.DeltaTable.forPath.
E : java.lang.NoSuchMethodError: org.apache.spark.sql.AnalysisException.<init>(Ljava/lang/String;Lscala/Option;Lscala/Option;Lscala/Option;Lscala/Option;)V
Do you have any idea what causes this? I assume it has to do with the way I am configuring spark.sql.extensions and/or spark.sql.catalog, but to be honest, I am quite a newb in Spark. I would greatly appreciate any hint.
Thanks a lot in advance!
Edit: We are using Spark 3.0.2 (Scala 2.12.10). According to https://docs.delta.io/latest/releases.html, this should be compatible. Apart from the SparkSession, I trimmed down the subsequent code to
    df = spark.read.parquet("Path/to/file.snappy.parquet")
and now I am getting the error message
java.lang.IncompatibleClassChangeError: class org.apache.spark.sql.catalyst.plans.logical.DeltaDelete has interface org.apache.spark.sql.catalyst.plans.logical.UnaryNode as super class
As I said, I am quite new to (Py)Spark, so please don't hesitate to mention things you consider completely obvious.
Edit 2: I checked the Python path I am exporting in the shell before running the code and I can see the following: Could this cause any problem? I don't understand why I do not get this error when running the code within pipenv (with spark-submit).
It looks like you're using an incompatible version of the Delta Lake library. 0.7.0 was built for Spark 3.0, but the Spark version actually being picked up is a different one, either lower or higher. Consult the Delta releases page to find the mapping between Delta versions and the Spark versions they require.
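As a rough illustration of that check, here is a minimal sketch that reads the Delta version out of the jar name passed to spark.jars and compares it with a Spark version string. The helper names (parse_delta_jar, is_compatible) are made up for this example, and the mapping table is only an assumed subset of the releases page, so verify it there before relying on it.

```python
import re

# ASSUMPTION: a small subset of the Delta releases table (Delta minor -> Spark minor).
# Check the Delta releases page for the authoritative mapping.
DELTA_TO_SPARK = {
    "0.7": "3.0",
    "0.8": "3.0",
    "1.0": "3.1",
    "1.1": "3.2",
}


def parse_delta_jar(jar_name):
    """Return (scala_version, delta_version) from a name like delta-core_2.12-0.7.0.jar."""
    match = re.search(r"delta-core_(\d+\.\d+)-(\d+\.\d+\.\d+)\.jar$", jar_name)
    if match is None:
        raise ValueError(f"not a delta-core jar: {jar_name!r}")
    return match.group(1), match.group(2)


def minor(version):
    """Reduce '3.0.2' to '3.0' so patch releases compare equal."""
    return ".".join(version.split(".")[:2])


def is_compatible(jar_name, spark_version):
    """True if the jar's Delta version is listed for this Spark minor version."""
    _, delta_version = parse_delta_jar(jar_name)
    expected = DELTA_TO_SPARK.get(minor(delta_version))
    return expected is not None and expected == minor(spark_version)
```

In the failing test environment you could pass pyspark.__version__ as spark_version; if the result is False for the 0.7.0 jar, the PySpark on the Python path is not the 3.0.x you expect.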
If you're using Spark 3.1 or 3.2, consider using the delta-spark Python package, which installs all the necessary dependencies, so you only need to import the DeltaTable class.
Update: Yes, this happens because of the conflicting versions. You need to remove the delta-spark and pyspark Python packages, and install pyspark==3.0.2 explicitly.
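To confirm which versions are visible to the interpreter that actually runs the tests (which may differ from the pipenv environment used with spark-submit), something like the following sketch can help. It uses only the standard library; installed_versions is a hypothetical helper name, not part of PySpark or Delta.

```python
from importlib import metadata


def installed_versions(package_names):
    """Map each distribution name to its installed version, or None if absent."""
    versions = {}
    for name in package_names:
        try:
            versions[name] = metadata.version(name)
        except metadata.PackageNotFoundError:
            versions[name] = None
    return versions


if __name__ == "__main__":
    # Run this with the same interpreter and PYTHONPATH as the unit tests.
    for name, version in installed_versions(["pyspark", "delta-spark"]).items():
        print(f"{name}: {version or 'not installed'}")
```

If this reports a delta-spark package or a pyspark other than 3.0.2, that would be consistent with the version conflict described above. Note that it only sees pip-installed distributions, not a PySpark that is picked up purely through PYTHONPATH.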
P.S. Also, take a look at the pytest-spark package, which can simplify specifying the Spark configuration for all tests. You can find examples of it together with Delta here.