
Best way to perform arbitrary operations on groups with Dask DataFrames


I want to use Dask for operations of the form

df.groupby(some_columns).apply(some_function)

where some_function() may compute some summary statistics, perform timeseries forecasting, or even just save the group to a single file in AWS S3.

The Dask documentation states (and several StackOverflow answers reiterate) that groupby-apply is not appropriate for aggregations:

Pandas’ groupby-apply can be used to apply arbitrary functions, including aggregations that result in one row per group. Dask’s groupby-apply will apply func once to each partition-group pair, so when func is a reduction you’ll end up with one row per partition-group pair. To apply a custom aggregation with Dask, use dask.dataframe.groupby.Aggregation.
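
For reference, an Aggregation is built from a per-partition step and a combine step. A minimal single-column sketch (custom_sum is a made-up name here, and just reimplements an ordinary sum):

import dask.dataframe as dd

# chunk runs on each partition's groups;
# agg combines the per-partition partial results
custom_sum = dd.Aggregation(
    name='custom_sum',
    chunk=lambda grouped: grouped.sum(),
    agg=lambda partials: partials.sum(),
)
# usage (some_column is a placeholder):
# ddf.groupby(some_columns)['some_column'].agg(custom_sum)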

It is not clear whether Aggregation supports operations on multiple columns. However, this DataFrames tutorial seems to do exactly what I'm suggesting, with roughly some_function = lambda x: LinearRegression().fit(...). The example seems to work as intended, and I've similarly had no problems so far with e.g. some_function = lambda x: x.to_csv(...).

Under what conditions can I expect that some_function will be passed all rows of the group? If this is never guaranteed, is there a way to break the LinearRegression example? And most importantly, what is the best way to handle these use cases?


Solution

  • It appears that the current version of the documentation and the source code are out of sync. Specifically, the source for dask.dataframe.groupby contains this note:

    Dask groupby supports reductions, i.e., mean, sum and alike, and apply. The former do not shuffle the data and are efficiently implemented as tree reductions. The latter is implemented by shuffling the underlying partitions such that all items of a group can be found in the same partition.

    This is not consistent with the docs' warning about one row per partition-group pair. The snippet below and its task graph visualization also show that the data is shuffled so that each partition contains all members of a group:

    import dask.dataframe as dd
    import pandas as pd
    
    df = pd.DataFrame({'group': [0,0,1,1,2,0,1,2], 'npeople': [1,2,3,4,5,6,7,8]})
    ddf = dd.from_pandas(df, npartitions=2)
    
    def myfunc(df):
        # a reduction: collapse each group to a single value
        return df['npeople'].sum()
    
    results_pandas = df.groupby('group').apply(myfunc).sort_index()
    # dask warns that `meta` is not specified and infers it,
    # which is fine for this demonstration
    results_dask = ddf.groupby('group').apply(myfunc).compute().sort_index()
    
    # prints 0: the dask and pandas results match exactly,
    # so every group reached myfunc in full
    print(sum(results_dask != results_pandas))
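
    To reproduce the task graph check mentioned above (this assumes the optional graphviz dependency is installed), the same computation can be rendered before calling compute(); the shuffle shows up as edges crossing between the input partitions:

    # writes the task graph to a file for inspection
    ddf.groupby('group').apply(myfunc).visualize(filename='groupby-apply-graph.png')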