Basically this is already answered for pandas in the question python pandas: Remove duplicates by columns A, keeping the row with the highest value in column B. In pandas I adopted the solution
df.sort_values('B', ascending=False).drop_duplicates('A').sort_index()
but I cannot apply the same solution efficiently to dask, since dask doesn't like sort_values. I can get the max indices via
max_idx = df.groupby("A")["B"].idxmax().values
But I have to compute the max indices before I can use them as an argument to df.loc, i.e.
df.loc[max_idx.compute()]
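For context, here is a minimal end-to-end sketch of this workaround on a toy frame invented just for illustration:
import pandas as pd
import dask.dataframe as dd
# toy data; "A" is the key column, "B" the value to maximize
pdf = pd.DataFrame({"A": [0, 1, 2, 0, 1, 2], "B": [1.0, 5.0, 2.0, 4.0, 3.0, 6.0]})
ddf = dd.from_pandas(pdf, npartitions=2)
# per-key index label of the max of "B"; it must be computed before .loc can use it
max_idx = ddf.groupby("A")["B"].idxmax().values
print(ddf.loc[max_idx.compute()].compute())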
On an entire dask frame, the method df.nlargest(1, "B") does what I need, but I haven't figured out how to combine it with groupby for my needs.
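For illustration, the per-group pattern I have in mind is straightforward in plain pandas (toy data again; group_keys=False just keeps the original index); what I'm missing is an efficient dask equivalent:
import pandas as pd
# toy frame invented for illustration
df = pd.DataFrame({"A": [0, 1, 0, 1], "B": [1.0, 5.0, 4.0, 3.0]})
# keep the row with the largest "B" within each group of "A"
print(df.groupby("A", group_keys=False).apply(lambda g: g.nlargest(1, "B")))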
In my dask-frame based analysis, my current workflow is to use dask for out-of-memory operations, doing various operations and selections on a dataset until it reaches a manageable size, and then to continue with pandas. So my temporary solution is to move the duplicate removal to the pandas part of my analysis, but I'm curious whether there is an efficient and elegant way to do it in dask.
Remove duplicates by columns A, keeping the row with the highest value in column B
In this case, your pandas solution of df.sort_values('B', ascending=False).drop_duplicates('A').sort_index() requires a global sort, which we don't have in Dask on CPUs outside of set_index (though we do on GPUs).
In general, an effective approach to this kind of problem is to attempt to minimize the need for global information.
In this case, you can reframe your algorithm in terms of a hash-based shuffle + within-partition map/reduce, since a given row only needs to know about the other rows associated with the same key.
import pandas as pd
import dask.dataframe as dd
import numpy as np
np.random.seed(12)
df = pd.DataFrame({
    "a": [0, 1, 2, 3, 4] * 20,
    "b": np.random.normal(10, 5, 100),
})
ddf = dd.from_pandas(df, npartitions=10)
print(df.sort_values('b', ascending=False).drop_duplicates('a').sort_index())
    a          b
9   4  24.359097
16  1  15.062577
47  2  21.209089
53  3  20.571721
75  0  18.182315
With Dask, we can do a hash-based shuffle, which guarantees that all rows with a given key end up in the same partition. Then, we can run our pandas reduction independently on each partition.
print(ddf.shuffle(on="a").map_partitions(
    lambda x: x.sort_values("b", ascending=False).drop_duplicates("a")
).compute())
    a          b
16  1  15.062577
47  2  21.209089
9   4  24.359097
75  0  18.182315
53  3  20.571721
If you need your final output to be globally sorted, then things get complicated. Often, this isn't necessary.
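If all you actually want back is the original index order and the deduplicated result fits in memory (as in your workflow), one simple option is to sort in pandas after the compute:
result = ddf.shuffle(on="a").map_partitions(
    lambda x: x.sort_values("b", ascending=False).drop_duplicates("a")
).compute().sort_index()
print(result)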