Tags: apache-spark, spark-streaming, spark-structured-streaming, apache-spark-dataset, spark-checkpoint

How to store Spark Streaming Checkpoint Location into S3?


I have a Spark Streaming app (Spark v2.3.2) that reads parquet data from S3 and writes parquet data back to S3. The streaming Dataset uses groupByKey() followed by flatMapGroupsWithState() to maintain per-group GroupState.
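Roughly, the stateful part of the pipeline has the shape sketched below; Event, RunningTotal, and updateTotals are simplified stand-ins for my real types, not the actual app:

    import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout, OutputMode}

    // Hypothetical event/state types, just to illustrate the shape of the pipeline.
    case class Event(deviceId: String, value: Long)
    case class RunningTotal(deviceId: String, total: Long)

    def updateTotals(
        deviceId: String,
        events: Iterator[Event],
        state: GroupState[RunningTotal]): Iterator[RunningTotal] = {
      val previous = state.getOption.map(_.total).getOrElse(0L)
      val updated  = RunningTotal(deviceId, previous + events.map(_.value).sum)
      state.update(updated)  // this state is what gets persisted to the checkpoint's state store
      Iterator(updated)
    }

    // events: Dataset[Event]; requires import spark.implicits._ for the encoders
    val myDataset = events
      .groupByKey(_.deviceId)
      .flatMapGroupsWithState(OutputMode.Append, GroupStateTimeout.NoTimeout)(updateTotals)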

Is it possible to configure this to use an S3 checkpoint location? For example:

import org.apache.spark.sql.streaming.{OutputMode, Trigger}

// s3DataDestination and s3CheckpointPath are s3:// URIs defined elsewhere
val stream = myDataset.writeStream
    .format("parquet")
    .option("path", s3DataDestination)
    .option("checkpointLocation", s3CheckpointPath)
    .trigger(Trigger.Once)
    .outputMode(OutputMode.Append)
stream.start().awaitTermination()

I confirmed that the above successfully writes data to s3DataDestination.

However, an exception is thrown when writing to the S3 checkpoint location:

java.lang.IllegalStateException: Error committing version 1 into HDFSStateStore[id=(op=0, part=9), dir=s3://<my_s3_location>
    at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$HDFSBackedStateStore.commit(...)
...
Caused by: java.io.IOException: Failed to rename s3://.../checkpoint/state/0/9/temp... to s3://.../checkpoint/state/0/9/1.delta

Would this require a custom S3 StateStoreProvider implementation, or does the checkpoint location need to be written to HDFS?


Solution

  • The problem is that the state store commits each version by renaming a temporary delta file (see the "Failed to rename" error above), which requires atomic renames and consistent reads after writes at high frequency. AWS S3 does not provide those filesystem semantics: its rename is a non-atomic copy-and-delete.

    Solutions:

    • We had to switch to a locally mounted persistent disk for Spark checkpointing
    • S3Guard: this makes S3 reads and writes more consistent (note: it is experimental, and I have personally never seen it in action)
    • Use HDFS (see the sketch below)
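For example, assuming the rest of the query stays the same, moving only the checkpoint off S3 would look like the sketch below. The hdfs://namenode:8020/checkpoints/my-app path is a placeholder for your own cluster; any filesystem with atomic rename (HDFS, or a local mount via file://) works here, while the parquet output itself can stay on S3:

    import org.apache.spark.sql.streaming.{OutputMode, Trigger}

    val stream = myDataset.writeStream
        .format("parquet")
        .option("path", s3DataDestination)  // the parquet output can remain on S3
        // placeholder checkpoint path: must be a rename-capable filesystem, not S3
        .option("checkpointLocation", "hdfs://namenode:8020/checkpoints/my-app")
        .trigger(Trigger.Once)
        .outputMode(OutputMode.Append)
    stream.start().awaitTermination()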