Tags: pandas, dask, parquet, dask-distributed

dask loading multiple parquet files with different column selections


I want to use Dask to load specific columns from many parquet files stored in different directories, where each parquet file needs a different set of columns loaded. I want to use Dask so that I can use multiple cores on a single machine. I see how you can pass a list of files or a wildcard (e.g. *.parquet) to dd.read_parquet to read multiple files, but I do not see a way of passing a different set of columns to be read for each file. I'm wondering if this can be done using dask.delayed.

My specific situation is:

I'm storing large single-cell gene expression datasets (~30,000 rows/genes by ~10,000 columns/cells) as parquet files in different directories. Each directory has two parquet files: 1) the large gene expression data (cells as columns) and 2) cell metadata (cells as rows and cell metadata as columns). I use the smaller metadata parquet file to look up the columns/cells that I need in the larger file. For instance, I'll find all the cells of a specific cell type using the metadata parquet file, then load only those cells from the larger file. I'm able to do this using Pandas, but I would like to use Dask for parallelism.
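
For reference, here is roughly what I'm doing now with Pandas for a single directory (the file names and the cell_type column are placeholders, and I'm assuming the cell IDs are the metadata index):

    import pandas as pd

    # Small metadata parquet: cells as rows, cell metadata as columns
    meta = pd.read_parquet("dataset_dir/cell_metadata.parquet")

    # Pick the cells of interest, e.g. all cells of one cell type
    cells = meta.index[meta["cell_type"] == "T cell"].tolist()

    # Load only those cells (columns) from the large expression parquet
    expr = pd.read_parquet("dataset_dir/gene_expression.parquet", columns=cells)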


Solution

  • If you can do this using Pandas' read_parquet while specifying columns (see the very last code example), then one possible approach is to delay your existing Pandas-specific logic by replacing

    pd.read_parquet(..., columns=[list_of_cols])
    

    by

    dask.delayed(pd.read_parquet)(..., columns=[list_of_cols])
    

    as you had suggested.
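
    For example, a minimal sketch of that pattern for your case (the directory names, file names, and the cell_type lookup are placeholders, and I'm assuming the cell IDs are the metadata index):

    import dask
    import pandas as pd

    def load_selected_cells(dir_path, cell_type):
        # Use the small metadata parquet to pick the cells (columns) of interest
        meta = pd.read_parquet(f"{dir_path}/cell_metadata.parquet")
        cols = meta.index[meta["cell_type"] == cell_type].tolist()
        # Read only those columns from the large expression parquet
        return pd.read_parquet(f"{dir_path}/gene_expression.parquet", columns=cols)

    # One delayed task per directory; each task reads its own set of columns
    delayed_parts = [
        dask.delayed(load_selected_cells)(d, "T cell")
        for d in ["dataset_a", "dataset_b", "dataset_c"]
    ]

    # Compute the tasks in parallel; each result is a regular Pandas DataFrame
    dfs = dask.compute(*delayed_parts)

    If the selected columns turned out to be the same across directories, you could also pass the delayed objects to dd.from_delayed to get a single Dask DataFrame; otherwise computing them into separate Pandas DataFrames, as above, is simpler.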

    EDIT

    I had to do something similar for a single directory of paired .csv files (metadata and corresponding spectra). My filtering logic was minimal, so I created a Python dict whose keys were derived from the metadata logic (each producing a file name) and whose values were the lists of columns to read. I looped over the dictionary's key-value pairs and

    • read in the corresponding list of columns from the associated spectrum file using dd.read_csv(..., usecols=list_of_cols)
    • appended the resulting ddf to an initially empty list, then vertically concatenated them all with dd.concat() after the loop (see the sketch below)

    In my case, though, the metadata contents changed in a predictable manner, which is why I could programmatically assemble the dict using a dictionary comprehension.
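
    The loop itself looked roughly like the sketch below (the file names and column lists here are illustrative placeholders, not my actual metadata logic):

    import dask.dataframe as dd

    # Keys come from the metadata logic (each resolves to a file name);
    # values are the columns to read from that spectrum file
    files_to_cols = {
        "spectra_run_01.csv": ["mz", "intensity"],
        "spectra_run_02.csv": ["mz", "intensity", "retention_time"],
    }

    ddfs = []
    for fname, cols in files_to_cols.items():
        # usecols is forwarded to pandas.read_csv, so only these columns are parsed
        ddfs.append(dd.read_csv(fname, usecols=cols))

    # Vertically concatenate the per-file dataframes after the loop
    ddf = dd.concat(ddfs)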