I have a large dataset in the form of the following dataframe, which I previously loaded from Avro files:
timestamp | id | category | value |
---|---|---|---|
2021-01-01 00:00:00+00:00 | a | d | g |
2021-01-01 00:10:00+00:00 | a | d | h |
2021-01-01 00:10:00+00:00 | a | e | h |
2021-01-01 00:00:00+00:00 | b | e | h |
I would like to pivot the category column (which contains on the order of 50 different categories) and effectively deduplicate along the timestamp and id columns, so the result looks like this:
id | timestamp | d | e |
---|---|---|---|
a | 2021-01-01 00:00:00+00:00 | g | nan |
a | 2021-01-01 00:10:00+00:00 | h | h |
b | 2021-01-01 00:00:00+00:00 | nan | h |
I know how I would achieve this in pandas, using MultiIndexes together with the stack/unstack operations; however, my dataset is far too large to process with pandas without manual batching, and Dask does not support MultiIndexes. Is there some way this can be done efficiently with Dask?
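For concreteness, here is the same reshaping written out in plain pandas on the small table above (only to pin down what I mean by "deduplicate": aggfunc='first' keeps a single value per (id, timestamp, category) combination); it is not meant as a Dask answer:
import pandas as pd

small = pd.DataFrame({
    'timestamp': pd.to_datetime(['2021-01-01 00:00:00+00:00', '2021-01-01 00:10:00+00:00',
                                 '2021-01-01 00:10:00+00:00', '2021-01-01 00:00:00+00:00']),
    'id':       ['a', 'a', 'a', 'b'],
    'category': ['d', 'd', 'e', 'e'],
    'value':    ['g', 'h', 'h', 'h'],
})
# pivot the categories into columns, keeping one value per (id, timestamp, category)
wide = (small.pivot_table(index=['id', 'timestamp'], columns='category',
                          values='value', aggfunc='first')
             .reset_index())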
Edit:
As noted by @Dahn, I've created a minimal synthetic example with pandas:
import pandas as pd
records = [
    {'idx': 0, 'id': 'a', 'category': 'd', 'value': 1},
    {'idx': 1, 'id': 'a', 'category': 'e', 'value': 2},
    {'idx': 2, 'id': 'a', 'category': 'f', 'value': 3},
    {'idx': 0, 'id': 'b', 'category': 'd', 'value': 4},
    {'idx': 1, 'id': 'c', 'category': 'e', 'value': 5},
    {'idx': 2, 'id': 'c', 'category': 'f', 'value': 6},
]
frame = pd.DataFrame(records)
   idx id category  value
0    0  a        d      1
1    1  a        e      2
2    2  a        f      3
3    0  b        d      4
4    1  c        e      5
5    2  c        f      6
pivoted = frame.set_index(['id', 'idx', 'category'], drop=True).unstack().droplevel(0, axis=1).reset_index()
pivoted.columns.name = ''
  id  idx    d    e    f
0  a    0  1.0  NaN  NaN
1  a    1  NaN  2.0  NaN
2  a    2  NaN  NaN  3.0
3  b    0  4.0  NaN  NaN
4  c    1  NaN  5.0  NaN
5  c    2  NaN  NaN  6.0
I don't believe Dask implements this as of October 2021. This is likely because there's no support for multi-index, which unstack requires. There has been some work on this recently, though.
However, I think this should still be possible using the apply-concat-apply paradigm (and the apply_concat_apply function).
The solution below works for the example you've provided and, in principle, I think it should work generally, but I am not sure. Please proceed with caution and, if possible, check that the results agree with what pandas gives you. I have also posted this as a feature request on Dask's GitHub.
import dask.dataframe as dd

# Create a Dask DataFrame out of the original `frame` of records
# npartitions is more than 1 to demonstrate this works on a partitioned dataset
df = dd.from_pandas(frame, npartitions=3)

# Dask needs to work out what the categories are
# Alternatively you can use df.categorize
# See https://docs.dask.org/en/latest/dataframe-categoricals.html
category = 'category'
df[category] = df[category].astype(category).cat.as_known()

# Dask needs to know what the resulting DataFrame looks like
new_columns = pd.CategoricalIndex(df[category].cat.categories, name=category)
meta = pd.DataFrame(columns=new_columns,
                    index=df._meta.set_index(['idx', 'id']).index)

# Implement using apply_concat_apply ("aca")
# More details: https://blog.dask.org/2019/10/08/df-groupby
def identity(x): return x

def my_unstack(x):
    return x.set_index(['id', 'idx', 'category'], drop=True).unstack()

def combine(x):
    return x.groupby(level=[0, 1]).sum()

result = dd.core.apply_concat_apply([df],
                                    chunk=identity,
                                    aggregate=my_unstack,
                                    combine=combine,
                                    meta=meta)
result.compute()
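As the comment in the snippet mentions, df.categorize is an alternative way of making the categories known to Dask; a minimal sketch of that variant (it scans the data eagerly to discover the distinct categories):
df = dd.from_pandas(frame, npartitions=3)
# Alternative to .astype('category').cat.as_known(): let Dask discover the
# distinct categories itself (this reads the column once)
df = df.categorize(columns=['category'])
new_columns = pd.CategoricalIndex(df['category'].cat.categories, name='category')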
map_partitions
If you are already able to sort the data according to at least one of idx or id, then you could also simply use map_partitions and treat each partition as a pandas DataFrame. This should lead to a significant improvement in memory usage and overall performance.
# df has sorted index `idx` in this scenario
category = 'category'
existing_categories = df[category].astype(category).cat.as_known().cat.categories
categories = [('value', cat) for cat in existing_categories]

new_columns = pd.MultiIndex.from_tuples(categories, names=(None, category))
meta = pd.DataFrame(columns=new_columns,
                    index=df._meta.set_index(['idx', 'id']).index)

def unstack_add_columns(x):
    x = x.set_index(['id', 'category'], append=True, drop=True).unstack()
    # make sure that the result contains all necessary columns
    return x.reindex(columns=new_columns)

df.map_partitions(unstack_add_columns, meta=meta)
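The map_partitions call is lazy; as a usage sketch (my addition, variable names are arbitrary), you would assign the result and then either peek at one partition or materialize the whole thing:
result = df.map_partitions(unstack_add_columns, meta=meta)
print(result.head())                        # computes only the first partition
wide = result.compute()                     # materialize fully if it fits in memory
wide.columns = wide.columns.droplevel(0)    # flatten the ('value', <category>) columns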
If you cannot guarantee that idx will be sorted, you could possibly try something like
df_sorted = df.set_index('idx')
# I recommend saving to disk in between set_index and the rest
df_sorted.to_parquet('data-sorted.parq')
but that might itself bring problems with memory.
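If you do go through disk, a sketch of the rest of the round trip (reusing unstack_add_columns and meta from above, and assuming the default parquet settings preserve the idx index):
df_sorted = dd.read_parquet('data-sorted.parq')
result = df_sorted.map_partitions(unstack_add_columns, meta=meta)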