Tags: python, pandas, parallel-processing, distributed-computing, dask

Dask map_partitions results in duplicates when reducing and gives wrong results compared to pure pandas


When I use dask to do a groupby via map_partitions, I get duplicated data and wrong results compared to a plain pandas groupby. But when I use npartitions=1, I get the correct results. Why does this happen, and how can I use multiple partitions and still get the correct results?

My code is:

measurements = measurements.repartition(npartitions=38)
measurements.map_partitions(
    lambda df: df.groupby(["id", df.time.dt.to_period("M"), "country", "job"]).source.nunique()
).compute().reset_index()

In pandas, I do:

measurements.groupby(["id",measurements.time.dt.to_period("M"), 
    "country","job"]).source.nunique().reset_index()

PS: I'm using a local cluster on a single machine.


Solution

  • When you call map_partitions, you are asking dask to perform that action on each partition independently. Since a given grouping key can occur in more than one partition, you get one entry for that group from every partition in which it appears, as the sketch below illustrates.
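
    Here is a minimal sketch of that behaviour, using synthetic data and made-up column names purely for illustration: grouping inside map_partitions yields one partial result per partition, so the same key shows up several times in the output.

        import pandas as pd
        import dask.dataframe as dd

        # Toy frame: once split, the same "id" lands in more than one partition.
        pdf = pd.DataFrame({
            "id":     [1, 1, 1, 2, 2, 2],
            "source": ["a", "b", "a", "c", "c", "d"],
        })
        ddf = dd.from_pandas(pdf, npartitions=3)

        # Each partition is grouped on its own, so "id" values repeat and each
        # nunique count only reflects the rows that landed in that partition.
        per_partition = ddf.map_partitions(
            lambda df: df.groupby("id").source.nunique()
        ).compute()
        print(per_partition)

    Note that a distinct count cannot be recombined by simply summing the per-partition counts, so merging these partial results by hand is error-prone; letting dask run the whole groupby is the safer route.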

    What if there were a way to do the groupby across partitions and have the partial results merged for you automatically? Fortunately, this is exactly what dask does, so you do not need map_partitions at all:

    measurements.groupby(...).field.nunique().compute()
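
    Applied to the question's columns, a minimal sketch could look like the following. The "month" helper column is a made-up name introduced here for illustration; materialising the period as an ordinary column first keeps the groupby to plain column names, which is the most broadly supported path in dask.

        # `measurements` is assumed to be the dask DataFrame from the question.
        # "month" is a hypothetical helper column added for illustration.
        measurements = measurements.assign(month=measurements.time.dt.to_period("M"))

        result = (
            measurements.groupby(["id", "month", "country", "job"])
            .source.nunique()
            .compute()
            .reset_index()
        )

    Dask takes care of combining the per-partition partial results, so each group appears exactly once in the output, matching the pure-pandas result.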