Tags: apache-spark, amazon-s3, pyspark, ceph

PySpark write data to Ceph returns 400 Bad Request


I have a PySpark configuration problem when writing data to a Ceph bucket. With the following Python code snippet I can read data from the bucket, but when I try to write to it I get the following error:

22/07/22 10:00:58 DEBUG S3ErrorResponseHandler: Failed in parsing the error response :
org.apache.hadoop.shaded.com.ctc.wstx.exc.WstxEOFException: Unexpected EOF in prolog
 at [row,col {unknown-source}]: [1,0]
    at org.apache.hadoop.shaded.com.ctc.wstx.sr.StreamScanner.throwUnexpectedEOF(StreamScanner.java:701)
    at org.apache.hadoop.shaded.com.ctc.wstx.sr.BasicStreamReader.handleEOF(BasicStreamReader.java:2217)
    at org.apache.hadoop.shaded.com.ctc.wstx.sr.BasicStreamReader.nextFromProlog(BasicStreamReader.java:2123)
    at org.apache.hadoop.shaded.com.ctc.wstx.sr.BasicStreamReader.next(BasicStreamReader.java:1179)
    at com.amazonaws.services.s3.internal.S3ErrorResponseHandler.createException(S3ErrorResponseHandler.java:122)
    at com.amazonaws.services.s3.internal.S3ErrorResponseHandler.handle(S3ErrorResponseHandler.java:71)
    at com.amazonaws.services.s3.internal.S3ErrorResponseHandler.handle(S3ErrorResponseHandler.java:52)
[...]
22/07/22 10:00:58 DEBUG request: Received error response: com.amazonaws.services.s3.model.AmazonS3Exception: Bad Request (Service: Amazon S3; Status Code: 400; Error Code: 400 Bad Request; Request ID: null; S3 Extended Request ID: null; Proxy: null), S3 Extended Request ID: null
22/07/22 10:00:58 DEBUG AwsChunkedEncodingInputStream: AwsChunkedEncodingInputStream reset (will reset the wrapped stream because it is mark-supported).

PySpark code (not working):

import os

from pyspark.sql import SparkSession

# Must be set before the SparkSession (and its JVM) is created.
os.environ['PYSPARK_SUBMIT_ARGS'] = "--packages com.amazonaws:aws-java-sdk-bundle:1.12.264,org.apache.spark:spark-sql-kafka-0-10_2.13:3.3.0,org.apache.hadoop:hadoop-aws:3.3.3 pyspark-shell"

# access_key and secret_key hold the Ceph S3 credentials (defined elsewhere).
spark = (
    SparkSession.builder.appName("app")
    .config("spark.hadoop.fs.s3a.access.key", access_key)
    .config("spark.hadoop.fs.s3a.secret.key", secret_key)
    .config("spark.hadoop.fs.s3a.connection.timeout", "10000")
    .config("spark.hadoop.fs.s3a.endpoint", "http://HOST_NAME:88")
    .config("spark.hadoop.fs.s3a.connection.ssl.enabled", "false")
    .config("spark.hadoop.fs.s3a.path.style.access", "true")
    .config("spark.hadoop.fs.s3a.endpoint.region", "default")
    .getOrCreate()
)
spark.sparkContext.setLogLevel("TRACE")

# This works
spark.read.csv("s3a://test-data/data.csv")

# This throws the provided error
df_to_write = spark.createDataFrame([{"a": "x", "b": "y", "c": "3"}])
df_to_write.write.csv("s3a://test-data/with_love.csv")

With the same Ceph endpoint and credentials, I am able to both read and write data via boto3:

import boto3
from botocore.exceptions import ClientError
from botocore.client import Config

config = Config(connect_timeout=20, retries={'max_attempts': 0})

s3_client = boto3.client('s3', config=config,
                        aws_access_key_id=access_key,
                        aws_secret_access_key=secret_key,
                        region_name="default",
                        endpoint_url='http://HOST_NAME:88',
                        verify=False
                        )

response = s3_client.list_buckets()

# Read
print('Existing buckets:')
for bucket in response['Buckets']:
    print(f'  {bucket["Name"]}')

# Write
dummy_data = b'Dummy string'
s3_client.put_object(Body=dummy_data, Bucket='test-spark', Key='awesome_key')

s3cmd also works fine with the same configuration.

I think I'm missing some PySpark (hadoop-aws) configuration. Could anyone help me identify the problem? Thanks.


Solution

  • After some research on the web, I was able to solve the problem using this hadoop-aws configuration:

    fs.s3a.signing-algorithm: S3SignerType
    

    I set this property in PySpark with:

    
    spark = (
        SparkSession.builder.appName("app")
        .config("spark.hadoop.fs.s3a.access.key", access_key)
        .config("spark.hadoop.fs.s3a.secret.key", secret_key)
        .config("spark.hadoop.fs.s3a.connection.timeout", "10000")
        .config("spark.hadoop.fs.s3a.endpoint", "http://HOST_NAME:88")
        .config("spark.hadoop.fs.s3a.connection.ssl.enabled", "false")
        .config("spark.hadoop.fs.s3a.path.style.access", "true")
        .config("spark.hadoop.fs.s3a.endpoint.region", "default")
        .config("spark.hadoop.fs.s3a.signing-algorithm", "S3SignerType")
        .getOrCreate()
    )
    

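    From what I understand, the version of Ceph I am using (16.2.3) does not support the default signing algorithm used by Spark 3.3.0 with Hadoop 3.3.2. Setting fs.s3a.signing-algorithm to S3SignerType overrides that default with the older S3 signer, which this Ceph version accepts.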

    For further details see this documentation.
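
If several jobs talk to the same Ceph endpoint, the settings from the answer can be kept in one place. The following is only a sketch under assumptions: the helper names s3a_conf and build_session are hypothetical (they are not part of PySpark or hadoop-aws), the timeout and region values are copied from the snippet above, and deriving the SSL flag from the endpoint scheme is a convenience choice of this example.

```python
def s3a_conf(endpoint, access_key, secret_key, legacy_signer=True):
    """Return Spark config entries for an S3-compatible endpoint (hypothetical helper).

    The keys mirror the ones used in the answer above; nothing here is
    specific to a particular Ceph release.
    """
    conf = {
        "spark.hadoop.fs.s3a.access.key": access_key,
        "spark.hadoop.fs.s3a.secret.key": secret_key,
        "spark.hadoop.fs.s3a.connection.timeout": "10000",
        "spark.hadoop.fs.s3a.endpoint": endpoint,
        # Assumption of this sketch: enable SSL only for https:// endpoints.
        "spark.hadoop.fs.s3a.connection.ssl.enabled": (
            "true" if endpoint.startswith("https://") else "false"
        ),
        "spark.hadoop.fs.s3a.path.style.access": "true",
        "spark.hadoop.fs.s3a.endpoint.region": "default",
    }
    if legacy_signer:
        # The setting that resolved the 400 Bad Request on write in this thread.
        conf["spark.hadoop.fs.s3a.signing-algorithm"] = "S3SignerType"
    return conf


def build_session(app_name, conf):
    """Create a SparkSession with every entry of conf applied (hypothetical helper)."""
    # Imported lazily so s3a_conf can be used and tested without pyspark installed.
    from pyspark.sql import SparkSession

    builder = SparkSession.builder.appName(app_name)
    for key, value in conf.items():
        builder = builder.config(key, value)
    return builder.getOrCreate()
```

A call such as build_session("app", s3a_conf("http://HOST_NAME:88", access_key, secret_key)) should then give a session configured like the one in the answer. Passing legacy_signer=False leaves the signing algorithm at its default, which makes it easy to check whether the override is still needed after a Ceph or Hadoop upgrade.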