Tags: pandas, parquet, amazon-kinesis-firehose, pyarrow

Repartitioning parquet-mr generated parquets with pyarrow/parquet-cpp increases file size by x30?


Using AWS Firehose, I am converting incoming records to parquet. In one example, 150k identical records enter Firehose and a single 30 KB parquet file gets written to S3. Because of how Firehose partitions data, we have a secondary process (a Lambda triggered by the S3 put event) that reads in the parquet file and repartitions it based on the date within the event itself. After this repartitioning process, the file size jumps from 30 KB to 900 KB.

Inspecting both parquet files-

  • The meta doesn't change
  • The data doesn't change
  • They both use SNAPPY compression
  • The firehose parquet is created by parquet-mr, the pyarrow generated parquet is created by parquet-cpp
  • The pyarrow generated parquet has additional pandas headers

The full repartitioning process-

import pyarrow.parquet as pq

tmp_file = f'{TMP_DIR}/{rand_string()}'
s3_client.download_file(firehose_bucket, key, tmp_file)

pq_table = pq.read_table(tmp_file)

pq.write_to_dataset(
    pq_table,
    local_partitioned_dir,
    partition_cols=['year', 'month', 'day', 'hour'],
    use_deprecated_int96_timestamps=True
)

I imagined there would be some size change, but I was surprised to find such a big difference. Given the process I've described, what would cause the source parquet to go from 30 KB to 900 KB?


Solution

  • Parquet uses different column encodings to store low entropy data very efficiently. For example:

    • It can use delta encoding to only store differences between values. For example 9192631770, 9192631773, 9192631795, 9192631797 would be stored effectively as 9192631770, +3, +22, +2.
    • It can use dictionary encoding to refer to common values by short references. For example, Los Angeles, Los Angeles, Los Angeles, San Francisco, San Francisco would be stored as a dictionary of 0 = Los Angeles, 1 = San Francisco and the references 0, 0, 0, 1, 1.
    • It can use run-length encoding to only store the number of repeating values. For example, Los Angeles, Los Angeles, Los Angeles would be effectively stored as Los Angeles×3. (Actually as far as I know pure RLE is only used for boolean types at this moment, but the idea is the same.)
    • A combination of the above, specifically RLE and dictionary encoding. For example, Los Angeles, Los Angeles, Los Angeles, San Francisco, San Francisco would be stored as a dictionary of 0 = Los Angeles, 1 = San Francisco and the references 0×3, 1×2

    With the 3 to 5 values of the examples above, the savings are not that significant, but the more values you have the bigger the gain. Since you have 150k identical records, the gains will be huge, since with RLE dictionary encoding, each column value will only have to be stored once, and then marked as repeating 150k times.
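
To make the encodings above concrete, here is a minimal pure-Python sketch of the three ideas. It is only an illustration of the concepts, not how parquet-mr or parquet-cpp implement them, and the function names are made up for this example.

```python
from itertools import groupby


def delta_encode(values):
    """Keep the first value, then store each later value as a difference."""
    if not values:
        return []
    return [values[0]] + [b - a for a, b in zip(values, values[1:])]


def dictionary_encode(values):
    """Store each distinct value once and replace values by small indices."""
    dictionary = {}
    references = []
    for value in values:
        # A new value gets the next free index; a known value reuses its index.
        references.append(dictionary.setdefault(value, len(dictionary)))
    return list(dictionary), references


def run_length_encode(values):
    """Collapse each run of equal neighbours into a (value, count) pair."""
    return [(value, len(list(run))) for value, run in groupby(values)]


cities = ["Los Angeles"] * 3 + ["San Francisco"] * 2
dictionary, references = dictionary_encode(cities)

print(delta_encode([9192631770, 9192631773, 9192631795, 9192631797]))
print(dictionary, references)
print(run_length_encode(references))

# 150k identical records: one dictionary entry and one run per column.
_, identical_refs = dictionary_encode(["same value"] * 150_000)
print(run_length_encode(identical_refs))
```

The last call shows why the 150k identical records compress so well: the whole column collapses to a single dictionary entry plus a single (index, count) pair.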

    However, it seems that pyarrow does not use these space-saving encodings. You can confirm this by taking a look at the metadata of the two files using parquet-tools meta. Here is a sample output:

    file schema: hive_schema 
    -----------------------------------------------------------------------------------------------------------------------------------------------------------------------------
    id:          OPTIONAL INT32 R:0 D:1
    name:        OPTIONAL BINARY O:UTF8 R:0 D:1
    
    row group 1: RC:61 TS:214 OFFSET:4 
    -----------------------------------------------------------------------------------------------------------------------------------------------------------------------------
    id:           INT32 UNCOMPRESSED DO:0 FPO:4 SZ:107/107/1.00 VC:61 ENC:BIT_PACKED,RLE,PLAIN_DICTIONARY ST:[min: 1, max: 5, num_nulls: 0]
    name:         BINARY UNCOMPRESSED DO:0 FPO:111 SZ:107/107/1.00 VC:61 ENC:BIT_PACKED,RLE,PLAIN_DICTIONARY ST:[min: Los Angeles, max: San Francisco, num_nulls: 0]
    

    The encoding is shown as ENC:BIT_PACKED,RLE,PLAIN_DICTIONARY.