One problem I ran into recently with dask is that encodings take a long time, and I wanted to speed them up.
Problem: given a dask DataFrame (ddf), encode it and return a ddf.
Here is some code to start with:
# !pip install feature_engine
import dask.dataframe as dd
import pandas as pd
import numpy as np
from feature_engine.encoding import CountFrequencyEncoder
df = pd.DataFrame(np.random.randint(1, 5, (100,3)), columns=['a', 'b', 'c'])
# make it object cols
for col in df.columns:
    df[col] = df[col].astype(str)
ddf = dd.from_pandas(df, npartitions=3)
x_freq = ddf.copy()
for col_idx, col_name in enumerate(x_freq.columns):
    freq_enc = CountFrequencyEncoder(encoding_method='frequency')
    col_to_encode = x_freq[col_name].to_frame().compute()
    encoded_col = freq_enc.fit_transform(col_to_encode).rename(columns={col_name: col_name + '_freq'})
    x_freq = dd.concat([x_freq, encoded_col], axis=1)
x_freq.head()
This runs fine, as I would expect: concatenating a pandas df onto a dask df is no problem. But when I try the same thing with another ddf, I get an error:
x_freq = x.copy()
# npartitions = x_freq.npartitions
# x_freq = x_freq.repartition(npartitions=npartitions).reset_index(drop=True)
for col_idx, col_name in enumerate(x_freq.columns):
    freq_enc = CountFrequencyEncoder(encoding_method='frequency')
    col_to_encode = x_freq[col_name].to_frame().compute()
    encoded_col = freq_enc.fit_transform(col_to_encode).rename(columns={col_name: col_name + '_freq'})
    x_freq = dd.concat([x_freq, encoded_col], axis=1)
    break
x_freq.head()
The error happens during concat:
ValueError: Unable to concatenate DataFrame with unknown division specifying axis=1
This is how I load the ddf that triggers the error:
ddf = dd.read_parquet(os.path.join(dir_list[0], '*.parquet'), engine='pyarrow').repartition(partition_size='100MB')
I read that I should try repartition, reset_index, and/or assign. None of them worked.
x_freq = x.copy()
in the second example plays the same role as
x_freq = ddf.copy()
in the first example: x is just some ddf I'm trying to encode, but defining it here would take a lot of code.
Can anyone help, please?
Here's what I think might be going on.
Your parquet file probably doesn't have divisions information within it. You thus cannot just dd.concat, since it's not clear how the partitions align.
You can check this with:
x_freq.known_divisions # is likely False
x_freq.divisions # is likely (None, None, None, None)
Since unknown divisions are the problem, you can reproduce the issue with the synthetic data from the first example by clearing its divisions:
x_freq = ddf.clear_divisions().copy()
You might solve this problem by re-setting the index:
x_freq.reset_index().set_index(index_column_name)
where index_column_name is the name of the index column.
Consider also saving the data with the correct index afterwards so that it doesn't have to be calculated each time.
By the way, since you're computing each column before working with it, you're not really utilizing dask's parallelization abilities. Here is a workflow that might utilize parallelization a bit better:
def count_frequency_encoder(s):
    return s.replace(s.value_counts(normalize=True).compute().to_dict())

frequency_columns = {
    f'{col_name}_freq': count_frequency_encoder(x_freq[col_name])
    for col_name in x_freq.columns}
x_freq = x_freq.assign(**frequency_columns)
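To make clear what the encoding above is supposed to produce, here is a small pandas-only sanity check on a hand-made column. It is not the dask workflow itself, just the same idea (replace each value by its relative frequency) on data small enough to verify by eye. It uses map instead of replace, which is a swapped-in but equivalent lookup here.

```python
import pandas as pd

# Six values: '2' appears 3 times, '1' twice, '3' once.
s = pd.Series(['1', '2', '2', '3', '2', '1'], name='a')

# Relative frequency of each distinct value, indexed by the value.
freqs = s.value_counts(normalize=True)

# Look up each row's value in the frequency table.
encoded = s.map(freqs).rename('a_freq')
print(encoded.tolist())
```

Each row ends up holding the share of rows that carry the same value, so '2' maps to 0.5, '1' to 1/3, and '3' to 1/6.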
A tiny tip about to_frame:
x_freq[col_name].to_frame()
is equivalent to
x_freq[[col_name]]