python · dask · dask-dataframe

Convert column of categoricals to additional columns


I have a large dataset in the form of the following dataframe, which I previously loaded from Avro files:

timestamp                  id  category  value
2021-01-01 00:00:00+00:00  a   d         g
2021-01-01 00:10:00+00:00  a   d         h
2021-01-01 00:10:00+00:00  a   e         h
2021-01-01 00:00:00+00:00  b   e         h

I would like to pivot the category column (which contains on the order of 50 different categories) and deduplicate along the timestamp and id columns, so the result looks like this:

id  timestamp                  d    e
a   2021-01-01 00:00:00+00:00  g    nan
a   2021-01-01 00:10:00+00:00  h    h
b   2021-01-01 00:00:00+00:00  nan  h

I know how I would achieve this in pandas using multi-indices together with the stack/unstack operations, but my dataset is far too large to use pandas without manual batch processing, and Dask does not support multi-indices. Is there some way this can be done efficiently with Dask?

Edit:

As noted by @Dahn, I've created a minimal synthetic example with pandas:


import pandas as pd

records = [
    {'idx': 0, 'id': 'a', 'category': 'd', 'value': 1},
    {'idx': 1, 'id': 'a', 'category': 'e', 'value': 2},
    {'idx': 2, 'id': 'a', 'category': 'f', 'value': 3},
    {'idx': 0, 'id': 'b', 'category': 'd', 'value': 4},
    {'idx': 1, 'id': 'c', 'category': 'e', 'value': 5},
    {'idx': 2, 'id': 'c', 'category': 'f', 'value': 6}
]

frame = pd.DataFrame(records)
#    idx id category  value
# 0    0  a        d      1
# 1    1  a        e      2
# 2    2  a        f      3
# 3    0  b        d      4
# 4    1  c        e      5
# 5    2  c        f      6

frame = (frame.set_index(['id', 'idx', 'category'], drop=True)
              .unstack()
              .droplevel(0, axis=1)
              .reset_index())
frame.columns.name = ''
#   id  idx    d    e    f
# 0  a    0  1.0  NaN  NaN
# 1  a    1  NaN  2.0  NaN
# 2  a    2  NaN  NaN  3.0
# 3  b    0  4.0  NaN  NaN
# 4  c    1  NaN  5.0  NaN
# 5  c    2  NaN  NaN  6.0



Solution

  • I don't believe Dask implements this as of October 2021. This is likely because there's no support for multi-index, which unstack requires. There has been some work on this recently though.

    However, I think this should still be possible using the apply-concat-apply paradigm (and the apply_concat_apply function).

    The solution below works for the example you've provided and, in principle, I think it should work generally, but I am not sure. Please proceed with caution and, if possible, check that the results agree with what Pandas gives you. I have also posted this as a feature request on Dask's GitHub.

    Option A: apply_concat_apply

    import pandas as pd
    import dask.dataframe as dd
    
    # Create a Dask DataFrame out of your `frame`
    # npartitions is more than 1 to demonstrate that this works on a partitioned dataset
    df = dd.from_pandas(frame, npartitions=3)
    
    # Dask needs to work out what the categories are
    # Alternatively you can use df.categorize
    # See https://docs.dask.org/en/latest/dataframe-categoricals.html
    category = 'category'
    df[category] = df[category].astype(category).cat.as_known()
    
    # Dask needs to know what the resulting DataFrame looks like
    new_columns = pd.CategoricalIndex(df[category].cat.categories, name=category)
    meta = pd.DataFrame(columns=new_columns, 
                        index=df._meta.set_index(['idx', 'id']).index)
    
    # Implement using apply_concat_apply ("aca")
    # More details: https://blog.dask.org/2019/10/08/df-groupby
    def identity(x): return x
    
    def my_unstack(x):
        return x.set_index(['id', 'idx', 'category'], drop=True).unstack()
        
    def combine(x):
        return x.groupby(level=[0, 1]).sum()
    
    result = dd.core.apply_concat_apply([df], 
                       chunk=identity, 
                       aggregate=my_unstack, 
                       combine=combine,
                       meta=meta)
    
    result.compute()
    

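    As suggested above, it is worth checking the Dask output against the plain-pandas result on the small example. A minimal sketch, assuming the `frame` from the question is still in scope; note that the Dask result may keep the extra `value` column level and that the `sum` in `combine` can turn missing values into 0, so inspect the two outputs rather than expecting an exact match:

    # Rough sanity check (sketch): compare with the plain-pandas unstack.
    # The Dask result may keep the ('value', <category>) column level, and the
    # sum() in `combine` can replace NaN with 0, so compare with that in mind.
    pandas_result = frame.set_index(['id', 'idx', 'category'], drop=True).unstack()
    dask_result = result.compute()
    print(pandas_result)
    print(dask_result)
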
    Option B: map_partitions

    If you are already able to sort the data according to at least one of idx or id, then you could also simply use map_partitions and treat each partition as a Pandas dataframe.

    This should lead to a significant improvement in memory usage and overall performance.

    # df has sorted index `idx` in this scenario
    
    category = 'category'
    existing_categories = df[category].astype(category).cat.as_known().cat.categories
    categories = [('value', cat) for cat in existing_categories]
    
    new_columns = pd.MultiIndex.from_tuples(categories, names=(None, category))
    
    meta = pd.DataFrame(columns=new_columns, 
                        index=df._meta.set_index(['idx', 'id']).index)
    
    def unstack_add_columns(x):
        x = x.set_index(['id', 'category'], append=True, drop=True).unstack()
        # make sure that result contains all necessary columns
        return x.reindex(columns=new_columns) 
    
    result = df.map_partitions(unstack_add_columns, meta=meta)
    
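    As with Option A, the last line only builds the task graph; nothing is computed until you ask for it. A minimal usage sketch (how you materialise the result is up to you):

    # Sketch: trigger the computation explicitly.
    # For the full dataset, prefer writing the result back to disk partition by
    # partition (e.g. with result.to_parquet, after flattening the MultiIndex
    # columns to plain strings) rather than pulling everything into memory.
    result.compute()
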

    If you cannot guarantee idx will be sorted, you could possibly try something like

    df_sorted = df.set_index('idx')
    # I recommend saving to disk in between set_index and the rest
    df_sorted.to_parquet('data-sorted.parq')
    

    but that might itself cause memory problems.
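
    If the round-trip through Parquet works, a sketch of the continuation (the file names are only examples, and `unstack_add_columns` and `meta` are the ones defined in Option B) would be to read the now-sorted data back and apply the map_partitions approach to it:

    import dask.dataframe as dd

    # Sketch: read the sorted data back and apply Option B to it.
    df_sorted = dd.read_parquet('data-sorted.parq')
    result = df_sorted.map_partitions(unstack_add_columns, meta=meta)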