Tags: python, dask, dask-delayed

Python and Dask - reading and concatenating multiple files


I have some parquet files, all coming from the same domain but with some differences in structure, and I need to concatenate all of them. Below are some examples of these files:

file 1:
A,B
True,False
False,False

file 2:
A,C
True,False
False,True
True,True

What I am looking to do is to read and concatenate these files in the fastest way possible obtaining the following result:

A,B,C
True,False,NaN
False,False,NaN
True,NaN,False
False,NaN,True
True,NaN,True
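
For reference, the result above is exactly what an outer-join concatenation produces; a minimal pandas sketch, with df1 and df2 standing in for the two files (names are illustrative):

import pandas as pd

df1 = pd.DataFrame({'A': [True, False], 'B': [False, False]})
df2 = pd.DataFrame({'A': [True, False, True], 'C': [False, True, True]})

# concat aligns on column names (join='outer' by default) and fills the
# columns a file is missing with NaN
print(pd.concat([df1, df2], ignore_index=True))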

To do that I am using the following code, adapted from (Reading multiple files with Dask, Dask dataframes: reading multiple files & storing filename in column):

import glob

import pandas as pd
import dask
import dask.dataframe as dd
from dask.distributed import Client

def read_parquet(path):
    # Read a single parquet file eagerly with pandas.
    return pd.read_parquet(path)

if __name__=='__main__':

    files = glob.glob('test/*/file.parquet')

    print('Start dask client...')
    client = Client()

    # Wrap each pandas read in a delayed task and turn it into a dask DataFrame.
    results = [dd.from_delayed(dask.delayed(read_parquet)(path)) for path in files]

    # Concatenate all partitions and trigger the computation.
    results = dd.concat(results).compute()

    client.close()

This code works, and it is already the fastest version I could come up with (I also tried sequential pandas and multiprocessing.Pool). My idea was that Dask could ideally start part of the concatenation while it is still reading some of the files; however, from the task graph I see some sequential reading of the metadata of each parquet file. See the screenshot below:

[screenshot: Dask task graph]

The first part of the task graph is a mixture of read_parquet followed by read_metadata, and it always shows only one task being executed at a time (in the task processing tab). The second part is a combination of from_delayed and concat, and it uses all of my workers.

Any suggestions on how to speed up the file reading and reduce the execution time of the first part of the graph?


Solution

  • The problem with your code is that you use the Pandas version of read_parquet.

    Instead use:

    • the dask version of read_parquet,
    • the map and gather methods offered by Client,
    • the dask version of concat.

    Something like:

    def read_parquet(path):
        # Open a single file lazily as a dask DataFrame (no data read yet).
        return dd.read_parquet(path)

    def myRead():
        # Submit one open-file task per path to the cluster ...
        L = client.map(read_parquet, glob.glob('file_*.parquet'))
        # ... gather the resulting lazy DataFrames ...
        lst = client.gather(L)
        # ... and concatenate them, still lazily.
        return dd.concat(lst)

    result = myRead().compute()
    

    Before that, I created the client once only. The reason is that during my earlier experiments I got an error message when I attempted to create it again (inside a function), even though the first instance had been closed before.
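
    Putting the pieces together, a minimal sketch of the whole script under those assumptions (the glob pattern is taken from the question and only illustrative; the client and paths are passed explicitly here just to keep the sketch self-contained):

    import glob

    import dask.dataframe as dd
    from dask.distributed import Client

    def read_parquet(path):
        # Open one parquet file lazily as a dask DataFrame.
        return dd.read_parquet(path)

    def myRead(client, paths):
        # One open-file task per path; gather the lazy frames and
        # concatenate them (still lazily).
        L = client.map(read_parquet, paths)
        lst = client.gather(L)
        return dd.concat(lst)

    if __name__ == '__main__':
        client = Client()                        # create the client once only
        paths = glob.glob('test/*/file.parquet')
        result = myRead(client, paths).compute()
        client.close()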