Tags: dask, parquet

Dask.dataframe.to_parquet making extremely large file


I am converting 10 large fixed-width files (averaging 19 GB each) into Parquet. I am doing this by stacking the fixed-width files into one file:

import time

file_list = [files]  # list of paths to the fixed-width files

file_start = time.time()
stacked_files = open('stacked.txt', 'a')
for i in file_list:
    f = open(i)
    for line in f:
        stacked_files.write(line)
    f.close()
    print(i, (time.time() - file_start) // 60)  # minutes elapsed so far
stacked_files.close()

This process took 3 hours to complete. I then use Dask to read the file in and convert it to Parquet. I have fastparquet installed:

import dask.dataframe as dd

df = dd.read_fwf('stacked.txt', colspecs=colspecs, names=names)
df.to_parquet('parquet.parquet')

I plan to add some processing to this, like sorting by resetting the index and doing calculations on the columns, but for now, as I learn Dask, I want to see how converting it to Parquet works. This has been running for 2 days now, has produced over 2,200 files of 151 MB each, totaling 340 GB, and it is still growing. Is there a way I can read the files into a Dask dataframe without stacking them, and would that be faster? And is there anything I can change to make the output smaller? My understanding was that Parquet is compressed and should be smaller than a .txt file.

Edit: added code to reproduce the problem. This code took 4 minutes to run on my machine. It created a file 'test.csv' that was 96 MB and a file 'test.parquet' that was 239 MB. I am using a fixed-width file for the code I am currently having issues with, but CSV appears to reproduce the effect of roughly tripling the size of the file.

import dask.dataframe as dd
import pandas as pd
import random
import os

test_file_folder = 'folder'
os.makedirs(test_file_folder, exist_ok=True)

# create 500 unique two-letter column names
letters = 'ABCDEFGHIJKLMNOPQRSTUVWXYZ'
colnames = []
for i in range(500):
    colnames.append(letters[i // 26] + letters[i % 26])

# create a dictionary with 100,000 data points per key, with column names as keys
df = {}
for i in colnames:
    temp = []
    for x in range(100000):
        temp.append(random.choice(letters))
    df[i] = temp

# create the DataFrame and send it to CSV
df = pd.DataFrame.from_dict(df)
df.to_csv(os.path.join(test_file_folder, 'test.csv'))

# read the CSV back with dask and write it out as parquet
ddf = dd.read_csv(os.path.join(test_file_folder, 'test.csv'))
ddf.to_parquet(os.path.join(test_file_folder, 'test.parquet'))

Solution

The code you provide produces a CSV of 100 MB and a Parquet dataset of 93 MB for me. The difference is probably that you are missing snappy, the compression library.
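If missing snappy is indeed the culprit, you can request the compression explicitly; with python-snappy installed every column chunk gets compressed, and without it the write should fail loudly rather than quietly producing an uncompressed dataset. A minimal sketch, reusing the paths from your reproduction script:

import dask.dataframe as dd

ddf = dd.read_csv('folder/test.csv')

# with python-snappy installed, every column chunk is snappy-compressed;
# naming the codec explicitly (instead of relying on the default) means
# a missing library surfaces as an error, not as silent uncompressed output
ddf.to_parquet('folder/test.parquet',
               engine='fastparquet',
               compression='snappy')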

This is not unusual for random text data, which generally doesn't compress well. There are some tricks you can play with fixed-width text columns (fastparquet allows this, but it is rarely used) and with categorical/dictionary encoding (whose benefit depends on the cardinality of the data); a sketch of the categorical approach follows.
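To illustrate the dictionary-encoding option (not something your script currently does): converting low-cardinality text columns to pandas categoricals before writing lets fastparquet store each column as one small lookup table plus integer codes. A sketch, again against your reproduction paths (the output name is just an example):

import dask.dataframe as dd

ddf = dd.read_csv('folder/test.csv')

# each column holds single letters, so at most 26 distinct values;
# categorize() scans the data and converts object columns to pandas
# categoricals, which fastparquet writes with dictionary encoding
ddf = ddf.categorize()

ddf.to_parquet('folder/test_categorical.parquet', engine='fastparquet')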

Some notes:

• 500 columns is a lot; it means you don't really have "tabular" data in the sense Parquet was made for, and the schema and per-column metadata blocks take up space and are duplicated between files
• because of the high number of columns, the number of rows per partition is much smaller than would be typical, so the space overhead of the metadata is proportionately higher
• it is possible to forgo producing min/max statistics per column chunk and to skip writing the consolidated metadata file, relying instead on the schemas being the same in every file; but this is not something readily exposed to the user (the former only exists in a PR)
• text is stored as (length)(data) blocks, one per string, where the length takes 4 bytes; so if the text strings are 2 bytes each, they are stored as 6 bytes in the Parquet data versus 3 bytes in CSV (value plus comma). A variant encoding separates out the lengths so that they can be stored efficiently as integers (it would work extremely well here, since they are all the same value), but none of the Parquet frameworks actually implements this; see the rough arithmetic below.
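A back-of-the-envelope check of that last point against your reproduction numbers, assuming plain (non-dictionary) string encoding and ignoring the index column, newlines, headers and file metadata:

rows, cols = 100_000, 500            # shape of the reproduction DataFrame

# CSV: 1-byte value + 1-byte comma per cell
csv_bytes = rows * cols * (1 + 1)

# uncompressed parquet PLAIN encoding: 4-byte length prefix + 1-byte value per cell
parquet_bytes = rows * cols * (4 + 1)

print(csv_bytes / 1e6)      # ~100 MB, close to the 96 MB CSV you saw
print(parquet_bytes / 1e6)  # ~250 MB, close to the 239 MB parquet you saw without snappy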