
Pyarrow overwrites dataset when using S3 filesystem


When I write two Parquet files to a local dataset, Arrow appends to the partitions as expected. For example, if I partition by column A, the first write creates one subfolder per unique value of A, and the second write places its data in the matching subfolders. If both files share a value of A, the subfolder for that value ends up with two separate files. Code example:

import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq

# `base` is the local output directory prefix, defined elsewhere.
df = pd.read_parquet('~/Desktop/rough/parquet_experiment/actual_07.parquet')
table = pa.Table.from_pandas(df)
pq.write_to_dataset(table, str(base + "parquet_dataset_partition_combined"),
                    partition_cols=['PartitionPoint'])

df = pd.read_parquet('~/Desktop/rough/parquet_experiment/actual_08.parquet')
table = pa.Table.from_pandas(df)
pq.write_to_dataset(table, str(base + "parquet_dataset_partition_combined"),
                    partition_cols=['PartitionPoint'])

This results in two folders, because PartitionPoint has a cardinality of two (A and B). The subfolder PartitionPoint=A contains two files, because both actual_07 and actual_08 contribute rows to PartitionPoint=A.

This does not happen, however, when I run the same code with S3 as the filesystem. The code for that is as follows:

from pyarrow import fs

s3 = fs.S3FileSystem(region="us-east-2")

df = pd.read_parquet('~/Desktop/rough/parquet_experiment/actual_07.parquet')
table = pa.Table.from_pandas(df)
pq.write_to_dataset(table, "parquet-storage",
                    partition_cols=['PartitionPoint'],
                    filesystem=s3)

df = pd.read_parquet('~/Desktop/rough/parquet_experiment/actual_08.parquet')
table = pa.Table.from_pandas(df)
pq.write_to_dataset(table, "parquet-storage",
                    partition_cols=['PartitionPoint'],
                    filesystem=s3)

Instead, the second write overwrites the data in S3: each PartitionPoint=A folder only ever contains one file. Is this a caveat of using S3 as my filesystem?


Solution

  • The difference is that you are implicitly switching from the legacy dataset writer to the new dataset writer. pq.write_to_dataset uses the legacy writer by default. However, if a new-style pyarrow.fs filesystem is passed (the legacy writer does not support these), it uses the new writer:

        if use_legacy_dataset is None:
            # if a new filesystem is passed -> default to new implementation
            if isinstance(filesystem, FileSystem):
                use_legacy_dataset = False
            # otherwise the default is still True
            else:
                use_legacy_dataset = True
    

    The legacy writer names files using a GUID by default, so two writes that each contain data for every folder leave two files in each folder. The new writer names files using a counter by default (e.g. part-{i}.extension). Because the counter resets on every call to write_to_dataset, a later write can overwrite files from an earlier one.
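The overwrite can be reproduced without S3 in a toy model. The sketch below is not pyarrow code: `simulate_write` and the `bucket` dictionary are hypothetical stand-ins for the writer and the object store, and it assumes one output file per partition per call. It only shows that a counter restarting at 0 maps two writes to the same key.

```python
def simulate_write(bucket, partition_values, payload,
                   basename_template="part-{i}.parquet"):
    """Toy stand-in for one write call: one file per partition value."""
    for value in partition_values:
        # The counter restarts on every call, so the first file written
        # to a partition always gets index 0.
        key = f"PartitionPoint={value}/" + basename_template.format(i=0)
        bucket[key] = payload  # an existing key is silently replaced


bucket = {}  # maps object key -> contents, like a flat object store
simulate_write(bucket, ["A", "B"], "rows from actual_07")
simulate_write(bucket, ["A"], "rows from actual_08")

files_in_a = sorted(k for k in bucket if k.startswith("PartitionPoint=A/"))
print(files_in_a)
print(bucket["PartitionPoint=A/part-0.parquet"])
```

In this model the PartitionPoint=A folder ends up with a single part-0.parquet holding only the second write's rows, which matches the symptom described in the question.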

    To get the old behavior with the newer dataset writer, call pyarrow.dataset.write_dataset with the basename_template argument and generate a new basename_template for each write (an easy way to do this is to put a UUID in the template). For example:

    import uuid

    ds.write_dataset(table, '/tmp/mydataset', filesystem=s3,
                     partitioning=partitioning,
                     basename_template=str(uuid.uuid4()) + '-{i}',
                     format='parquet')
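If several jobs write to the same dataset, the template generation can be wrapped in a small helper. This is a minimal sketch: `unique_basename_template` is a hypothetical name, and the `.parquet` suffix is an assumption added here so the files carry an extension (the call above writes files without one). The `{i}` placeholder has to survive into the returned string, since that is where the writer substitutes its counter.

```python
import uuid


def unique_basename_template(suffix=".parquet"):
    """Return a basename template that differs on every call and keeps the {i} token."""
    # uuid4().hex contains only hex digits, so the literal "{i}" below is
    # the only brace pair in the result.
    return f"{uuid.uuid4().hex}-{{i}}{suffix}"


first = unique_basename_template()
second = unique_basename_template()
print(first)
print(second)
```

Each write would then pass basename_template=unique_basename_template() to ds.write_dataset, so no two calls share file names.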
    

    A few things to point out when migrating to the new writer:

    • format='parquet' - The new writer supports writing multiple file formats so you need to specify parquet.
    • partitioning=partitioning - The new writer has a more flexible format for specifying the partitioning schema. To get the old behavior you will want the hive flavor of partitioning:

    import pyarrow as pa
    import pyarrow.dataset as ds
    # Note: you have to supply a schema here, not just a list of columns.
    # This may change as part of 6.0, so that you can take an approach
    # similar to the old style of just specifying column names (ARROW-13755).
    # The field name must match the partition column, and flavor='hive'
    # produces PartitionPoint=<value> folder names.
    partitioning = ds.partitioning(
        schema=pa.schema([pa.field('PartitionPoint', type=pa.string())]),
        flavor='hive')
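To make the difference between the two layouts concrete, the sketch below builds the folder name each flavor would use for a single row. It is an illustration only, not pyarrow's implementation: `partition_dir` is a hypothetical helper, and the non-hive branch assumes the plain directory layout in which only the values appear in the path.

```python
def partition_dir(row, partition_cols, hive=True):
    """Build the relative folder for a row under a given partitioning flavor."""
    if hive:
        # Hive flavor: key=value segments, e.g. PartitionPoint=A
        parts = [f"{col}={row[col]}" for col in partition_cols]
    else:
        # Plain directory flavor: values only, e.g. A
        parts = [str(row[col]) for col in partition_cols]
    return "/".join(parts)


row = {"PartitionPoint": "A", "value": 1}
hive_dir = partition_dir(row, ["PartitionPoint"])
plain_dir = partition_dir(row, ["PartitionPoint"], hive=False)
print(hive_dir)
print(plain_dir)
```

The hive form is the one the legacy writer produced, which is why existing readers that expect PartitionPoint=A style folders need it.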