Tags: python, pandas, dask, swifter, pandarallel

How to speed up (parallelize) a grouped row-wise rolling mean calculation?


I am calculating a grouped row-wise moving average on a large data set, but the process takes too long on a single thread. How can I speed it up efficiently?

Please find a reproducible example below:

import pandas as pd

dataframe = pd.DataFrame({'id': range(2),
                   'group_id': range(2),
                   'Date_1_F1': [1,2], 
                   'Date_2_F1': [2,4], 
                   'Date_3_F1': [3, 6],
                   'Date_4_F1': [4,8], 
                   'Date_1_F2': [2,11], 
                   'Date_2_F2': [6, 13], 
                   'Date_3_F2': [10, 15],
                   'Date_4_F2': [14, 17]})
dataframe

   id  group_id  Date_1_F1  ...  Date_2_F2  Date_3_F2  Date_4_F2
0   0         0          1  ...          6         10         14
1   1         1          2  ...         13         15         17

I have a function that returns the (row-wise) smoothed version of the dataset.

def smooth_ts(dataframe, ma_parameter = 2):

    dataframe = (dataframe
                 .set_index(["id", "group_id"])
                 # group columns by their feature suffix (e.g. "F1", "F2");
                 # note: the axis=1 arguments below are deprecated in pandas >= 2.1
                 .groupby(lambda x: x.split("_")[-1], axis = 1, group_keys=False)
                 # row-wise rolling mean over the dates of each feature; the
                 # leading all-NaN columns produced by the window are dropped
                 .apply(lambda x: x.rolling(ma_parameter, axis = 1)
                        .mean()
                        .dropna(axis=1, how='all')))

    dataframe.reset_index(inplace = True)

    return dataframe

smoothed_df = smooth_ts(dataframe)
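
For reference, on the example above the smoothed data frame should look like this (the first date of each feature group is dropped by dropna):

   id  group_id  Date_2_F1  Date_3_F1  Date_4_F1  Date_2_F2  Date_3_F2  Date_4_F2
0   0         0        1.5        2.5        3.5        4.0        8.0       12.0
1   1         1        3.0        5.0        7.0       12.0       14.0       16.0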

Thank you very much


Solution

  • You could (1) melt your data frame with pd.melt, (2) derive the grouping variable from the column names, (3) sort, group, and aggregate with rolling(2).mean(), and finally use df.pivot to reshape the result back to wide format. In this approach there is an apply step that can be parallelized with swifter. Here is an example:

    import pandas as pd

    import swifter  # only needed for the parallelized apply variant below
    
    dataframe = pd.DataFrame({'id': range(2),
                       'group_id': range(2),
                       'Date_1_F1': [1,2], 
                       'Date_2_F1': [2,4], 
                       'Date_3_F1': [3, 6],
                       'Date_4_F1': [4,8], 
                       'Date_1_F2': [2,11], 
                       'Date_2_F2': [6, 13], 
                       'Date_3_F2': [10, 15],
                       'Date_4_F2': [14, 17]})
    
    df_melted = pd.melt(dataframe, id_vars=['id', 'group_id'])
    # Use next line if you want to parallelize the apply method 
    # df_melted['groups'] = df_melted['variable'].str.split('_').swifter.apply(lambda v: v[-1])
    df_melted['groups'] = df_melted['variable'].str.split('_').apply(lambda v: v[-1])
    
    # sort so that the dates within each (id, group_id, groups) block stay in order
    df_melted = df_melted.sort_values(['id', 'group_id', 'groups', 'variable'])
    
    df_tmp = df_melted.copy()
    df_tmp['rolling_val'] = df_tmp.groupby(['id', 'group_id', 'groups'])['value'].rolling(2).mean().values
    # reshape back to wide format; the leading all-NaN date of each feature is dropped
    (df_tmp.pivot(index=['id', 'group_id'], columns='variable', values='rolling_val')
           .dropna(axis=1)
           .reset_index()
           .rename_axis(None, axis=1))
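
    As an alternative to swifter, the same apply step can be parallelized with pandarallel (also listed in the question's tags); a minimal sketch, assuming pandarallel is installed:

    from pandarallel import pandarallel

    pandarallel.initialize()  # starts the worker pool; pass nb_workers=... to control its size
    # parallel_apply is pandarallel's drop-in replacement for Series.apply
    df_melted['groups'] = df_melted['variable'].str.split('_').parallel_apply(lambda v: v[-1])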
    

    If you want to stick to your original approach, you can accelerate it with the Pool object from the multiprocessing library, which maps a function over an iterable of inputs in parallel.

    import pandas as pd
    import numpy as np
    from multiprocessing import Pool
    
    
    dataframe = pd.DataFrame({'id': range(2),
                       'group_id': range(2),
                       'Date_1_F1': [1,2], 
                       'Date_2_F1': [2,4], 
                       'Date_3_F1': [3, 6],
                       'Date_4_F1': [4,8], 
                       'Date_1_F2': [2,11], 
                       'Date_2_F2': [6, 13], 
                       'Date_3_F2': [10, 15],
                       'Date_4_F2': [14, 17]})
    dataframe
    
    def smooth_ts(dataframe, ma_parameter = 2):
    
        dataframe = (dataframe
                     .set_index(["id", "group_id"])
                     .groupby(lambda x: x.split("_")[-1], axis = 1, group_keys=False)
                     .apply(lambda x: x.rolling(ma_parameter, axis = 1)
                            .mean()
                            .dropna(axis=1, how='all')))
        
        dataframe.reset_index(inplace = True)
        
        return dataframe
    
    # split the unique ids into 2 chunks; the number of splits sets the number of chunks
    id_chunks = np.array_split(dataframe.id.unique(), 2)
    # list of chunked data frames, one per id chunk
    df_chunks = [dataframe[dataframe['id'].isin(i)] for i in id_chunks]
    # apply smooth_ts to each chunk in parallel; two workers because there are only
    # two chunks -- for more chunks, the number of processes can be increased
    with Pool(2) as p:
        dfs_chunks = p.map(smooth_ts, df_chunks)
    pd.concat(dfs_chunks).reset_index(drop=True)
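
    Note: on platforms where worker processes are started with "spawn" (Windows, and macOS by default on recent Python versions), the Pool call must sit under an if __name__ == "__main__" guard when the code runs as a script; a minimal sketch of the same steps:

    if __name__ == "__main__":
        id_chunks = np.array_split(dataframe.id.unique(), 2)
        df_chunks = [dataframe[dataframe['id'].isin(i)] for i in id_chunks]
        with Pool(2) as p:
            dfs_chunks = p.map(smooth_ts, df_chunks)
        print(pd.concat(dfs_chunks).reset_index(drop=True))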