Search code examples
python pandas numpy aggregation

Aggregate columns that fall within range


I have two dataframes called df and ranges:

# Rows to aggregate: each row carries a group label, its own start/end
# values and the two value columns val1/val2.
data = dict(
    group=['A', 'B', 'A', 'C', 'B'],
    start=[10, 20, 15, 30, 25],
    end=[50, 40, 60, 70, 45],
    val1=[5, 10, 11, 12, 6],
    val2=[5, 2, 1, 1, 0],
)
df = pd.DataFrame(data=data)

# Range table: one start/end pair per group. The name `data` is reused
# for this second table.
data = dict(
    group=['A', 'B', 'C'],
    start=[0, 5, 25],
    end=[50, 7, 35],
)
ranges = pd.DataFrame(data=data)

My goal is to aggregate the rows in df together based on whether they fall within the same range defined in ranges. I would like to aggregate them together such that for each val1, val2 column I get the min, max, mean, sum of that column within the context of the aggregation group.

The catch here is that I need to do this for something like 5000 ranges in ranges and 500,000 rows in df. So I'd like a fast but memory efficient (relatively) solution. I'm open to solutions using similar frameworks such as vaex.

Expected output where range_id is just a way to identify groups assuming they're not unique:

  range_id val1              val2             
            min max mean sum  min max mean sum
0        0    5   5  5.0   5    5   5  5.0   5

