I'm reading a large number (100s to 1000s) of parquet files into a single dask dataframe (single machine, all local). I realized that
import dask.dataframe as dd

files = ['file1.parq', 'file2.parq', ...]
ddf = dd.read_parquet(files, engine='fastparquet')
ddf.groupby(['col_A', 'col_B']).value.sum().compute()
is a lot less efficient than
from dask import delayed
from fastparquet import ParquetFile
@delayed
def load_chunk(pth):
return ParquetFile(pth).to_pandas()
ddf = dd.from_delayed([load_chunk(f) for f in files])
ddf.groupby(['col_A', 'col_B']).value.sum().compute()
For my particular application, the second approach (from_delayed) takes 6 seconds to complete, while the first takes 39 seconds. In the dd.read_parquet case there seems to be a lot of overhead before the workers even start to do anything, and there are quite a few transfer-... operations scattered across the task stream plot. I'd like to understand what's going on here. What could be the reason that the read_parquet approach is so much slower? What does it do differently from just reading the files and putting them in chunks?
What you are seeing is the client reading the footer of every file to establish the min/max statistics of the columns, and thereby establish a good index for the dataframe. An index can be very useful for avoiding reads from data files that are not needed for your particular job.
In many cases this is a good idea, namely where the amount of data per file is large and the total number of files is small. In other cases, the same information might be contained in a special "_metadata" file, so that there would be no need to read all the files' footers first.
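For example, if all of your files live under one directory, fastparquet can consolidate their footers into such a "_metadata" file up front. This is only a minimal sketch: the directory and file names are made up, and you should check writer.merge against your fastparquet version.

from fastparquet import writer

# Hypothetical paths; all files must live under the same root directory
files = ['data/file1.parq', 'data/file2.parq']

# Write a single "_metadata" file next to the data files, containing the
# combined footers (schema and row-group statistics) of all of them
writer.merge(files)

After that, pointing dd.read_parquet at the directory should let dask pick up the statistics from that single file instead of opening every footer.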
To prevent this scan of the files' footers, you should call
dd.read_parquet(..., gather_statistics=False)
This should be the default in the next version of dask.
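For completeness, a sketch of the first approach from the question with the statistics scan turned off (the file list is the same hypothetical one as above; gather_statistics is the keyword in the dask version current at the time of writing):

import dask.dataframe as dd

files = ['file1.parq', 'file2.parq']  # list of parquet paths as in the question

# Skip reading every footer for min/max statistics; the resulting dataframe
# simply has no known divisions, which a plain groupby-sum does not need
ddf = dd.read_parquet(files, engine='fastparquet', gather_statistics=False)
result = ddf.groupby(['col_A', 'col_B']).value.sum().compute()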