Search code examples
hadoopapache-sparkgoogle-cloud-platformgoogle-cloud-storagegoogle-cloud-dataproc

S3Guard or s3committer for Google Cloud Storage


I am working with Dataproc and Parquet on Google Cloud Platform, with data on GCS, and writing lots of small to moderately sized files is a major hassle, being a couple times slower than what I would get with less bigger files or HDFS.

The Hadoop community has been working on S3Guard, which uses DynamoDB for S3A. Similarly, s3committer uses S3's multi-part API to provide a simple alternative committer that is much more efficient.

I am looking for similar solutions on GCS. The multi-part API from S3 is one of the few things not offered by GCS's XML API and thus cannot be used as is. Instead, GCS has a "combine" API where you upload files separately and then issue a combine query. This seems like it could be used to adapt the multi-part upload from s3committer but I am not quite sure.

I could not find any information about using S3Guard on GCS with an alternate key value store (and the S3A connector -- not even sure it can be used with the GCS XML API).

0-rename commits seem to be a common issue with Hadoop and Apache Spark. What are usual solutions to that on GCS, besides "writing less, bigger files"?


Solution

  • There are a few different things in play here. For the problem of enforcing list consistency, Dataproc traditionally relied on a per-cluster NFS mount to apply client-enforced list-after-write consistency; more recently, Google Cloud Storage has managed to improve its list-after-write consistency semantics and now list operations are strongly consistency immediately after all writes. Dataproc is phasing out client-enforced consistency, and something like S3Guard on DynamoDB is no longer needed for GCS.

    As for multipart upload, in theory it could be possible to use GCS Compose as you mention, but in most cases the parallel multipart uploads for single large files is mostly helpful in a single-stream situation, whereas most Hadoop/Spark workloads will already be parallelizing different tasks per machine such that it's not beneficial to multithread each individual upload stream; aggregate throughput will be about the same with or without parallel multipart uploads.

    So that leaves the question of using the multi-part API to perform conditional/atomic commits. The GCS connector for Hadoop does currently use something called "resumable uploads" where it's theoretically possible for a node to be responsible for "committing" an object that has been uploaded by a completely different node; the client libraries just aren't currently structured to make this very straightforward. However, at the same time, the "copy-and-delete" phase of a GCS "rename" is also different from S3 in that it is done as metadata operations instead of a true data "copy". This makes GCS amenable to using vanilla Hadoop FileCommitters instead of needing to commit "directly" into the final location and skipping the "_temporary" machinery. It may not be ideal to have to "copy/delete" metadata of each file instead of a true directory rename, but it also isn't proportional to the underlying data size, only proportional to the number of files.

    Of course, all this still doesn't solve the fact that committing lots of small files is inefficient. It does, however, make it likely that the "direct commit" aspect isn't as much of a factor as you might think; more often the bigger issue is something like Hive not parallelizing file commits at completion time, especially when committing to lots of partition directories. Spark is much better at this, and Hive should be improving over time.

    There is a recent performance improvement using a native SSL library in Dataproc 1.2 which you can try without having to "write less, bigger files", just by using Dataproc 1.2 out of the box.

    Otherwise, real solutions really do involve writing fewer, bigger files, since even if you fix the write side, you'll suffer on the read side if you have too many small files. GCS is heavily optimized for throughput, so anything less than around 64MB or 128MB may be spending more time just on overhead of spinning up a task and opening the stream vs actual computation (should be able to read that much data in maybe 200ms-500ms or so).

    In that vein, you'd want to make sure you set things like hive.merge.mapfiles, hive.merge.mapredfiles, or hive.merge.tezfiles if you're using those, or repartition your Spark dataframes before saving to GCS; merging into larger partitions is usually well worth it for keeping your files manageable and profiting from ongoing faster reads.

    Edit: One thing I forgot to mention is that I've been loosely using the term repartition, but in this case since we're strictly trying to bunch up the files into larger files, you may do better with coalesce instead; there's more discussion in another StackOverflow question about repartition vs coalese.