Tags: apache-spark, pyspark, delta-lake, minio

java.lang.ClassNotFoundException: Class org.apache.hadoop.fs.s3a.S3AFileSystem not found


I am new to Spark and Delta Lake and am trying to build a POC with PySpark, using MinIO as Delta Lake's storage backend. However, I am getting the following error:

Class org.apache.hadoop.fs.s3a.S3AFileSystem not found

I have added the jar in the Python code, assuming Spark will download it at runtime. I cannot figure out what I am doing wrong.

Can someone please help me out?

Thanks

ENV

OS: Windows 11

Spark: Apache Spark 3.3.1

Java Version: openjdk version "11.0.13" 2021-10-19

Python Version: 3.9.13

Python packages: pyspark 3.2.3, delta-spark 2.0.2

CODE

import pyspark
from delta import *

builder = pyspark.sql.SparkSession.builder.appName("MyApp") \
    .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:3.3.1") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .config("spark.hadoop.fs.s3a.access.key", <my key>) \
    .config("spark.hadoop.fs.s3a.secret.key", <my secret>) \
    .config("spark.hadoop.fs.s3a.endpoint", <my endpoint>) \
    .config("spark.databricks.delta.retentionDurationCheck.enabled", "false")

spark = configure_spark_with_delta_pip(builder).getOrCreate()

spark.conf.set("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")

data = spark.range(0, 5)

data.write.format("delta").save("s3a://<my bucket>/delta-lake/demo")

df = spark.read.format("delta").load("tmp/delta-table")
df.show()

OUTPUT

WARNING: An illegal reflective access operation has occurred
WARNING: Illegal reflective access by org.apache.spark.unsafe.Platform (file:/C:/Spark/spark-3.2.3-bin-hadoop3.2/jars/spark-unsafe_2.12-3.2.3.jar) to constructor java.nio.DirectByteBuffer(long,int)
WARNING: Please consider reporting this to the maintainers of org.apache.spark.unsafe.Platform
WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
WARNING: All illegal access operations will be denied in a future release
:: loading settings :: url = jar:file:/C:/Spark/spark-3.2.3-bin-hadoop3.2/jars/ivy-2.5.0.jar!/org/apache/ivy/core/settings/ivysettings.xml
Ivy Default Cache set to: C:\Users\shari\.ivy2\cache
The jars for the packages stored in: C:\Users\shari\.ivy2\jars
io.delta#delta-core_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-050502ed-ca85-4e4f-b7a3-ff69c12689d3;1.0
        confs: [default]
        found io.delta#delta-core_2.12;2.0.2 in central
        found io.delta#delta-storage;2.0.2 in central
        found org.antlr#antlr4-runtime;4.8 in central
        found org.codehaus.jackson#jackson-core-asl;1.9.13 in central
:: resolution report :: resolve 173ms :: artifacts dl 0ms
        :: modules in use:
        io.delta#delta-core_2.12;2.0.2 from central in [default]
        io.delta#delta-storage;2.0.2 from central in [default]
        org.antlr#antlr4-runtime;4.8 from central in [default]
        org.codehaus.jackson#jackson-core-asl;1.9.13 from central in [default]
        ---------------------------------------------------------------------
        |                  |            modules            ||   artifacts   |
        |       conf       | number| search|dwnlded|evicted|| number|dwnlded|
        ---------------------------------------------------------------------
        |      default     |   4   |   0   |   0   |   0   ||   4   |   0   |
        ---------------------------------------------------------------------
:: retrieving :: org.apache.spark#spark-submit-parent-050502ed-ca85-4e4f-b7a3-ff69c12689d3
        confs: [default]
        0 artifacts copied, 4 already retrieved (0kB/0ms)
23/02/16 16:19:36 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Traceback (most recent call last):
  File "D:\DLT\quin\quin-experian-elt\el\delta_test.py", line 19, in <module>
    data.write.format("delta").save("s3a://quin-third-party-data-dev-1/delta-lake/demo")
  File "C:\Users\shari\AppData\Local\Packages\PythonSoftwareFoundation.Python.3.9_qbz5n2kfra8p0\LocalCache\local-packages\Python39\site-packages\pyspark\sql\readwriter.py", line 740, in save
    self._jwrite.save(path)
  File "C:\Users\shari\AppData\Local\Packages\PythonSoftwareFoundation.Python.3.9_qbz5n2kfra8p0\LocalCache\local-packages\Python39\site-packages\py4j\java_gateway.py", line 1321, in __call__
    return_value = get_return_value(
  File "C:\Users\shari\AppData\Local\Packages\PythonSoftwareFoundation.Python.3.9_qbz5n2kfra8p0\LocalCache\local-packages\Python39\site-packages\pyspark\sql\utils.py", line 111, in deco
    return f(*a, **kw)
  File "C:\Users\shari\AppData\Local\Packages\PythonSoftwareFoundation.Python.3.9_qbz5n2kfra8p0\LocalCache\local-packages\Python39\site-packages\py4j\protocol.py", line 326, in get_return_value
    raise Py4JJavaError(
py4j.protocol.Py4JJavaError: An error occurred while calling o61.save.
: java.lang.RuntimeException: java.lang.ClassNotFoundException: Class org.apache.hadoop.fs.s3a.S3AFileSystem not found
        at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2667)
        at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:3431)
        at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:3466)
        at org.apache.hadoop.fs.FileSystem.access$300(FileSystem.java:174)
        at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:3574)
        at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:3521)
        at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:540)
        at org.apache.hadoop.fs.Path.getFileSystem(Path.java:365)
        at org.apache.spark.sql.delta.DeltaLog$.apply(DeltaLog.scala:620)
        at org.apache.spark.sql.delta.DeltaLog$.forTable(DeltaLog.scala:530)
        at org.apache.spark.sql.delta.sources.DeltaDataSource.createRelation(DeltaDataSource.scala:153)
        at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:47)
        at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:75)
        at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:73)
        at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:84)
        at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:97)
        at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
        at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
        at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
        at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
        at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:97)
        at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:93)
        at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:481)
        at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:82)
        at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:481)
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
        at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:457)
        at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:93)
        at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:80)
        at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:78)
        at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:115)
        at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:848)
        at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:382)
        at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:349)
        at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:239)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.base/java.lang.reflect.Method.invoke(Method.java:566)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
        at py4j.Gateway.invoke(Gateway.java:282)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
        at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
        at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.lang.ClassNotFoundException: Class org.apache.hadoop.fs.s3a.S3AFileSystem not found
        at org.apache.hadoop.conf.Configuration.getClassByName(Configuration.java:2571)
        at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2665)
        ... 51 more

SUCCESS: The process with PID 6172 (child process of PID 24096) has been terminated.
SUCCESS: The process with PID 24096 (child process of PID 27152) has been terminated.
SUCCESS: The process with PID 27152 (child process of PID 25700) has been terminated.

Solution

  • The problem is most likely caused by the configure_spark_with_delta_pip function, which overwrites the spark.jars.packages option that you used to specify the Hadoop AWS dependency. The Ivy log in your output is consistent with this: only io.delta#delta-core_2.12 is added as a dependency, and hadoop-aws is never resolved.

    To fix it, pass that dependency to the same function through its extra_packages parameter:

    builder = pyspark.sql.SparkSession.builder.appName("MyApp") \
        .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
        .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
        .config("spark.hadoop.fs.s3a.access.key", <my key>) \
        .config("spark.hadoop.fs.s3a.secret.key", <my secret>) \
        .config("spark.hadoop.fs.s3a.endpoint", <my endpoint>) \
        .config("spark.databricks.delta.retentionDurationCheck.enabled", "false")
    my_packages = ["org.apache.hadoop:hadoop-aws:3.3.1"]
    spark = configure_spark_with_delta_pip(builder, extra_packages=my_packages).getOrCreate()
    

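    P.S. Check that your Spark and Delta versions are compatible; see this compatibility matrix to find the correct version. Your ENV section lists Spark 3.3.1, but the log shows spark-3.2.3-bin-hadoop3.2 and pyspark 3.2.3. Delta 2.0.x is built for Spark 3.2.x, so the 3.2.3 installation is the one that matches delta-spark 2.0.2.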
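
To illustrate why the original spark.jars.packages value disappears, here is a minimal pure-Python sketch of the override. It does not use Spark or Delta at all: FakeBuilder and configure_with_delta are hypothetical stand-ins, written on the assumption that the real helper sets spark.jars.packages to the Delta coordinate plus any extra_packages, and that a later config call with the same key replaces the earlier value.

```python
# Hypothetical stand-in for SparkSession.builder: it only records options,
# and a later .config() call with the same key replaces the earlier value.
class FakeBuilder:
    def __init__(self):
        self.options = {}

    def config(self, key, value):
        self.options[key] = value
        return self


def configure_with_delta(builder, delta_version="2.0.2", extra_packages=None):
    """Assumed model of configure_spark_with_delta_pip, not the real function."""
    artifacts = [f"io.delta:delta-core_2.12:{delta_version}"]
    artifacts += list(extra_packages or [])
    # The key is set unconditionally, so any value set before is lost.
    return builder.config("spark.jars.packages", ",".join(artifacts))


hadoop_aws = "org.apache.hadoop:hadoop-aws:3.3.1"

# Approach from the question: set the package on the builder first,
# then let the helper set the same key again.
lost = configure_with_delta(
    FakeBuilder().config("spark.jars.packages", hadoop_aws)
)
print(lost.options["spark.jars.packages"])
# io.delta:delta-core_2.12:2.0.2

# Approach from the answer: hand the package to the helper instead.
kept = configure_with_delta(FakeBuilder(), extra_packages=[hadoop_aws])
print(kept.options["spark.jars.packages"])
# io.delta:delta-core_2.12:2.0.2,org.apache.hadoop:hadoop-aws:3.3.1
```

In a real session you can check the outcome the same way the question's log does: after the fix, the Ivy resolution output should list org.apache.hadoop#hadoop-aws alongside io.delta#delta-core_2.12.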