I have a dask process that runs a function on each dataframe partition. I let to_parquet do the compute() that runs the functions. But I also need to know the number of records in the parquet table, and for that I use ddf.map_partitions(len). The problem is that counting the records triggers another compute() on the dataframe, which makes the map_partitions functions run again.

What should the approach be to run map_partitions, save the result to parquet, and count the number of records?
from dask.distributed import Client

def some_func(df):
    df['abc'] = df['def'] * 10
    return df

client = Client('127.0.0.1:8786')

# ddf is an existing dask dataframe
ddf.map_partitions(some_func)  # some_func executes twice for each partition
ddf.to_parquet('/some/folder/data', engine='pyarrow')
total = ddf.map_partitions(len).compute().sum()
One potential problem is this line:

ddf.map_partitions(some_func)

Here, dask is instructed to map the partitions, but there is no instruction to store the result of the operation. Hence, the code should probably be:
# this will store the modified dataframe as ddf
ddf = ddf.map_partitions(some_func)
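Since dask operations are lazy, map_partitions only builds a task graph and returns a new collection; nothing runs until a compute() (or to_parquet) is triggered. A minimal check of this, assuming the column 'abc' does not already exist in ddf:

result = ddf.map_partitions(some_func)
print('abc' in ddf.columns)     # False: the original collection is unchanged
print('abc' in result.columns)  # True: known from metadata, nothing computed yet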
Next, running .to_parquet will evaluate (compute) the dataframe ddf and discard it from memory, so the subsequent .compute will re-compute the dataframe. This can be expensive, so some of the possible solutions are:
1. If ddf can fit into the memory of the workers, then ddf can be persisted to avoid recomputing:

ddf = ddf.map_partitions(some_func)
ddf = ddf.persist()
ddf.to_parquet('/some/folder/data', engine='pyarrow')
total = ddf.map_partitions(len).compute().sum()
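Note that .persist() returns immediately while computation continues in the background on the workers. If it helps to block until the data is actually in memory, the distributed scheduler ships a wait helper; a minimal sketch:

from dask.distributed import wait

ddf = ddf.persist()  # kick off computation of all partitions on the workers
wait(ddf)            # block until every partition is held in worker memory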
2. Re-define ddf in terms of the computed data. Here, dask will use the stored parquet files (that were computed) and load information from them, so there is no need to persist:

import dask.dataframe as dd

ddf = ddf.map_partitions(some_func)
ddf.to_parquet('/some/folder/data', engine='pyarrow')
# create a new ddf based on the computed values
ddf = dd.read_parquet('/some/folder/data')
total = ddf.map_partitions(len).compute().sum()
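A variation on this option is to defer the parquet write with compute=False, so that the write and the row count are evaluated together in a single pass over the data. A minimal sketch (compute=False makes to_parquet return a lazy task instead of writing immediately):

import dask

ddf = ddf.map_partitions(some_func)
write_task = ddf.to_parquet('/some/folder/data', engine='pyarrow', compute=False)
lengths = ddf.map_partitions(len)
# one shared computation: each partition is materialized once for both results
_, counts = dask.compute(write_task, lengths)
total = counts.sum()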
3. Modify some_func to store the results on the fly and return len. The rough pseudocode is:

path_out = 'some/template_path/for/parquet/{number}.parquet'

def some_func(df, partition_info=None):
    df['abc'] = df['def'] * 10
    # use the partition number to create an appropriate path
    path_save_parquet = path_out.format(number=partition_info['number'])
    df.to_parquet(path_save_parquet)
    return len(df)

# this will compute the total length and save the data in the process
total = ddf.map_partitions(some_func).compute().sum()
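One caveat with this pattern: unless an explicit meta is supplied, dask may call the function on an empty dummy dataframe to infer the output type, which would also write a small spurious parquet file. Declaring meta up front avoids that inference call; a sketch under that assumption, with save_and_count as a hypothetical rename of some_func:

path_out = 'some/template_path/for/parquet/{number}.parquet'

def save_and_count(df, partition_info=None):
    df['abc'] = df['def'] * 10
    df.to_parquet(path_out.format(number=partition_info['number']))
    return len(df)

# meta=(None, 'int64') declares the output as a Series of int64 up front,
# so dask does not need to run save_and_count on a dummy frame to infer it
total = ddf.map_partitions(save_and_count, meta=(None, 'int64')).compute().sum()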