Tags: python, dask, parquet, dask-distributed, dask-dataframe

map_partitions runs twice when storing a dask dataframe to parquet and counting records


I have a dask process that runs a function on each dataframe partition. I rely on to_parquet to trigger the compute() that runs the function.

But I also need to know the number of records in the parquet table. For that, I use ddf.map_partitions(len). The problem is that counting the records triggers another compute() on the dataframe, which makes the map_partitions function run again.

What should be the approach to run map_partitions, save the result in parquet, and count the number of records?

from dask.distributed import Client

def some_func(df):
    df['abc'] = df['def'] * 10
    return df

client = Client('127.0.0.1:8786')

# ddf is an existing dask dataframe (its creation is omitted here)
ddf.map_partitions(some_func) # some_func executes twice for each partition

ddf.to_parquet('/some/folder/data', engine='pyarrow')

total = ddf.map_partitions(len).compute().sum()

Solution

  • One potential problem is this line:

    ddf.map_partitions(some_func)
    

    Here, dask is instructed to map the partitions, but the result of that operation is never assigned, so it is discarded. The code should probably be:

    # this will store the modified dataframe as ddf
    ddf = ddf.map_partitions(some_func)
    

    Next, running .to_parquet will evaluate (compute) the dataframe ddf and discard the results from memory, so the subsequent .compute will re-run the whole graph, including some_func.
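
    One way to confirm this is to add a print inside the function; a minimal, self-contained sketch (assuming pyarrow is installed; with a distributed Client the prints appear in the worker logs rather than the local console):

    import pandas as pd
    import dask.dataframe as dd

    def some_func(df):
        print("running some_func on a partition")
        df['abc'] = df['def'] * 10
        return df

    # small throwaway dataframe with two partitions
    ddf = dd.from_pandas(pd.DataFrame({'def': range(10)}), npartitions=2)

    # note: dask also calls some_func once on an empty frame to infer metadata
    ddf = ddf.map_partitions(some_func)

    ddf.to_parquet('/some/folder/data', engine='pyarrow')  # prints once per partition
    total = ddf.map_partitions(len).compute().sum()        # prints again for each partition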

    This can be expensive, so some of the possible solutions are:

    1. If ddf fits into the workers' memory, it can be persisted to avoid recomputation:
    ddf = ddf.map_partitions(some_func)
    ddf = ddf.persist()
    ddf.to_parquet('/some/folder/data', engine='pyarrow')
    total = ddf.map_partitions(len).compute().sum()
    
    2. Re-define ddf in terms of the already-computed data. Here, dask will load the information from the stored parquet files instead of recomputing it:
    import dask.dataframe as dd

    ddf = ddf.map_partitions(some_func)
    ddf.to_parquet('/some/folder/data', engine='pyarrow')

    # create a new ddf based on the computed values
    ddf = dd.read_parquet('/some/folder/data')
    total = ddf.map_partitions(len).compute().sum()
    
    3. Another solution is to modify some_func to store the results on the fly and return the partition length. A rough sketch:
    path_out = 'some/template_path/for/parquet/{number}.parquet'

    def some_func(df, partition_info=None):
        df['abc'] = df['def'] * 10
        # partition_info provides the partition number, used to build a unique path
        path_save_parquet = path_out.format(number=partition_info['number'])
        df.to_parquet(path_save_parquet)
        return len(df)

    # passing meta explicitly tells dask the result is a series of ints, so it
    # does not call some_func on a fake partition just to infer the metadata
    total = ddf.map_partitions(some_func, meta=(None, 'int64')).compute().sum()
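
    Finally, a related option worth mentioning (not shown above, and assuming a dask version in which to_parquet accepts compute=False): the write and the count can be combined into a single compute call, so the shared parts of the graph, including some_func, run only once per partition:

    import dask

    ddf = ddf.map_partitions(some_func)

    # build the write task without executing it
    write_task = ddf.to_parquet('/some/folder/data', engine='pyarrow', compute=False)
    sizes = ddf.map_partitions(len)

    # a single compute call shares the common graph between the two results
    _, partition_sizes = dask.compute(write_task, sizes)
    total = partition_sizes.sum()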