apache-spark · amazon-s3 · pyspark

How to read from S3 with PySpark running locally


I'm trying to read CSVs stored in an S3 bucket. I have Apache Spark 3.5.1 installed via Homebrew, and I have downloaded the Hadoop AWS connector and copied it to /opt/homebrew/Cellar/apache-spark/3.5.1/libexec/jars.

Then, using the following code, I try to read the CSVs from S3:

import pyspark.sql.functions as F
import pyspark.sql.types as T

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("Base Spark Template").getOrCreate()
spark.sparkContext.setLogLevel("WARN")

df2 = spark.read.csv("s3://arapbi/polygon/tickers/", header=True)

This fails with

Py4JJavaError: An error occurred while calling o40.csv.
: org.apache.hadoop.fs.UnsupportedFileSystemException: No FileSystem for scheme "s3"
    at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:3443)
    at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:3466)
    at org.apache.hadoop.fs.FileSystem.access$300(FileSystem.java:174)
    at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:3574)
    at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:3521)
    at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:540)
    at org.apache.hadoop.fs.Path.getFileSystem(Path.java:365)
    at org.apache.spark.sql.execution.datasources.DataSource$.$anonfun$checkAndGlobPathIfNecessary$1(DataSource.scala:724)
    at scala.collection.immutable.List.map(List.scala:293)
    at org.apache.spark.sql.execution.datasources.DataSource$.checkAndGlobPathIfNecessary(DataSource.scala:722)
    at org.apache.spark.sql.execution.datasources.DataSource.checkAndGlobPathIfNecessary(DataSource.scala:551)
    at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:404)
    at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:229)
    at org.apache.spark.sql.DataFrameReader.$anonfun$load$2(DataFrameReader.scala:211)
    at scala.Option.getOrElse(Option.scala:189)
    at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:211)
    at org.apache.spark.sql.DataFrameReader.csv(DataFrameReader.scala:538)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:568)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
    at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
    at java.base/java.lang.Thread.run(Thread.java:840)

The first line of that stack trace is the important one; Spark doesn't recognize the s3 scheme.

My understanding, however, is that Spark should recognize S3, given the connector I downloaded and the jar file I copied into the jars folder of the Homebrew-installed Spark.

Am I mistaken about which jar file to use, or about how to configure this in general? I followed the same steps with the Google Cloud Storage connector and it worked correctly.

I've Googled and searched Stack Overflow to no avail. I'll update the question with an answer if I find it, but if anyone's managed to set up Brew-installed PySpark to connect to S3, please let the rest of us know how!

UPDATE 04-04-2024

Per stevel's answer, I changed df2 = spark.read.csv("s3://arapbi/polygon/tickers/", header=True) to df2 = spark.read.csv("s3a://arapbi/polygon/tickers/", header=True). This changed the error in the stack trace (see below), but it still doesn't work.

Py4JJavaError: An error occurred while calling o68.csv.
: java.lang.NoClassDefFoundError: software/amazon/awssdk/core/exception/SdkException
    at java.base/java.lang.Class.forName0(Native Method)
    at java.base/java.lang.Class.forName(Class.java:467)
    at org.apache.hadoop.conf.Configuration.getClassByNameOrNull(Configuration.java:2625)
    at org.apache.hadoop.conf.Configuration.getClassByName(Configuration.java:2590)
    at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2686)
    at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:3431)
    at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:3466)
    at org.apache.hadoop.fs.FileSystem.access$300(FileSystem.java:174)
    at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:3574)
    at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:3521)
    at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:540)
    at org.apache.hadoop.fs.Path.getFileSystem(Path.java:365)
    at org.apache.spark.sql.execution.streaming.FileStreamSink$.hasMetadata(FileStreamSink.scala:53)
    at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:366)
    at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:229)
    at org.apache.spark.sql.DataFrameReader.$anonfun$load$2(DataFrameReader.scala:211)
    at scala.Option.getOrElse(Option.scala:189)
    at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:211)
    at org.apache.spark.sql.DataFrameReader.csv(DataFrameReader.scala:538)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:568)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
    at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
    at java.base/java.lang.Thread.run(Thread.java:840)
Caused by: java.lang.ClassNotFoundException: software.amazon.awssdk.core.exception.SdkException
    at java.base/jdk.internal.loader.BuiltinClassLoader.loadClass(BuiltinClassLoader.java:641)
    at java.base/jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(ClassLoaders.java:188)
    at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:525)
    ... 31 more

Solution

  • You first have to replace the v3.4.0 Hadoop jars with the v3.3.4 jars, because Spark 3.5.1 is compiled against Hadoop 3.3.4 (as described in its source code). A quick way to confirm the bundled Hadoop version is shown in the sketch after the code below.

    Secondly, change the s3 URI scheme to the s3a URI scheme, since Hadoop only provides an S3A client. Try setting the configuration below in your code.

    spark = SparkSession.builder.appName("Base Spark Template").getOrCreate()

    # Change the credentials provider according to your auth mechanism
    spark._jsc.hadoopConfiguration().set(
        "fs.s3a.aws.credentials.provider",
        "com.amazonaws.auth.InstanceProfileCredentialsProvider,com.amazonaws.auth.DefaultAWSCredentialsProviderChain",
    )

    df2 = spark.read.csv("s3a://arapbi/polygon/tickers/", header=True)
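
    To confirm which Hadoop version your Spark build actually bundles (and therefore which hadoop-aws version to match), a minimal sketch is below; it goes through PySpark's internal py4j gateway, so treat _jvm as a convenience for a one-off check rather than a stable API.

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.appName("Hadoop Version Check").getOrCreate()

    # Prints the Hadoop version bundled with this Spark install, e.g. 3.3.4.
    # The hadoop-aws connector jar you drop into the jars folder should match it exactly.
    print(spark.sparkContext._jvm.org.apache.hadoop.util.VersionInfo.getVersion())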
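
    As an alternative to copying jars by hand, Spark can also resolve the connector from Maven Central at startup via spark.jars.packages. This is a sketch, assuming network access and a Spark 3.5.1 install that bundles Hadoop 3.3.4; the matching aws-java-sdk-bundle jar is pulled in transitively. Set the option before the first SparkSession is created.

    from pyspark.sql import SparkSession

    spark = (
        SparkSession.builder
        .appName("Base Spark Template")
        # hadoop-aws 3.3.4 matches the Hadoop version shipped with Spark 3.5.1
        .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:3.3.4")
        .getOrCreate()
    )

    # Note the s3a:// scheme here as well
    df2 = spark.read.csv("s3a://arapbi/polygon/tickers/", header=True)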