Solution

  • IIUC, I would pre-filter the dataframe with map and boolean indexing, then perform a classical groupby.agg. This should keep the masks and intermediate (filtered) DataFrame minimal for memory efficiency, and minimize the size of the input for groupby.

    # Value columns that get summarised per group.
    cols = ['val1', 'val2']

    # Cast the value columns to int before aggregating.
    df[cols] = df[cols].astype(int)

    # Group-indexed view of `ranges`, built once and used for both bounds.
    tmp = ranges.set_index('group')

    # Keep only the rows lying inside their group's range, then aggregate.
    # The mask and the filtered frame are never bound to a name.
    out = (
        df[(df['start'] >= df['group'].map(tmp['start']))
           & (df['end'] <= df['group'].map(tmp['end']))]
        .groupby('group', as_index=False)[cols]
        .agg(['min', 'max', 'mean', 'sum'])
    )
    

    Output:

      group val1              val2             
             min max mean sum  min max mean sum
    0     A    5   5  5.0   5    5   5  5.0   5
    

    Intermediate before the groupby:

      group  start  end  val1  val2
    0     A     10   50     5     5
    

    variant

    @sammywemmy proposed a variation of my solution. Instead of computing all aggregations simultaneously in groupby.agg, you could compute them individually and combine them with concat. This is faster and potentially a bit more efficient memory-wise.

    from itertools import product

    cols = ['val1', 'val2']
    aggs = ['min', 'mean', 'max', 'sum']
    tmp = ranges.set_index('group')

    # Filter first, so that only in-range rows reach the groupby.
    grouped = df[
        (df['start'] >= df['group'].map(tmp['start']))
        & (df['end'] <= df['group'].map(tmp['end']))
    ].groupby('group')

    # One aggregated Series per (column, aggregation) pair, each named
    # with that pair.
    contents = [
        grouped[col].agg(_agg).rename((col, _agg))
        for col, _agg in product(cols, aggs)
    ]

    out = pd.concat(contents, axis=1)
    

    timings

    merge range timing comparison

    example generating function

    def init(N, seed=None):
        """Generate a random ``(df, ranges)`` pair for benchmarking.

        Parameters
        ----------
        N : int
            Number of rows in ``df``.
        seed : int, optional
            Seed for a private ``np.random.RandomState``, making the output
            reproducible. When omitted, the global NumPy random state is
            used, so existing ``init(N)`` calls behave as before.

        Returns
        -------
        tuple of (pandas.DataFrame, pandas.DataFrame)
            ``df`` with columns group/start/end/val1/val2 and
            ``start <= end`` on every row, and ``ranges`` with one row per
            distinct group found in ``df`` (columns group/start/end, also
            with ``start <= end``).
        """
        # The `np.random` module and a RandomState instance both expose
        # `randint`, so either can serve as the generator below. A private
        # RandomState avoids reseeding the global state.
        rng = np.random if seed is None else np.random.RandomState(seed)

        df = pd.DataFrame({'group': rng.randint(0, 20, N),
                           'start': rng.randint(0, 100, N),
                           'end': rng.randint(0, 100, N),
                           'val1': rng.randint(0, 100, N),
                           'val2': rng.randint(0, 100, N),
                          })

        # ensure start <= end
        df[['start', 'end']] = np.sort(df[['start', 'end']], axis=1)

        # one range per group that actually occurs in df
        group = df['group'].unique()
        ranges = pd.DataFrame({'group': group,
                               'start': rng.randint(0, 110, len(group)),
                               'end': rng.randint(0, 110, len(group)),
                              })
        # same ordering guarantee for the range bounds
        ranges[['start', 'end']] = np.sort(ranges[['start', 'end']], axis=1)

        return df, ranges
    

    memory_usage

    on 10M rows

    # mozway_pre_filter
    Line #    Mem usage    Increment  Occurrences   Line Contents
    =============================================================
        49    711.2 MiB    711.2 MiB           1   def mozway_pre_filter(df, ranges):
        50                                             # columns to aggregate
        51    711.2 MiB      0.0 MiB           1       cols = ['val1', 'val2']
        52                                             
        53                                             # ensure data is numeric
        54    863.9 MiB    152.6 MiB           1       df[cols] = df[cols].astype(int)
        55                                             
        56                                             # optional, just to avoid having to `set_index` twice
        57    863.9 MiB      0.0 MiB           1       tmp = ranges.set_index('group')
        58                                             
        59                                             # pre-filter the rows for memory efficiency
        60                                             # then perform a groupby.agg
        61    950.9 MiB     11.2 MiB           4       return (df[( df['start'].ge(df['group'].map(tmp['start']))
        62    881.6 MiB      9.5 MiB           1                  &df['end'].le(df['group'].map(tmp['end'])))]
        63    950.7 MiB    -66.6 MiB           2              .groupby('group', as_index=False)[cols].agg(['min', 'max', 'mean', 'sum'])
    
    
    # donkey_merge
    Line #    Mem usage    Increment  Occurrences   Line Contents
    =============================================================
        66    884.4 MiB    884.4 MiB           1   def donkey_merge(df, ranges):
        67    884.4 MiB      0.0 MiB           1       ranges = ranges.assign(range_id=ranges.index)
        68   1484.4 MiB    600.1 MiB           1       df_merged = pd.merge(df, ranges, on='group')
        69   1602.8 MiB    109.0 MiB           2       df_filtered = df_merged[(df_merged['start_x'] >= df_merged['start_y'])
        70   1494.0 MiB      9.4 MiB           1                               & (df_merged['end_x'] <= df_merged['end_y'])]
        71   1602.8 MiB      0.0 MiB           2       aggregation_dict = {"val1": ['min', 'max', 'mean', 'sum'],
        72   1602.8 MiB      0.0 MiB           1                           "val2": ['min', 'max', 'mean', 'sum']}
        73   1585.3 MiB    -17.6 MiB           1       return df_filtered.groupby('range_id').agg(aggregation_dict).reset_index()
    
    
    # Nayem_aggregate_range
    Line #    Mem usage    Increment  Occurrences   Line Contents
    =============================================================
        19    905.9 MiB    905.9 MiB           1   def Nayem_aggregate_range(df, ranges):
        20    905.9 MiB      0.0 MiB           1       results = []
        21    961.5 MiB      0.0 MiB          21       for idx, row in ranges.iterrows():
        22    961.5 MiB      0.0 MiB          20           mask = (
        23    961.5 MiB     55.6 MiB          60               (df['group'] == row['group']) &
        24    961.5 MiB      0.0 MiB          20               (df['start'] >= row['start']) &
        25    961.5 MiB      0.0 MiB          20               (df['end'] <= row['end'])
        26                                                 )
        27    961.5 MiB      0.0 MiB          20           filtered_df = df[mask]
        28    961.5 MiB      0.0 MiB          20           if not filtered_df.empty:
        29    961.5 MiB      0.0 MiB          20               agg_dict = {
        30    961.5 MiB      0.0 MiB          20                   'val1_min': filtered_df['val1'].min(),
        31    961.5 MiB      0.0 MiB          20                   'val1_max': filtered_df['val1'].max(),
        32    961.5 MiB      0.0 MiB          20                   'val1_mean': filtered_df['val1'].mean(),
        33    961.5 MiB      0.0 MiB          20                   'val1_sum': filtered_df['val1'].sum(),
        34    961.5 MiB      0.0 MiB          20                   'val2_min': filtered_df['val2'].min(),
        35    961.5 MiB      0.0 MiB          20                   'val2_max': filtered_df['val2'].max(),
        36    961.5 MiB      0.0 MiB          20                   'val2_mean': filtered_df['val2'].mean(),
        37    961.5 MiB      0.0 MiB          20                   'val2_sum': filtered_df['val2'].sum(),
        38                                                     }
        39    961.5 MiB      0.0 MiB          20               agg_dict['range_id'] = idx
        40    961.5 MiB      0.0 MiB          20               results.append(agg_dict)
        41    961.5 MiB      0.0 MiB           1       aggregated_df = pd.DataFrame(results)
        42    961.5 MiB      0.0 MiB           1       aggregated_df = aggregated_df.set_index('range_id')
        43    961.5 MiB      0.0 MiB           2       aggregated_df.columns = pd.MultiIndex.from_tuples(
        44    961.5 MiB      0.0 MiB           1           [('val1', 'min'), ('val1', 'max'), ('val1', 'mean'), ('val1', 'sum'),
        45                                                  ('val2', 'min'), ('val2', 'max'), ('val2', 'mean'), ('val2', 'sum')]
        46                                             )
        47    961.5 MiB      0.0 MiB           1       return aggregated_df
    
    
    # user24714692_merge_agg_
    Line #    Mem usage    Increment  Occurrences   Line Contents
    =============================================================
         3    879.9 MiB    879.9 MiB           1   def user24714692_merge_agg_(df, ranges):
         4   1429.1 MiB    549.2 MiB           1       mdf = pd.merge(df, ranges, on='group', suffixes=('', '_range'))
         5                                         
         6   1527.7 MiB     70.3 MiB           2       fdf = mdf[
         7   1457.4 MiB     19.1 MiB           2           (mdf['start'] >= mdf['start_range']) &
         8   1438.3 MiB      9.2 MiB           1           (mdf['end'] <= mdf['end_range'])
         9                                             ]
        10                                         
        11   1527.9 MiB      0.3 MiB           3       res = fdf.groupby(['group', 'start_range', 'end_range']).agg({
        12   1527.7 MiB      0.0 MiB           1           'val1': ['min', 'max', 'mean', 'sum'],
        13   1527.7 MiB      0.0 MiB           1           'val2': ['min', 'max', 'mean', 'sum']
        14   1527.9 MiB      0.0 MiB           1       }).reset_index()
        15                                         
        16   1527.9 MiB      0.0 MiB          14       res.columns = ['_'.join(col).strip() if col[1] else col[0] for col in res.columns.values]
        17   1527.9 MiB      0.0 MiB           1       return res
    

    Maximum memory usage

    maximum memory usage