Attempting to write a PySpark DataFrame to DigitalOcean Spaces results in a "Forbidden (403)" error.. When using the provided PySpark function get_spark() and attempting to write a DataFrame to DigitalOcean Spaces using the function test_spaces(), a "Forbidden (403)" error is encountered. The error stack trace suggests an issue with access permissions. But using the python boto
client the I can access spaces with these key/secret. Currently using PySpark 3.5 and I'm using the configuration for the jar files from here.
def get_spark() -> SparkSession:
"""Provides a well configured spark session"""
return (
SparkSession.builder.master("local[*]")
.appName("test")
.config("spark.jars.packages", "io.delta:delta-spark_2.12:3.0.0")
.config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
.config(
"spark.jars.packages",
"io.delta:delta-spark_2.12:3.0.0,org.apache.hadoop:hadoop-aws:3.3.4,com.amazonaws:aws-java-sdk-bundle:1.12.262",
)
.config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
.config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
.config("spark.hadoop.fs.s3a.access.key", "KEY")
.config("spark.hadoop.fs.s3a.secret.key", "SECRET")
.config("spark.haddop.fs.s3a.endpoint", "https://REGION.digitaloceanspaces.com")
.config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
.config("spark.sql.warehouse.dir", WAREHOUSE_LOCATION)
.enableHiveSupport()
.getOrCreate()
)
def test_spaces():
"""Creates the NHL roster table in the bronze layer"""
spark = get_spark()
# Create a simple DataFrame
data = [("John", 25), ("Alice", 30), ("Bob", 28)]
columns = ["Name", "Age"]
df = spark.createDataFrame(data, columns)
# Show the DataFrame
df.show()
# Write DataFrame to DigitalOcean Spaces
df.write.json(f"s3a://bucket_name/test")
Stacktrace:
py4j.protocol.Py4JJavaError: An error occurred while calling o65.json.
: java.nio.file.AccessDeniedException: s3a://PATH: getFileStatus on s3a://PATH: com.amazonaws.services.s3.model.AmazonS3Exception: Forbidden (Service: Amazon S3; Status Code: 403; Error Code: 403 Forbidden; Request ID: 069ZFJ7PEE4SDT1B; S3 Extended Request ID: iExUnbCSQn8Tued6vyOcmZvu7BMm/6NVRWtopsmAHk572kPJxY5lV8C4BSkalexpg/18EgWnpAkpf2bTUElcxQ==; Proxy: null), S3 Extended Request ID: iExUnbCSQn8Tued6vyOcmZvu7BMm/6NVRWtopsmAHk572kPJxY5lV8C4BSkalexpg/18EgWnpAkpf2bTUElcxQ==:403 Forbidden
at org.apache.hadoop.fs.s3a.S3AUtils.translateException(S3AUtils.java:255)
at org.apache.hadoop.fs.s3a.S3AUtils.translateException(S3AUtils.java:175)
at org.apache.hadoop.fs.s3a.S3AFileSystem.s3GetFileStatus(S3AFileSystem.java:3796)
at org.apache.hadoop.fs.s3a.S3AFileSystem.innerGetFileStatus(S3AFileSystem.java:3688)
at org.apache.hadoop.fs.s3a.S3AFileSystem.lambda$exists$34(S3AFileSystem.java:4703)
at org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.lambda$trackDurationOfOperation$5(IOStatisticsBinding.java:499)
at org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.trackDuration(IOStatisticsBinding.java:444)
at org.apache.hadoop.fs.s3a.S3AFileSystem.trackDurationAndSpan(S3AFileSystem.java:2337)
at org.apache.hadoop.fs.s3a.S3AFileSystem.trackDurationAndSpan(S3AFileSystem.java:2356)
at org.apache.hadoop.fs.s3a.S3AFileSystem.exists(S3AFileSystem.java:4701)
at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:120)
at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:113)
at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:111)
at org.apache.spark.sql.execution.command.DataWritingCommandExec.executeCollect(commands.scala:125)
at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:107)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:125)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:201)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:108)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:66)
at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:107)
at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:98)
at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:461)
at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(origin.scala:76)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:461)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:32)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:437)
at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:98)
at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:85)
at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:83)
at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:142)
at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:859)
at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:388)
at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:361)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:240)
at org.apache.spark.sql.DataFrameWriter.json(DataFrameWriter.scala:774)
It was a mean typo in .config("spark.haddop.fs.s3a.endpoint", "https://REGION.digitaloceanspaces.com")
. After changing hddop
to hadoop
everything worked as expected.