apache-spark amazon-s3 parquet

Writing many files to parquet from Spark - Missing some parquet files


We developed a job that processes a huge number of files and writes them as Parquet to Amazon S3 (s3a) using Spark 2.3. Every source file should create a different partition in S3. The code was tested with fewer files and worked as expected.

However, after running it on the real data, we noticed that some files (a small fraction of the total) were not written to Parquet. There were no errors or anything unusual in the logs. We ran the code again for the files that were missing and it worked. We want to use the code in a production environment, but first we need to find out what the problem is. We are writing to Parquet like this:

dataframe_with_data_to_write
  .repartition($"field1", $"field2")
  .write
  .option("compression", "snappy")
  .option("basePath", path_out)
  .partitionBy("field1", "field2", "year", "month", "day")
  .mode(SaveMode.Append)
  .parquet(path_out)

We used the recommended parameters:

spark.sparkContext.hadoopConfiguration.set("mapreduce.output.fileoutputformat.compress", "true")
spark.sparkContext.hadoopConfiguration.set("mapreduce.fileoutputcommitter.algorithm.version", "2")  
spark.sparkContext.hadoopConfiguration.set("mapreduce.fileoutputcommitter.cleanup-failures.ignored", "true")
spark.conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")

Is there any known issue or bug with these parameters? Maybe something to do with S3 eventual consistency? Any suggestions?

Any help will be appreciated.


Solution

  • Yes, it is a known issue. Work is committed by listing the output in the attempt's working directory and renaming it into the destination directory. If that listing underreports files, output is missing. If that listing includes files which aren't there, the commit fails.
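
To make the two failure modes concrete, here is a toy model of a rename-based commit in plain Scala. It is only a sketch and not the real FileOutputCommitter: the object name, the `commit` helper and the file names are all hypothetical, and the "listing" is just a sequence passed in by hand.

```scala
import java.io.FileNotFoundException

// Toy model of a rename-based commit. It is NOT the real FileOutputCommitter;
// all names here are hypothetical.
object RenameCommitSketch {

  /** "Renames" every listed file into the destination and returns what was committed.
    * `listed` is what the store's listing reports; `present` is what really exists. */
  def commit(listed: Seq[String], present: Set[String]): Set[String] =
    listed.map { name =>
      // Renaming a listed-but-absent file fails, which fails the whole commit.
      if (!present.contains(name)) throw new FileNotFoundException(name)
      name
    }.toSet

  def main(args: Array[String]): Unit = {
    val present = Set("part-00000.parquet", "part-00001.parquet", "part-00002.parquet")

    // Case 1: the listing underreports, so one file is never renamed and no error is raised.
    val staleListing = Seq("part-00000.parquet", "part-00001.parquet")
    val committed = commit(staleListing, present)
    println(s"silently lost: ${(present -- committed).mkString(", ")}")

    // Case 2: the listing reports a file that is not there, so the commit fails loudly.
    val ghostListing = staleListing :+ "part-00009.parquet"
    try commit(ghostListing, present)
    catch { case e: FileNotFoundException => println(s"commit failed: ${e.getMessage}") }
  }
}
```

Running `main` prints `silently lost: part-00002.parquet` followed by `commit failed: part-00009.parquet`. The first case matches the symptom in the question: files go missing and nothing shows up in the logs.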

    Fixes on the ASF Hadoop releases:

    1. Hadoop 2.7-2.8 connectors: write to HDFS, then copy the files to S3.
    2. Hadoop 2.9-3.0: turn on S3Guard for a consistent S3 listing (it uses DynamoDB for this).
    3. Hadoop 3.1: switch to the S3A committers, which are designed with the consistency and performance issues in mind. The "staging" one from Netflix is the simplest to use here.
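
For the third option, the settings below are a sketch of how the staging ("directory") committer is typically enabled from Spark. It assumes Hadoop 3.1 or later with hadoop-aws, plus Spark's optional spark-hadoop-cloud module on the classpath (that module provides the two `org.apache.spark.internal.io.cloud` classes and is not part of a stock Spark 2.3 build). The application name is hypothetical, and the conflict mode is chosen only to match the `SaveMode.Append` write in the question.

```scala
import org.apache.spark.sql.SparkSession

// Sketch only: assumes Hadoop 3.1+ with hadoop-aws and the spark-hadoop-cloud
// module available. The app name is a placeholder.
val spark = SparkSession.builder()
  .appName("parquet-to-s3")
  // Use the S3A committer factory for s3a:// destinations.
  .config("spark.hadoop.mapreduce.outputcommitter.factory.scheme.s3a",
    "org.apache.hadoop.fs.s3a.commit.S3ACommitterFactory")
  // "directory" selects the staging committer; "partitioned" and "magic" are the alternatives.
  .config("spark.hadoop.fs.s3a.committer.name", "directory")
  // Keep files that already exist at the destination, as an append would.
  .config("spark.hadoop.fs.s3a.committer.staging.conflict-mode", "append")
  // Route Spark SQL and Parquet writes through Hadoop's PathOutputCommitter API.
  .config("spark.sql.sources.commitProtocolClass",
    "org.apache.spark.internal.io.cloud.PathOutputCommitProtocol")
  .config("spark.sql.parquet.output.committer.class",
    "org.apache.spark.internal.io.cloud.BindingParquetOutputCommitter")
  .getOrCreate()
```

These options are set when the session is built, rather than on `hadoopConfiguration` afterwards, so that they are in place before any committer is created. With them applied, the `mapreduce.fileoutputcommitter.algorithm.version` setting from the question should no longer matter for s3a destinations.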

    Further reading: A zero-rename committer.

    Update 11-01-2019: Amazon has its own closed-source implementation of the ASF zero-rename committer. Ask the EMR team for their own proofs of correctness, as the rest of us cannot verify this.

    Update 11-dec-2020: Amazon S3 is now fully consistent, so listings will be up to date and correct; update inconsistency and 404 caching are no more.

    • The v1 commit algorithm is still unsafe, as directory rename is non-atomic.
    • The v2 commit algorithm is always broken, as it renames files one by one, so a task that fails mid-commit leaves partial output in the destination.
    • Renames are slow O(data) copy operations on S3, so the window of failure during task commit is bigger.

    You aren't at risk of data loss any more, but as well as performance being awful, failures during task commit aren't handled properly.
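
Whichever committer is used, it can help to check the output after each run rather than rely on the logs. The following is a minimal sketch of such a check, assuming the partition columns from the question's `partitionBy` call. The object and method names are hypothetical. It only tests that each expected partition exists; it does not compare row counts, and null partition values (written as a default partition directory) are not handled.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.col

// Hypothetical helper, not part of Spark.
object MissingPartitionCheck {
  // Partition columns taken from the question's partitionBy call.
  val partitionCols: Seq[String] = Seq("field1", "field2", "year", "month", "day")

  /** Returns the partition key combinations present in `source` but absent under `pathOut`. */
  def missingPartitions(spark: SparkSession, source: DataFrame, pathOut: String): DataFrame = {
    // Read partition values back as the strings used in the directory names,
    // so that a value such as "01" is not inferred as the integer 1.
    // Note: this changes a session-level setting.
    spark.conf.set("spark.sql.sources.partitionColumnTypeInference.enabled", "false")

    // Compare both sides as strings.
    val keys = partitionCols.map(c => col(c).cast("string").as(c))
    val expected = source.select(keys: _*).distinct()
    val written = spark.read.parquet(pathOut).select(keys: _*).distinct()

    // Keys that should have been written but were not found in the output.
    expected.except(written)
  }
}
```

A possible use is `MissingPartitionCheck.missingPartitions(spark, dataframe_with_data_to_write, path_out).show()` right after the write. An empty result means every expected partition directory contains data. Because the job appends, a partition that already held data from an earlier run will pass this check even if the latest write to it was lost.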