python pandas dask dask-dataframe dask-delayed

Dask delayed data mismatch


I want to combine many dataframes into one dataframe with dask. However, when I try to read those dataframes with dd.from_delayed(parts, meta=types), I get the error Metadata mismatch found in `from_delayed`.

The full error:

Metadata mismatch found in `from_delayed`.

Partition type: `pandas.core.frame.DataFrame`
+--------+-------+----------+
| Column | Found | Expected |
+--------+-------+----------+
| 'col3' | -     | object   |
+--------+-------+----------+

I know this is because the dataframes I want to combine do not all have the same columns. Values that do not exist in a column should be marked as NA. Setting verify_meta=False silences the error, but leads to issues downstream because some of the partitions don't match the metadata.
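The mismatch can be inspected in plain pandas before dask is involved. The following is a minimal sketch, assuming the two dicts from the question; `missing_columns` is a hypothetical helper name, not part of pandas or dask.

```python
import pandas as pd

# Columns the combined dataframe is expected to have (same as the meta frame)
expected = ['col1', 'col2', 'col3']


def missing_columns(df, expected):
    """Return the expected columns that df lacks, in the order of `expected`."""
    return [col for col in expected if col not in df.columns]


df_a = pd.DataFrame({'col1': [[1, 2, 3, 4], [5, 6, 7, 8]],
                     'col2': [[9, 10, 11, 12], [13, 14, 15, 16]]})
df_b = pd.DataFrame({'col1': [[17, 18, 19, 20], [21, 22, 23, 24]],
                     'col3': [[25, 26, 27, 28], [29, 30, 31, 32]]})

print(missing_columns(df_a, expected))  # ['col3']
print(missing_columns(df_b, expected))  # ['col2']
```

The first result corresponds to the 'col3' row in the error table: that partition has no such column, while the metadata expects one of dtype object.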

The code:

import pandas as pd
import dask.dataframe as dd
from dask.diagnostics import ProgressBar
from dask import delayed
import os

def dict_to_dataframe(data):
    # `data` instead of `dict`, to avoid shadowing the built-in
    return pd.DataFrame.from_dict(data)


data_a = {'col1': [[1, 2, 3, 4], [5, 6, 7, 8]], 'col2': [[9, 10, 11, 12], [13, 14, 15, 16]]}        
data_b = {'col1': [[17, 18, 19, 20], [21, 22, 23, 24]], 'col3': [[25, 26, 27, 28], [29, 30, 31, 32]]}

parts = [delayed(dict_to_dataframe)(fn) for fn in [data_a, data_b]]
types = pd.DataFrame(columns=['col1', 'col2', 'col3'], dtype=object)
ddf_result = dd.from_delayed(parts, meta=types)

print()
print('Write to file')
file_path = 'test.hdf'
with ProgressBar():
    # key must be a string; 'data' is a placeholder name (an assumption)
    ddf_result.compute().sort_index().to_hdf(file_path, key='data', format='table')

written = dd.read_hdf(file_path, key='data')

Solution

  • The dataframes you want to combine need to have the same columns. One solution is to add the missing columns to each dataframe before combining them, so that every partition matches the metadata.

    import dask.dataframe as dd
    import pandas as pd
    from dask import delayed
    from dask.diagnostics import ProgressBar
    
    def dict_to_dataframe(data, all_columns):
        # `data` instead of `dict`, to avoid shadowing the built-in
        df = pd.DataFrame.from_dict(data)

        # Add missing columns (filled with NaN) and sort the columns
        missing_columns = list(set(all_columns).difference(df.columns))
        df = df.reindex(columns=sorted([*df.columns.tolist(), *missing_columns]))

        # Set the dtype of the new columns to object, as declared in the meta frame
        df[missing_columns] = df[missing_columns].astype(object)

        return df
    
    
    data_a = {
        101: [[1, 2, 3, 4], [5, 6, 7, 8]],
        110: [[9, 10, 11, 12], [13, 14, 15, 16]],
    }
    data_b = {
        105: [[17, 18, 19, 20], [21, 22, 23, 24]],
        130: [[25, 26, 27, 28], [29, 30, 31, 32]],
    }
    
    
    all_columns = [101, 105, 110, 130, 140]
    parts = [delayed(dict_to_dataframe)(fn, all_columns) for fn in [data_a, data_b]]
    types = pd.DataFrame(columns=all_columns, dtype=object)
    ddf_result = dd.from_delayed(parts, meta=types)
    
    file_path = 'test.csv'  # example output path (an assumption)
    with ProgressBar():
        ddf_result.compute().sort_index().to_csv(file_path, index=False)
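The alignment step can also be checked on a single partition without dask. The sketch below is a variant of the helper above, not the answer's exact code: it takes the column order directly from `all_columns` through `DataFrame.reindex(columns=...)` instead of sorting, and casts every column to object. The name `align_to_columns` is hypothetical.

```python
import pandas as pd


def align_to_columns(df, all_columns):
    """Return df with exactly the columns in all_columns, all of dtype object."""
    # reindex adds every column of all_columns that df lacks (filled with NaN)
    # and arranges the columns in the order given by all_columns
    aligned = df.reindex(columns=all_columns)
    # cast to object so the dtypes match an all-object meta frame
    return aligned.astype(object)


all_columns = [101, 105, 110, 130, 140]
meta = pd.DataFrame(columns=all_columns, dtype=object)

df_a = pd.DataFrame({
    101: [[1, 2, 3, 4], [5, 6, 7, 8]],
    110: [[9, 10, 11, 12], [13, 14, 15, 16]],
})
aligned = align_to_columns(df_a, all_columns)

# Column labels and dtypes now agree with the meta frame
print(list(aligned.columns) == list(meta.columns))
print(list(aligned.dtypes) == list(meta.dtypes))
# Columns that did not exist in df_a contain only missing values
print(aligned[105].isna().all())
```

Taking the order from `all_columns` avoids relying on the column labels being sortable, which matters if integer and string labels are mixed.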