
Reading large number of parquet files: read_parquet vs from_delayed


I'm reading a large number (100s to 1000s) of parquet files into a single dask dataframe (single machine, all local). I realized that

import dask.dataframe as dd

files = ['file1.parq', 'file2.parq', ...]
ddf = dd.read_parquet(files, engine='fastparquet')
ddf.groupby(['col_A', 'col_B']).value.sum().compute()

is a lot less efficient than

import dask.dataframe as dd
from dask import delayed
from fastparquet import ParquetFile

@delayed
def load_chunk(pth):
    return ParquetFile(pth).to_pandas()

ddf = dd.from_delayed([load_chunk(f) for f in files])
ddf.groupby(['col_A', 'col_B']).value.sum().compute()

For my particular application, the second approach (from_delayed) takes 6 seconds to complete, while the first (read_parquet) takes 39 seconds. In the dd.read_parquet case there seems to be a lot of overhead before the workers even start to do anything, and quite a few transfer-... operations are scattered across the task stream plot. I'd like to understand what's going on here. What could be the reason that the read_parquet approach is so much slower? What does it do differently from just reading the files and putting them into chunks?
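
For reference, a minimal sketch of how such a side-by-side timing could be reproduced; the file location (data/*.parq) and the timing harness are placeholders of mine, and the absolute numbers will of course differ per machine:

import glob
import time

import dask.dataframe as dd
from dask import delayed
from fastparquet import ParquetFile

files = sorted(glob.glob('data/*.parq'))  # placeholder for the real file list

# Approach 1: dd.read_parquet over the explicit file list
t0 = time.perf_counter()
ddf = dd.read_parquet(files, engine='fastparquet')
ddf.groupby(['col_A', 'col_B']).value.sum().compute()
print('read_parquet:', time.perf_counter() - t0, 'seconds')

# Approach 2: dd.from_delayed over per-file pandas loads
@delayed
def load_chunk(pth):
    return ParquetFile(pth).to_pandas()

t0 = time.perf_counter()
ddf = dd.from_delayed([load_chunk(f) for f in files])
ddf.groupby(['col_A', 'col_B']).value.sum().compute()
print('from_delayed:', time.perf_counter() - t0, 'seconds')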


Solution

  • What you are experiencing is the client trying to establish the min/max statistics of the columns of the data, and thereby build a good index for the dataframe. An index can be very useful in preventing reads from data files that are not needed for your particular job.

    In many cases this is a good idea, where the amount of data in each file is large and the total number of files is small. In other cases, the same information might be contained in a special "_metadata" file, so that there would be no need to read all the file footers first (see the sketch after this answer for one way to create such a file).

    To prevent the scan of the files' footers, you should call

    dd.read_parquet(..., gather_statistics=False)
    

    This should be the default in the next version of dask.
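
As a complement to the answer: if you do want the statistics (and a usable index) without the per-file footer scan, the "_metadata" file mentioned above can be written once up front. Below is a minimal sketch using fastparquet's writer.merge; the directory layout is an assumption of mine, and the column names are just those from the question:

import glob

import dask.dataframe as dd
from fastparquet import writer

# Consolidate the footers of all parquet files into a single
# "_metadata" file next to them (written once, reused afterwards).
files = sorted(glob.glob('data/*.parq'))  # placeholder location
writer.merge(files)

# Reading the directory now picks up the consolidated metadata
# instead of opening every file's footer.
ddf = dd.read_parquet('data/', engine='fastparquet')
result = ddf.groupby(['col_A', 'col_B']).value.sum().compute()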