pandas, performance, optimization, sparse-matrix, numba

Optimize Pandas Dataframe concat method for very large sparse data


Given:

import time
import pandas as pd
import numpy as np

def get_rnd_df(row:int=10, col:int=7): # generate random Sparse Pandas dataframe
    np.random.seed(0)
    d=np.random.randint(low=0, high=10, size=(row,col)).astype(np.float32)
    d[d < 3] = np.nan
    df=pd.DataFrame(data=d,
                    index=[f"ip{i}" for i in np.random.choice(range(max(row, 10)), row, replace=False) ],
                    columns=[f"col_{c}" for c in np.random.choice(range(max(col, 10)), col, replace=False) ],
                    dtype=pd.SparseDtype(dtype=np.float32), # sparse: memory efficient xxx but SUPER SLOW xxx
                )
    df.index.name='usr'
    return df
    
def get_df_concat(dfs):
    t=time.time()
    dfc=pd.concat(dfs, axis=0, sort=True) # vstack dfs=[df1, df2,..., dfN], sort=True: sort columns
    print(f"elapsed_time [concat]{time.time()-t:>{12}.{4}f} sec")

    t=time.time()
    dfc=dfc.groupby(level=0) # groupby index
    print(f"elapsed_time [groupby]{time.time()-t:>{11}.{4}f} sec")

    t=time.time()
    dfc=dfc.sum() # <<<========== Time Consuming ==========>>> 
    print(f"elapsed_time [sum]{time.time()-t:>{15}.{4}f} sec")

    t=time.time()
    dfc=dfc.sort_index(key=lambda x: ( x.to_series().str[2:].astype(int) ))
    print(f"elapsed_time [sort idx]{time.time()-t:>{10}.{4}f} sec")

    return dfc

I apply concatenation, using my helper function get_df_concat(dfs), to several sparse Pandas DataFrames; in the result, columns and indices are sorted. For two small sample DataFrames it works reasonably well, as intended:

df1=get_rnd_df(row=8, col=7) # small sample random dataframe
        col_0   col_6   col_4   col_2   col_5   col_3   col_8
usr                             
ip3     5.0     NaN     3.0     3.0     7.0     9.0     3.0
ip5     5.0     NaN     4.0     7.0     6.0     8.0     8.0
ip0     NaN     6.0     7.0     7.0     8.0     NaN     5.0
ip6     9.0     8.0     9.0     4.0     3.0     NaN     3.0
ip2     5.0     NaN     NaN     3.0     8.0     NaN     3.0
ip8     3.0     3.0     7.0     NaN     NaN     9.0     9.0
ip9     NaN     4.0     7.0     3.0     NaN     7.0     NaN
ip7     NaN     NaN     4.0     5.0     5.0     6.0     8.0

df2=get_rnd_df(row=5, col=11) # small sample random dataframe
        col_4   col_10  col_5   col_0   col_9   col_2   col_8   col_6   col_3   col_7   col_1
usr                                             
ip3     5.0     NaN     3.0     3.0     7.0     9.0     3.0     5.0     NaN     4.0     7.0
ip5     6.0     8.0     8.0     NaN     6.0     7.0     7.0     8.0     NaN     5.0     9.0
ip0     8.0     9.0     4.0     3.0     NaN     3.0     5.0     NaN     NaN     3.0     8.0
ip6     NaN     3.0     3.0     3.0     7.0     NaN     NaN     9.0     9.0     NaN     4.0
ip2     7.0     3.0     NaN     7.0     NaN     NaN     NaN     4.0     5.0     5.0     6.0

%%time
df_concat=get_df_concat(dfs=[df1, df2])
        col_0   col_1   col_2   col_3   col_4   col_5   col_6   col_7   col_8   col_9   col_10
usr                                             
ip0     3.0     8.0     10.0    0.0     15.0    12.0    6.0     3.0     10.0    0.0     9.0
ip2     12.0    6.0     3.0     5.0     7.0     8.0     4.0     5.0     3.0     0.0     3.0
ip3     8.0     7.0     12.0    9.0     8.0     10.0    5.0     4.0     6.0     7.0     0.0
ip5     5.0     9.0     14.0    8.0     10.0    14.0    8.0     5.0     15.0    6.0     8.0
ip6     12.0    4.0     4.0     9.0     9.0     6.0     17.0    0.0     3.0     7.0     3.0
ip7     0.0     0.0     5.0     6.0     4.0     5.0     0.0     0.0     8.0     0.0     0.0
ip8     3.0     0.0     0.0     9.0     7.0     0.0     3.0     0.0     9.0     0.0     0.0
ip9     0.0     0.0     3.0     7.0     7.0     0.0     4.0     0.0     0.0     0.0     0.0

The problem is that my real data is so large that the concatenation takes many hours to complete, the sum() step in particular, even with sufficient memory available:

df1=get_rnd_df(row=int(7e+5), col=int(2e+8)) # resembles my real data
df2=get_rnd_df(row=int(9e+6), col=int(1e+9)) # resembles my real data

%%time
df_concat=get_df_concat(dfs=[df1, df2]) # SUPER SLOW & time-consuming!!!

Is there a better alternative to accomplish such concatenation more efficiently? Could SciPy's csr_matrix help me toward a faster implementation?

PS. I take advantage of pd.SparseDtype("float32", fill_value=np.nan) to ensure it fits in my available memory.
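For reference, here is a rough sketch of what such a csr_matrix-based pipeline could look like (the helper concat_groupby_sum_scipy is only illustrative and has not been benchmarked at scale). It assumes the frames are stored with fill_value=0.0 instead of NaN (e.g. by setting the sub-threshold entries to 0 in get_rnd_df), since DataFrame.sparse.to_coo() requires a zero fill value; for a plain sum this is equivalent, because NaN is treated as zero anyway:

import numpy as np
import pandas as pd
from scipy import sparse

def concat_groupby_sum_scipy(dfs):
    # union of all column labels, sorted lexicographically (like sort=True in pd.concat)
    all_cols = sorted(set().union(*(df.columns for df in dfs)))
    mats, row_labels = [], []
    for df in dfs:
        # align columns; newly introduced columns are filled with 0.0, then everything
        # is cast to a zero-filled sparse dtype so DataFrame.sparse.to_coo() accepts it
        df = df.reindex(columns=all_cols, fill_value=0.0).astype(
            pd.SparseDtype("float32", fill_value=0.0)
        )
        mats.append(df.sparse.to_coo().tocsr())
        row_labels.extend(df.index)
    stacked = sparse.vstack(mats, format="csr")   # the "concat" step
    labels, inverse = np.unique(row_labels, return_inverse=True)
    # indicator[g, r] == 1 where stacked row r belongs to group g,
    # so indicator @ stacked adds up all rows sharing the same index label
    indicator = sparse.csr_matrix(
        (np.ones(len(row_labels), dtype=np.float32),
         (inverse, np.arange(len(row_labels)))),
        shape=(len(labels), len(row_labels)),
    )
    summed = indicator @ stacked                  # the "groupby + sum" step
    # labels come back lexicographically sorted ('ip0', 'ip10', 'ip2', ...);
    # re-sort by the numeric suffix if needed, as in the original sort_index
    return pd.DataFrame.sparse.from_spmatrix(summed, index=labels, columns=all_cols)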

UPDATE

Thanks to @RomanPerekhrest, I now optimize the concatenation, specifically the sum() step, using the numba engine in Pandas:

def get_df_concat_optimized(dfs):
    dfc=pd.concat(dfs, axis=0, sort=True).astype(pd.SparseDtype(dtype=np.float32)) # dfs=[df1, df2,..., dfN], sort=True: sort columns
    dfc=dfc.groupby(level=0) # groupby index
    dfc=dfc.sum(engine="numba", # <<=== saves time using NUMBA engine!
                engine_kwargs={'nopython': True, 'parallel': True, 'nogil': False},
                ).astype(pd.SparseDtype(dtype=np.float32, fill_value=0.0,))
    dfc=dfc.sort_index(key=lambda x: ( x.to_series().str[2:].astype(int) )).astype(pd.SparseDtype(dtype=np.float32, fill_value=0.0))
    return dfc

and here is the time comparison for some fairly large random sparse DataFrames, using my inefficient get_df_concat(dfs) approach and the optimized get_df_concat_optimized(dfs) approach:

df1=get_rnd_df(row=int(6e2), col=int(9e2))
df2=get_rnd_df(row=int(2e2), col=int(7e2))

%%time
df_concat_opt=get_df_concat_optimized(dfs=[df1, df2])
CPU times: user 2.32 s, sys: 11.6 ms, total: 2.33 s
Wall time: 2.47 s

%%time
df_concat=get_df_concat(dfs=[df1, df2])
elapsed_time [concat]      0.2443 sec
elapsed_time [groupby]     0.0008 sec
elapsed_time [sum]        67.2486 sec <<< time consuming >>>
elapsed_time [sort idx]    0.2136 sec
CPU times: user 1min 6s, sys: 721 ms, total: 1min 7s
Wall time: 1min 7s
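
As a quick sanity check (df_concat and df_concat_opt are the results from the runs above), the two outputs can be compared element-wise after densifying:

import numpy as np

# both pipelines should yield the same labels and the same summed values
assert df_concat.index.equals(df_concat_opt.index)
assert df_concat.columns.equals(df_concat_opt.columns)
assert np.allclose(np.asarray(df_concat, dtype=np.float32),
                   np.asarray(df_concat_opt, dtype=np.float32),
                   equal_nan=True)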

Solution

  • Optimization points:

    • a small one: prefer numpy.random.Generator; it is the recommended API, and in general I noticed it runs a bit faster than the legacy np.random.<func> calls
    • operate on numerical index/column labels and add the needed prefixes at the end, in one vectorized step. That speeds up the processing in general.
    • recalling the previous point, eliminate the custom sorting of columns, as they can be sorted right in the concatenation phase by setting the sort param: pd.concat(dfs, axis=0, sort=True)
    • eliminate index sorting with sort_index, as pd.DataFrame.groupby will sort group keys by default
    • boost the performance of DataFrameGroupBy.sum by selecting numba as the engine for JIT-compiled code with parallel computation ({'nopython': True, 'parallel': True})

    I tested on 2 dataframes, each of size (row=int(1e+3), col=int(2e+3)): the initial solution finished in 144 seconds on my machine, whereas the optimized approach first ran in about 8 seconds (Numba has some function-compilation overhead on the first call). All subsequent calls run in about 2.6 seconds, a 55x speedup.

    The full optimized version:

    import time
    import numpy as np
    import pandas as pd

    def get_rnd_df(row=10, col=7):
        rng = np.random.default_rng(0)
        a = rng.integers(low=0, high=10, size=(row, col)).astype("float32")
        a[a < 3] = np.nan
    
        df = pd.DataFrame(data=a,
                          index=rng.choice(range(max(row, 10)), row, replace=False),
                          columns=rng.choice(range(max(col, 10)), col, replace=False),
                          dtype=pd.SparseDtype("float32", fill_value=np.nan),
                          )
    
        df.index.name = 'usr'
        return df
    
    def get_df_concat(dfs):
        df = pd.concat(dfs, axis=0, sort=True)  # dfs=[df1, df2, df3, ..., dfN]
        df = df.groupby(level=0).sum(engine="numba",
                                     engine_kwargs={'nopython': True,
                                                    'parallel': True})
    
        df.index = 'ip' + df.index.astype(str)
        df.columns = 'col_' + df.columns.astype(str)
    
        return df
    
    t0 = time.time()
    
    df1=get_rnd_df(row=int(1e+3), col=int(2e+3)) # resembles my real data
    df2=get_rnd_df(row=int(1e+3), col=int(2e+3)) # resembles my real data
    df_concat=get_df_concat(dfs=[df1, df2])
    
    print('time spent: ', time.time() - t0)
    

    time spent:  2.636758327484131