python pandas dask dask-distributed dask-delayed

Dask @delayed converts dataframes to pandas


I have this code that calls a dask @delayed function that takes N dask dataframes as input and returns a dask dataframe as output.

There are two problems: (1) inside the function, the type of the dataframe is pandas instead of dask, and (2) when I get the result of the function, it is also pandas instead of dask.

Why does a @delayed function receive pandas dataframes as input instead of dask dataframes? I need to work only with dask dataframes.

This is the code:

import pandas as pd
import dask.dataframe as dd
from dask import delayed

df = pd.DataFrame({
    'height':  [6.21, 5.12, 5.85, 5.78, 5.98],
    'weight': [150, 126, 133, 164, 203]
})

df_dask = dd.from_pandas(df, npartitions=2)


@delayed
def some_function(*b):
    print('type b[0]: ' + str(type(b[0])) )
    ddf = b[0]
    return ddf

ddfout = some_function(df_dask, df_dask, df_dask)

computed = ddfout.compute()
# prints: type b[0]: <class 'pandas.core.frame.DataFrame'>   <- this should be a dask dataframe

type(computed)
# pandas.core.frame.DataFrame

Solution

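  • To get column 0 of a dask dataframe ddf, index it directly, as in pandas (this selects the column labeled 0; to select the first column by position, use ddf.iloc[:, 0]):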

    ddf[0]
    

    There is no need for delayed here: the dask DataFrame API already provides lazy versions of the pandas methods it supports, which is most of them.

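    Delayed is meant for arbitrary operations on constants and other delayed values, not on dask collections such as a dataframe. When a dask collection is passed to a delayed function, dask computes it first and passes in the concrete result, which for a dask dataframe is a single pandas DataFrame. That is why b[0] is a pandas DataFrame inside the function, and why the computed result is pandas as well.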

    You probably wanted ddf.map_partitions. See the documentation: https://docs.dask.org/en/stable/delayed-best-practices.html#don-t-call-dask-delayed-on-other-dask-collections