Tags: dask, dask-dataframe, drop-duplicates

Dask Dataframe: Remove duplicates by columns A, keeping the row with the highest value in column B


Basically this is already answered for pandas in "python pandas: Remove duplicates by columns A, keeping the row with the highest value in column B". In pandas I adopted the solution

df.sort_values('B', ascending=False).drop_duplicates('A').sort_index()

but I cannot apply the same solution efficiently to Dask, since Dask doesn't like sort_values. I can get the max indices via

max_idx = df.groupby("A")["B"].idxmax().values

But I have to compute the max indices before I can use them as an argument to df.loc, i.e.

df.loc[max_idx.compute()]
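
For reference, a self-contained version of that workaround, using a small made-up frame (the column names and partition count are just placeholders), looks roughly like this:

import numpy as np
import pandas as pd
import dask.dataframe as dd

# small made-up frame with duplicate keys in column "A"
pdf = pd.DataFrame({"A": [0, 1, 2, 0, 1, 2], "B": np.arange(6)})
df = dd.from_pandas(pdf, npartitions=2)

# lazy Dask array holding the index label of each group's maximum in "B"
max_idx = df.groupby("A")["B"].idxmax().values

# .loc needs concrete labels, so max_idx has to be computed eagerly first
print(df.loc[max_idx.compute()].compute())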

On an entire Dask frame, the method df.nlargest(1, "B") does what I need, but I haven't figured out how to combine it with groupby in an efficient way.
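
The closest thing I can write down is a plain groupby-apply (continuing with the toy frame above; this is only a sketch, not something I have profiled, and Dask warns about the missing meta argument):

# untested sketch: per-group nlargest via groupby().apply
# Dask emits a warning asking for an explicit `meta`, and the apply
# shuffles all rows of each group onto a single worker
per_group_max = df.groupby("A").apply(lambda g: g.nlargest(1, "B"))
print(per_group_max.compute())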

In my Dask-based analysis, my current workflow is to use Dask for the out-of-memory stage, performing various operations and selections on a dataset until it reaches a manageable size, and then to continue with pandas. My temporary solution is therefore to move the duplicate removal to the pandas part of my analysis, but I'm curious whether there is an efficient and elegant way to do it in Dask.


Solution

  • Remove duplicates by columns A, keeping the row with the highest value in column B

    In this case, your pandas solution of df.sort_values('B', ascending=False).drop_duplicates('A').sort_index() requires a global sort, which we don't have in Dask on CPUs outside of set_index (though we do on GPUs).

    In general, an effective approach to this kind of problem is to attempt to minimize the need for global information.

    In this case, you can reframe your algorithm in terms of a hash-based shuffle + within-partition map/reduce, since a given row only needs to know about the other rows associated with the same key.

    import pandas as pd
    import dask.dataframe as dd
    import numpy as np

    np.random.seed(12)

    # toy frame: 100 rows but only 5 distinct keys in column "a"
    df = pd.DataFrame({
        "a": [0, 1, 2, 3, 4] * 20,
        "b": np.random.normal(10, 5, 100)
    })
    ddf = dd.from_pandas(df, npartitions=10)

    print(df.sort_values('b', ascending=False).drop_duplicates('a').sort_index())
        a          b
    9   4  24.359097
    16  1  15.062577
    47  2  21.209089
    53  3  20.571721
    75  0  18.182315
    

    With Dask, we can do a hash-based shuffle, which guarantees that all rows with a given key land in the same partition. Then, we can run our pandas reduction independently on each partition.

    print(ddf.shuffle(on="a").map_partitions(
            lambda x: x.sort_values("b", ascending=False).drop_duplicates('a')
        ).compute())
        a          b
    16  1  15.062577
    47  2  21.209089
    9   4  24.359097
    75  0  18.182315
    53  3  20.571721
    

    If you need your final output to be globally sorted, then things get complicated. Often, this isn't necessary.
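
    If it does matter, two sketches come to mind (neither is benchmarked here): the deduplicated result is usually far smaller than the input, so sorting it in pandas after compute() is often good enough; alternatively, move the original index into a column and let set_index do the work, since that is the global-sort primitive Dask does provide.

    dedup = ddf.shuffle(on="a").map_partitions(
        lambda x: x.sort_values("b", ascending=False).drop_duplicates("a")
    )

    # option 1: sort the (now small) deduplicated result in pandas
    print(dedup.compute().sort_index())

    # option 2: stay lazy; reset_index turns the original index into a
    # column named "index", and set_index sorts the frame globally by it
    print(dedup.reset_index().set_index("index").compute())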