Tags: java, maven, apache-spark, pyspark, jar

java.lang.NoClassDefFoundError: org/apache/hadoop/fs/impl/prefetch/PrefetchingStatistics when running PySpark


I'm trying to run simple code with PySpark that connects to S3 and Athena. I'm getting the following error:

Traceback (most recent call last):
  File "/hood/bennys/git/sbr-thomas/thomas/spark_manager.py", line 103, in <module>
    for df in generator:
  File "/hood/bennys/git/sbr-thomas/thomas/spark_manager.py", line 96, in read_superset_to_df_generator
    raise e
  File "/hood/bennys/git/sbr-thomas/thomas/spark_manager.py", line 92, in read_superset_to_df_generator
    df = self.read_file_to_dataframe(bucket, prefix)
  File "/hood/bennys/git/sbr-thomas/thomas/spark_manager.py", line 51, in read_file_to_dataframe
    df = self.spark.read.parquet(file_path)
  File "/hood/bennys/miniconda3/envs/py310-all/lib/python3.10/site-packages/pyspark/sql/readwriter.py", line 544, in parquet
    return self._df(self._jreader.parquet(_to_seq(self._spark._sc, paths)))
  File "/hood/bennys/miniconda3/envs/py310-all/lib/python3.10/site-packages/py4j/java_gateway.py", line 1322, in __call__
    return_value = get_return_value(
  File "/hood/bennys/miniconda3/envs/py310-all/lib/python3.10/site-packages/pyspark/errors/exceptions/captured.py", line 179, in deco
    return f(*a, **kw)
  File "/hood/bennys/miniconda3/envs/py310-all/lib/python3.10/site-packages/py4j/protocol.py", line 326, in get_return_value
    raise Py4JJavaError(
py4j.protocol.Py4JJavaError: An error occurred while calling o53.parquet.
: java.lang.NoClassDefFoundError: org/apache/hadoop/fs/impl/prefetch/PrefetchingStatistics
        at java.base/java.lang.ClassLoader.defineClass1(Native Method)
        at java.base/java.lang.ClassLoader.defineClass(ClassLoader.java:1022)
        at java.base/java.security.SecureClassLoader.defineClass(SecureClassLoader.java:174)
        at java.base/jdk.internal.loader.BuiltinClassLoader.defineClass(BuiltinClassLoader.java:800)
        at java.base/jdk.internal.loader.BuiltinClassLoader.findClassOnClassPathOrNull(BuiltinClassLoader.java:698)
        at java.base/jdk.internal.loader.BuiltinClassLoader.loadClassOrNull(BuiltinClassLoader.java:621)
        at java.base/jdk.internal.loader.BuiltinClassLoader.loadClass(BuiltinClassLoader.java:579)
        at java.base/jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(ClassLoaders.java:178)
        at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:527)
        at org.apache.hadoop.fs.s3a.S3AFileSystem.initialize(S3AFileSystem.java:586)
        at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:3469)
        at org.apache.hadoop.fs.FileSystem.access$300(FileSystem.java:174)
        at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:3574)
        at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:3521)
        at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:540)
        at org.apache.hadoop.fs.Path.getFileSystem(Path.java:365)
        at org.apache.spark.sql.execution.streaming.FileStreamSink$.hasMetadata(FileStreamSink.scala:53)
        at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:366)
        at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:229)
        at org.apache.spark.sql.DataFrameReader.$anonfun$load$2(DataFrameReader.scala:211)
        at scala.Option.getOrElse(Option.scala:189)
        at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:211)
        at org.apache.spark.sql.DataFrameReader.parquet(DataFrameReader.scala:563)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.base/java.lang.reflect.Method.invoke(Method.java:566)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
        at py4j.Gateway.invoke(Gateway.java:282)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
        at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
        at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.lang.ClassNotFoundException: org.apache.hadoop.fs.impl.prefetch.PrefetchingStatistics
        at java.base/jdk.internal.loader.BuiltinClassLoader.loadClass(BuiltinClassLoader.java:581)
        at java.base/jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(ClassLoaders.java:178)
        at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:527)
        ... 35 more

I think the important part is:

java.lang.NoClassDefFoundError: 
org/apache/hadoop/fs/impl/prefetch/PrefetchingStatistics

I understand these kinds of errors come from problems with JARs or mismatched JAR versions. But as I'm new to Java JARs and Maven, I would really appreciate your help, as I'm confused and stuck. Thanks :)
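
For reference, a quick way to check which Hadoop version the Spark runtime actually loaded is to ask the JVM through PySpark's py4j gateway. This is a small diagnostic sketch; VersionInfo is part of hadoop-common, so it reflects the hadoop-client jars rather than the hadoop-aws jar:

    from pyspark.sql import SparkSession

    # Diagnostic sketch: ask the running JVM which Hadoop version it loaded.
    # VersionInfo lives in hadoop-common (inside hadoop-client-api/runtime),
    # so this reports the client jars' version, not the hadoop-aws jar's.
    spark = SparkSession.builder.appName("hadoop-version-check").getOrCreate()
    print("Spark version: ", spark.version)
    print("Hadoop version:", spark.sparkContext._jvm.org.apache.hadoop.util.VersionInfo.getVersion())
    spark.stop()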

Spark home: spark home

Spark jars:

AthenaJDBC42_2.0.25.1001.jar
HikariCP-2.5.1.jar
JLargeArrays-1.5.jar
JTransforms-3.1.jar
RoaringBitmap-0.9.45.jar
ST4-4.0.4.jar
activation-1.1.1.jar
aircompressor-0.26.jar
algebra_2.12-2.0.1.jar
annotations-17.0.0.jar
antlr-runtime-3.5.2.jar
antlr4-runtime-4.9.3.jar
aopalliance-repackaged-2.6.1.jar
arpack-3.0.3.jar
arpack_combined_all-0.1.jar
arrow-format-12.0.1.jar
arrow-memory-core-12.0.1.jar
arrow-memory-netty-12.0.1.jar
arrow-vector-12.0.1.jar
audience-annotations-0.5.0.jar
auth-2.16.84.jar
avro-1.11.2.jar
avro-ipc-1.11.2.jar
avro-mapred-1.11.2.jar
aws-java-sdk-bundle-1.12.715.jar
blas-3.0.3.jar
bonecp-0.8.0.RELEASE.jar
breeze-macros_2.12-2.1.0.jar
breeze_2.12-2.1.0.jar
bundle-2.25.45.jar
cats-kernel_2.12-2.1.1.jar
chill-java-0.10.0.jar
chill_2.12-0.10.0.jar
commons-cli-1.5.0.jar
commons-codec-1.16.0.jar
commons-collections-3.2.2.jar
commons-collections4-4.4.jar
commons-compiler-3.1.9.jar
commons-compress-1.23.0.jar
commons-crypto-1.1.0.jar
commons-dbcp-1.4.jar
commons-io-2.13.0.jar
commons-lang-2.6.jar
commons-lang3-3.12.0.jar
commons-logging-1.1.3.jar
commons-math3-3.6.1.jar
commons-pool-1.5.4.jar
commons-text-1.10.0.jar
compress-lzf-1.1.2.jar
curator-client-2.13.0.jar
curator-framework-2.13.0.jar
curator-recipes-2.13.0.jar
datanucleus-api-jdo-4.2.4.jar
datanucleus-core-4.1.17.jar
datanucleus-rdbms-4.1.19.jar
datasketches-java-3.3.0.jar
datasketches-memory-2.1.0.jar
derby-10.14.2.0.jar
dropwizard-metrics-hadoop-metrics2-reporter-0.1.2.jar
flatbuffers-java-1.12.0.jar
gson-2.2.4.jar
guava-14.0.1.jar
hadoop-aws-3.4.0.jar
hadoop-client-api-3.3.4.jar
hadoop-client-runtime-3.3.4.jar
hadoop-shaded-guava-1.1.1.jar
hadoop-yarn-server-web-proxy-3.3.4.jar
hive-beeline-2.3.9.jar
hive-cli-2.3.9.jar
hive-common-2.3.9.jar
hive-exec-2.3.9-core.jar
hive-jdbc-2.3.9.jar
hive-llap-common-2.3.9.jar
hive-metastore-2.3.9.jar
hive-serde-2.3.9.jar
hive-service-rpc-3.1.3.jar
hive-shims-0.23-2.3.9.jar
hive-shims-2.3.9.jar
hive-shims-common-2.3.9.jar
hive-shims-scheduler-2.3.9.jar
hive-storage-api-2.8.1.jar
hk2-api-2.6.1.jar
hk2-locator-2.6.1.jar
hk2-utils-2.6.1.jar
http-client-spi-2.16.84.jar
httpclient-4.5.14.jar
httpcore-4.4.16.jar
istack-commons-runtime-3.0.8.jar
ivy-2.5.1.jar
jackson-annotations-2.15.2.jar
jackson-core-2.15.2.jar
jackson-core-asl-1.9.13.jar
jackson-databind-2.15.2.jar
jackson-dataformat-yaml-2.15.2.jar
jackson-datatype-jsr310-2.15.2.jar
jackson-mapper-asl-1.9.13.jar
jackson-module-scala_2.12-2.15.2.jar
jakarta.annotation-api-1.3.5.jar
jakarta.inject-2.6.1.jar
jakarta.servlet-api-4.0.3.jar
jakarta.validation-api-2.0.2.jar
jakarta.ws.rs-api-2.1.6.jar
jakarta.xml.bind-api-2.3.2.jar
janino-3.1.9.jar
javassist-3.29.2-GA.jar
javax.jdo-3.2.0-m3.jar
javolution-5.5.1.jar
jaxb-runtime-2.3.2.jar
jcl-over-slf4j-2.0.7.jar
jdo-api-3.0.1.jar
jersey-client-2.40.jar
jersey-common-2.40.jar
jersey-container-servlet-2.40.jar
jersey-container-servlet-core-2.40.jar
jersey-hk2-2.40.jar
jersey-server-2.40.jar
jline-2.14.6.jar
joda-time-2.12.5.jar
jodd-core-3.5.2.jar
jpam-1.1.jar
json-1.8.jar
json4s-ast_2.12-3.7.0-M11.jar
json4s-core_2.12-3.7.0-M11.jar
json4s-jackson_2.12-3.7.0-M11.jar
json4s-scalap_2.12-3.7.0-M11.jar
jsr305-3.0.0.jar
jta-1.1.jar
jul-to-slf4j-2.0.7.jar
kryo-shaded-4.0.2.jar
kubernetes-client-6.7.2.jar
kubernetes-client-api-6.7.2.jar
kubernetes-httpclient-okhttp-6.7.2.jar
kubernetes-model-admissionregistration-6.7.2.jar
kubernetes-model-apiextensions-6.7.2.jar
kubernetes-model-apps-6.7.2.jar
kubernetes-model-autoscaling-6.7.2.jar
kubernetes-model-batch-6.7.2.jar
kubernetes-model-certificates-6.7.2.jar
kubernetes-model-common-6.7.2.jar
kubernetes-model-coordination-6.7.2.jar
kubernetes-model-core-6.7.2.jar
kubernetes-model-discovery-6.7.2.jar
kubernetes-model-events-6.7.2.jar
kubernetes-model-extensions-6.7.2.jar
kubernetes-model-flowcontrol-6.7.2.jar
kubernetes-model-gatewayapi-6.7.2.jar
kubernetes-model-metrics-6.7.2.jar
kubernetes-model-networking-6.7.2.jar
kubernetes-model-node-6.7.2.jar
kubernetes-model-policy-6.7.2.jar
kubernetes-model-rbac-6.7.2.jar
kubernetes-model-resource-6.7.2.jar
kubernetes-model-scheduling-6.7.2.jar
kubernetes-model-storageclass-6.7.2.jar
lapack-3.0.3.jar
leveldbjni-all-1.8.jar
libfb303-0.9.3.jar
libthrift-0.12.0.jar
log4j-1.2-api-2.20.0.jar
log4j-api-2.20.0.jar
log4j-core-2.20.0.jar
log4j-slf4j2-impl-2.20.0.jar
logging-interceptor-3.12.12.jar
lz4-java-1.8.0.jar
mesos-1.4.3-shaded-protobuf.jar
metrics-core-4.2.19.jar
metrics-graphite-4.2.19.jar
metrics-jmx-4.2.19.jar
metrics-json-4.2.19.jar
metrics-jvm-4.2.19.jar
minlog-1.3.0.jar
netty-all-4.1.96.Final.jar
netty-buffer-4.1.96.Final.jar
netty-codec-4.1.96.Final.jar
netty-codec-http-4.1.96.Final.jar
netty-codec-http2-4.1.96.Final.jar
netty-codec-socks-4.1.96.Final.jar
netty-common-4.1.96.Final.jar
netty-handler-4.1.96.Final.jar
netty-handler-proxy-4.1.96.Final.jar
netty-resolver-4.1.96.Final.jar
netty-transport-4.1.96.Final.jar
netty-transport-classes-epoll-4.1.96.Final.jar
netty-transport-classes-kqueue-4.1.96.Final.jar
netty-transport-native-epoll-4.1.96.Final-linux-aarch_64.jar
netty-transport-native-epoll-4.1.96.Final-linux-x86_64.jar
netty-transport-native-kqueue-4.1.96.Final-osx-aarch_64.jar
netty-transport-native-kqueue-4.1.96.Final-osx-x86_64.jar
netty-transport-native-unix-common-4.1.96.Final.jar
objenesis-3.3.jar
okhttp-3.12.12.jar
okio-1.15.0.jar
opencsv-2.3.jar
orc-core-1.9.2-shaded-protobuf.jar
orc-mapreduce-1.9.2-shaded-protobuf.jar
orc-shims-1.9.2.jar
oro-2.0.8.jar
osgi-resource-locator-1.0.3.jar
paranamer-2.8.jar
parquet-column-1.13.1.jar
parquet-common-1.13.1.jar
parquet-encoding-1.13.1.jar
parquet-format-structures-1.13.1.jar
parquet-hadoop-1.13.1.jar
parquet-jackson-1.13.1.jar
pickle-1.3.jar
py4j-0.10.9.7.jar
regions-2.16.84.jar
rocksdbjni-8.3.2.jar
s3-2.16.84.jar
scala-collection-compat_2.12-2.7.0.jar
scala-compiler-2.12.18.jar
scala-library-2.12.18.jar
scala-parser-combinators_2.12-2.3.0.jar
scala-reflect-2.12.18.jar
scala-xml_2.12-2.1.0.jar
sdk-core-2.16.84.jar
shims-0.9.45.jar
slf4j-api-2.0.7.jar
snakeyaml-2.0.jar
snakeyaml-engine-2.6.jar
snappy-java-1.1.10.3.jar
spark-catalyst_2.12-3.5.1.jar
spark-common-utils_2.12-3.5.1.jar
spark-core_2.12-3.5.1.jar
spark-graphx_2.12-3.5.1.jar
spark-hive-thriftserver_2.12-3.5.1.jar
spark-hive_2.12-3.5.1.jar
spark-kubernetes_2.12-3.5.1.jar
spark-kvstore_2.12-3.5.1.jar
spark-launcher_2.12-3.5.1.jar
spark-mesos_2.12-3.5.1.jar
spark-mllib-local_2.12-3.5.1.jar
spark-mllib_2.12-3.5.1.jar
spark-network-common_2.12-3.5.1.jar
spark-network-shuffle_2.12-3.5.1.jar
spark-repl_2.12-3.5.1.jar
spark-sketch_2.12-3.5.1.jar
spark-sql-api_2.12-3.5.1.jar
spark-sql_2.12-3.5.1.jar
spark-streaming_2.12-3.5.1.jar
spark-tags_2.12-3.5.1.jar
spark-unsafe_2.12-3.5.1.jar
spark-yarn_2.12-3.5.1.jar
spire-macros_2.12-0.17.0.jar
spire-platform_2.12-0.17.0.jar
spire-util_2.12-0.17.0.jar
spire_2.12-0.17.0.jar
stax-api-1.0.1.jar
stream-2.9.6.jar
super-csv-2.2.0.jar
threeten-extra-1.7.1.jar
tink-1.9.0.jar
transaction-api-1.1.jar
univocity-parsers-2.9.1.jar
xbean-asm9-shaded-4.23.jar
xz-1.9.jar
zjsonpatch-0.3.0.jar
zookeeper-3.6.3.jar
zookeeper-jute-3.6.3.jar
zstd-jni-1.5.5-4.jar
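
As an aside, here's a small sketch for scanning a jars directory like the one above and grouping the hadoop-* jars by version, so a mismatch stands out at a glance (the /opt/spark fallback path is an assumption; adjust it to your install):

    import os
    import re
    from collections import defaultdict

    # Sketch: group hadoop-* jars in the Spark jars directory by version,
    # so a mismatch (e.g. 3.3.4 client jars next to hadoop-aws 3.4.0)
    # is immediately visible.
    jars_dir = os.path.join(os.environ.get("SPARK_HOME", "/opt/spark"), "jars")
    by_version = defaultdict(list)
    for name in sorted(os.listdir(jars_dir)):
        match = re.match(r"(hadoop-[a-z-]+?)-(\d+\.\d+\.\d+)\.jar$", name)
        if match:
            by_version[match.group(2)].append(match.group(1))
    for version, artifacts in sorted(by_version.items()):
        print(version, "->", ", ".join(artifacts))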

This is the code:


    from pyspark.sql import SparkSession
    from thomas.athena_manager import AthenaManager
    from tqdm import tqdm
    
    class SparkManager:
        _instance = None
    
        def __new__(cls, access_key=None, secret_key=None):
            if cls._instance is None:
                cls._instance = super(SparkManager, cls).__new__(cls)
    
                cls.jdbc_url = f"jdbc:awsathena://AwsRegion=eu-west-1;S3OutputLocation=s3://athena1-test-ireland/;User={access_key};Password={secret_key}"
    
                # Initialize SparkSession with S3 support
                cls._instance.spark = SparkSession.builder \
                    .appName("SparkManager for Thomas") \
                    .config("spark.jars", "/opt/spark/jars/AthenaJDBC42_2.0.25.1001.jar") \
                    .config("spark.executor.memory", "450g") \
                    .config("spark.driver.memory", "50g") \
                    .config("spark.executor.cores", "5") \
                    .config("spark.executor.instances", "10") \
                    .config("spark.hadoop.fs.s3a.access.key", access_key) \
                    .config("spark.hadoop.fs.s3a.secret.key", secret_key) \
                    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \
                    .config("spark.sql.catalogImplementation", "hive") \
                    .enableHiveSupport() \
                    .getOrCreate()
                print(f"Initialized Spark version: {cls._instance.spark.version}")
            return cls._instance
    
        def read_file_to_dataframe(self, bucket, key):
            """Read a file directly from an S3 bucket into a Spark DataFrame, inferring the file type."""
            file_path = f"s3a://{bucket}/{key}"
            if key.endswith('.csv'):
                df = self.spark.read.option("header", "true").csv(file_path)
            elif key.endswith('.xlsx'):
                # Spark does not natively support xlsx, would need a third-party package or convert to CSV/Parquet
                raise ValueError("Spark does not natively support xlsx format.")
            elif key.endswith('.json'):
                df = self.spark.read.json(file_path)
            elif key.endswith('.parquet'):
                df = self.spark.read.parquet(file_path)
            else:
                raise ValueError("Unsupported file format")
            
            # TODO: return spark dataframe(?)
            return df
        
        def read_superset_to_df_generator(self, superset_name):
            """
            Returns an RDD of DataFrames for each set in the superset.
            """
            # Assuming the SQL context has already been set up to connect to Athena
            query = f"SELECT bucket, prefix FROM raw_images.chunks WHERE set_name IN (SELECT sets FROM raw_images.supersets WHERE name = '{superset_name}')"
    
            df_q = self.spark.read \
                .format("jdbc") \
                .option("url", self.jdbc_url) \
                .option("dbtable", f"({query})") \
                .option("driver", "com.simba.athena.jdbc.Driver") \
                .load()
            
            rows = df_q.collect()  # Collect data to the driver (use cautiously with large datasets)
            # Iterate over each row in DataFrame, retrieve bucket and prefix, and yield DataFrames
            for row in tqdm(rows, desc="Processing rows"):
                bucket = row['bucket']
                prefix = row['prefix']
                print(f"Reading data from bucket {bucket} with prefix {prefix}")
                try:
                    # Use the existing method to read files
                    df = self.read_file_to_dataframe(bucket, prefix)
                    yield df
                except Exception as e:
                    print(f"Failed to read data from bucket {bucket} with prefix {prefix}: {e}")
                    raise e
    
    # Example of how to use the class
    if __name__ == "__main__":
        # Provide your AWS Access Key and Secret Key when creating the SparkManager instance
        spark_manager = SparkManager(aws_access_key_id, aws_secret_access_key)
        generator = spark_manager.read_superset_to_df_generator("100_256")
        for df in generator:
            df.show()


Solution

  • I had this exact same issue! To fix the error you need to get the version of:

    hadoop-aws-x.x.x.jar

    that matches your:

    hadoop-client-api-x.x.x.jar
    hadoop-client-runtime-x.x.x.jar

    From your jars list I can see that your jars directory currently has 3.3.4 for both hadoop-client-api and hadoop-client-runtime, but 3.4.0 for hadoop-aws. The S3AFileSystem in hadoop-aws-3.4.0 references org/apache/hadoop/fs/impl/prefetch/PrefetchingStatistics, a class that ships in Hadoop 3.4.0's hadoop-common (bundled inside the client jars) but does not exist in your 3.3.4 client jars, which is exactly what the NoClassDefFoundError is telling you.

    So you need to download hadoop-aws-3.3.4.jar and remove the 3.4.0 version that's currently in your jars folder.

    Here's a link to the correct JAR: hadoop-aws-3.3.4 on Maven Central (https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-aws/3.3.4/hadoop-aws-3.3.4.jar).
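
    Alternatively, if you'd rather not manage the jar file by hand, you can let Spark resolve a matching hadoop-aws from Maven at session startup via spark.jars.packages. A minimal sketch, assuming your client jars stay at 3.3.4 (the bucket path and credentials are placeholders):

        from pyspark.sql import SparkSession

        # Sketch: pull a hadoop-aws that matches the 3.3.4 hadoop-client jars.
        # Spark downloads the package (plus its AWS SDK dependency) on startup.
        spark = SparkSession.builder \
            .appName("s3a-version-match") \
            .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:3.3.4") \
            .config("spark.hadoop.fs.s3a.access.key", "YOUR_ACCESS_KEY") \
            .config("spark.hadoop.fs.s3a.secret.key", "YOUR_SECRET_KEY") \
            .getOrCreate()

        # Placeholder path, just to exercise the S3A filesystem.
        df = spark.read.parquet("s3a://some-bucket/some/prefix/")
        df.show()

    Either way, remove hadoop-aws-3.4.0.jar from the jars folder first; anything sitting in $SPARK_HOME/jars stays on the classpath and will conflict with the downloaded package.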