Tags: python, parallel-processing, multiprocessing, dask, dask-dataframe

Dask dataframe running parallel and partitioned by columns


I have a dataframe with multiple companies and countries' data that I am trying to transform in parallel using a function. The data takes a format like this but is much larger and with many more clients:

[Image: sample of the data; not reproduced here]

I have made a dask dataframe that combines CompanyGroupName and Country into a blended column, which I am trying to use as the index for partitioning.

I don't want to use npartitions > 1 when creating the dask dataframe because, as far as I can tell, that can arbitrarily put half of a client's data in one partition and the other half in another.
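To illustrate the concern, here is a minimal standard-library sketch (not dask itself). It assumes partitions are cut purely by row position, which is roughly what happens when the index is a default integer range. The helper `chunk_rows` and the sample keys are hypothetical.

```python
def chunk_rows(rows, n_chunks):
    """Split rows into contiguous chunks of near-equal size, ignoring group keys."""
    size = -(-len(rows) // n_chunks)  # ceiling division
    return [rows[i:i + size] for i in range(0, len(rows), size)]

# Hypothetical data: three rows for one client, one row for another
rows = ["Acme-US", "Acme-US", "Acme-US", "Beta-DE"]
chunks = chunk_rows(rows, n_chunks=2)

# Collect which keys appear in each chunk
keys_per_chunk = [set(chunk) for chunk in chunks]
# "Acme-US" ends up in both chunks because the cut falls inside its rows
split_keys = keys_per_chunk[0] & keys_per_chunk[1]
```

A per-client transformation applied chunk by chunk would then see only part of that client's rows in each call.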

Here is what I have tried. My question is: how can I split my dataframe into specified partitions so that each one is processed in parallel by my transformation function?

Many thanks in advance for any help

def calculate_predictions(df):

    df['CompanyCountryCombined'] = df['CompanyGroupName'].astype(str) + '-' + df['Country']

    # Convert pandas DataFrame to Dask DataFrame with one partition
    ddf = dd.from_pandas(df, npartitions=1)
    ddf = ddf.set_index('CompanyCountryCombined', sorted=True).repartition(partition_size="10MB")  # Here I am trying to repartition the data across the combined index
    list_of_output_columns = [] # defined elsewhere

    # Define meta DataFrame to specify the output format
    meta = pd.DataFrame(columns=list_of_output_columns)

    ddf = ddf.sort_values(by=['CompanyCountryCombined','Date'])

    models = {} # defined elsewhere too

    # Processing the partitions through the process_forecast_group function which is also defined elsewhere
    final_results = ddf.groupby('CompanyCountryCombined').apply(
        lambda partition: process_forecast_group(partition, models), 
        meta=meta
    ).compute()

    return final_results

Solution

  • I ended up solving this by taking the sorted list of unique partition keys and repartitioning with those keys as the divisions:

    import dask.dataframe as dd
    import pandas as pd
    
    def calculate_predictions(df):
    
        def create_dask_df(pandas_df, columns_to_combine):
            # Build the key column used to partition the data, e.g. "Acme-US"
            pandas_df['combined'] = pandas_df[columns_to_combine].astype(str).agg('-'.join, axis=1)
            # set_index(..., sorted=True) assumes the rows are already ordered by the key, so sort first
            pandas_df = pandas_df.sort_values('combined')
            # Start with a single partition that holds all the data
            ddf = dd.from_pandas(pandas_df, npartitions=1)
            # Index the dask dataframe by the combined column
            ddf = ddf.set_index('combined', sorted=True)
            # Sorted unique keys: each one becomes the lower bound of a partition
            list_of_uniques = sorted(pandas_df['combined'].unique().tolist())
            # n partitions need n + 1 divisions; the final division is the inclusive upper
            # bound of the last partition, so the last key is repeated
            division_list = list_of_uniques + [list_of_uniques[-1]]
            # Repartition so that each unique key gets its own partition
            ddf = ddf.repartition(divisions=division_list)
            return ddf
        
        columns_to_partition_over = ['CompanyGroupName', 'Country']
        ddf = create_dask_df(df, columns_to_partition_over)
    
        list_of_output_columns = [] # defined elsewhere
        meta = pd.DataFrame(columns=list_of_output_columns) # Define meta DataFrame to specify the output format
        models = {} # defined elsewhere too
    
        # Processing the partitions through the process_forecast_group function which is also defined elsewhere
        final_results = ddf.map_partitions(
            lambda partition: process_forecast_group(partition.sort_values('Date'), models), 
            meta=meta
        ).compute()
    
        return final_results
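The reason for repeating the last key can be sketched without dask. This is a simplified standard-library model of how divisions map index values to partitions, assuming each partition covers values from its lower division up to (but not including) the next one, with the final division acting as an inclusive upper bound. The helper `partition_for` and the sample keys are hypothetical, and every key is assumed to lie within the divisions' range.

```python
from bisect import bisect_right

def partition_for(key, divisions):
    """Return the index of the partition that would hold `key`."""
    n_partitions = len(divisions) - 1
    # Index of the last division that is <= key
    idx = bisect_right(divisions, key) - 1
    # The final division is an inclusive upper bound, so clamp to the last partition
    return min(idx, n_partitions - 1)

keys = ["Acme-UK", "Acme-US", "Beta-DE"]  # sorted unique keys

# With the last key repeated: 4 divisions, 3 partitions, one key per partition
with_dupe = keys + [keys[-1]]
placement_with_dupe = {k: partition_for(k, with_dupe) for k in keys}

# Without the repeat: 3 divisions, only 2 partitions, so the last two keys share one
placement_without = {k: partition_for(k, keys) for k in keys}
```

Under this model, three keys passed as divisions describe only two partitions, which is why the answer appends the last key again.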