python, pandas, hdf5, pytables, dask

Using dask to import many MAT files into one DataFrame


I have many MAT files of the same format and I wish to join them into one DataFrame with a DatetimeIndex. Currently, a for loop reads in these MAT files, loads the contents of each into a pandas DataFrame using scipy.io.loadmat, and then appends each DataFrame to an HDF5 table.

Each MAT file contains a 4096x1024 single-precision matrix, and initially each iteration of the loop takes approximately 1.5 seconds. I have tested this with 806 MAT files (12.5 GB, taking ~25 minutes), but I would like to apply this to potentially millions of these files, and I am interested in finding a workflow and data container that would allow me to import new data and query subsets of the time series quickly.

Would it be possible using dask or another tool to speed up this import process and create a single queryable time series?

import pandas as pd
import scipy.io

for rot_file in rotation_files:
    print(rot_file)
    mat = scipy.io.loadmat(rot_file)          # read the file once, not once per key
    time_stamps = pd.DataFrame(mat['LineInfo'][0][0][2][0])
    polar_image = pd.DataFrame(mat['PolarImage'])
    polar_image = polar_image.transpose()
    polar_image.index = time_stamps[0].apply(convert_to_python_datetime).values
    rot_id = time_stamps[0]
    rot_id_df = pd.DataFrame(len(polar_image)*[rot_id], columns=['rotation_id'], dtype='category')
    rot_id_df.index = polar_image.index
    polar_image = polar_image.join(rot_id_df)  # join returns a new frame; assign it
    polar_image.columns = [str(col_name) for col_name in polar_image.columns]
    polar_image.to_hdf('rot_data.h5', 'polar_image', format='table', append=True,
                       complib='blosc', complevel=9)

It seems like the import should be possible using dask.delayed, but I am not sure how this can be written to a single HDF5 file.


Solution

  • In order to query the data, you don't need to write to a data format explicitly supported by dask. You could define your dataframe as follows:

    def mat_to_dataframe(rot_file):
        mat = scipy.io.loadmat(rot_file)      # read the file once, not once per key
        time_stamps = pd.DataFrame(mat['LineInfo'][0][0][2][0])
        polar_image = pd.DataFrame(mat['PolarImage'])
        polar_image = polar_image.transpose()
        polar_image.index = time_stamps[0].apply(convert_to_python_datetime).values
        rot_id = time_stamps[0]
        rot_id_df = pd.DataFrame(len(polar_image)*[rot_id], columns=['rotation_id'], dtype='category')
        rot_id_df.index = polar_image.index
        polar_image = polar_image.join(rot_id_df)   # join returns a new frame; assign it
        polar_image.columns = [str(col_name) for col_name in polar_image.columns]
        return polar_image
    
    from dask import delayed
    import dask.dataframe as dd
    
    parts = [delayed(mat_to_dataframe)(fn) for fn in matfiles_list]
    df = dd.from_delayed(parts)
    

    This is a "lazy" dataframe: you can apply pandas-like computations to it, but they only execute when you call .compute(). If the MAT-loading process holds the Python GIL, then I would recommend using the distributed scheduler (even on a single machine) via client = dask.distributed.Client(), as sketched below.
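
    For example, a minimal sketch assuming the df defined above (the column name '0' comes from the string conversion of the column labels):

    from dask.distributed import Client

    client = Client()   # local multi-process cluster; sidesteps the GIL during loading

    # computations stay lazy until .compute()
    mean_of_first_bin = df['0'].mean()   # only builds a task graph
    print(mean_of_first_bin.compute())   # loads the MAT files and reduces in parallel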

    If you know the timestamps covered by each file a priori, then you can also provide divisions= to from_delayed, which means that if your queries filter on the index, dask will know which files don't need to be loaded; see the sketch below.
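
    A sketch of that, assuming the files sort into time order and that the hypothetical helper file_time_range can recover each file's first and last timestamp (ideally more cheaply than a full loadmat); the query dates are just illustrative:

    def file_time_range(fn):
        stamps = scipy.io.loadmat(fn)['LineInfo'][0][0][2][0]
        return (convert_to_python_datetime(stamps[0]),
                convert_to_python_datetime(stamps[-1]))

    ordered_files = sorted(matfiles_list)        # partitions must be in index order
    ranges = [file_time_range(fn) for fn in ordered_files]

    parts = [delayed(mat_to_dataframe)(fn) for fn in ordered_files]
    # divisions: the first timestamp of every partition plus the last timestamp overall
    divisions = [start for start, _ in ranges] + [ranges[-1][1]]
    df = dd.from_delayed(parts, divisions=divisions)

    # with divisions known, an index filter only loads the files it needs
    subset = df.loc['2020-01-01':'2020-01-31'].compute()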

    If the loading process is slow, and you want a faster format to query from, try df.to_hdf or df.to_parquet. Each has several options that will affect your performance (a parquet example is sketched below).
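
    A sketch of the parquet route (the path, engine and compression options are illustrative; pyarrow or fastparquet must be installed):

    # one-off, parallel conversion of the lazy dataframe to a columnar store
    df.to_parquet('rot_data.parquet', engine='pyarrow', compression='snappy')

    # later sessions query the parquet store instead of re-reading the MAT files
    df2 = dd.read_parquet('rot_data.parquet')
    print(df2['0'].mean().compute())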

    Note that time_stamps[0].apply(convert_to_python_datetime).values can probably be achieved faster using pd.to_datetime(time_stamps[0]).
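
    For instance, if convert_to_python_datetime is translating MATLAB datenums (days since year 0), a vectorised equivalent might look like this sketch, where 719529 is the datenum of the Unix epoch:

    # hypothetical vectorised conversion; verify it matches convert_to_python_datetime
    polar_image.index = pd.to_datetime(time_stamps[0] - 719529, unit='D')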