Tags: python, pandas, dask, swifter

Increase performance of df.rolling(...).apply(...) for large dataframes


The execution time of this code is too long:

df.rolling(window=255).apply(myFunc)

My dataframe's shape is (500, 10000):

                   0         1 ... 9999
2021-11-01  0.011111  0.054242 
2021-11-04  0.025244  0.003653 
2021-11-05  0.524521  0.099521 
2021-11-06  0.054241  0.138321 
...

For each date, I make the calculation using the values of the last 255 dates. myFunc looks like:

def myFunc(x):
    coefs = ...
    return np.sqrt(np.sum(x ** 2 * coefs))

I tried to use swifter, but the performance is the same:

import swifter
df.swifter.rolling(window=255).apply(myFunc)

I also tried Dask, but I think I didn't understand it well, because the performance is not much better:

import dask.dataframe as dd
ddf = dd.from_pandas(df, npartitions=8)
res = ddf.rolling(window=255).apply(myFunc, raw=False).compute()

I didn't manage to parallelize the execution across partitions. How can I use Dask to improve performance? I'm on Windows.


Solution

  • First, since your function only uses NumPy operations, specify the parameter raw=True so each window is passed to the function as a plain NumPy array instead of a Series. Toy example:

    import pandas as pd
    import numpy as np
    
    def foo(x):
        coefs = 2
        return np.sqrt(np.sum(x ** 2 * coefs))    
    
    df = pd.DataFrame(np.random.random((500, 10000)))
    
    %%time
    res = df.rolling(250).apply(foo)
    
    Wall time: 359.3 s
    
    # with raw=True
    %%time
    res = df.rolling(250).apply(foo, raw=True)
    
    Wall time: 15.2 s
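
    Recent pandas versions can also JIT-compile the applied function themselves via `engine="numba"` on `Rolling.apply` (numba must be installed), with no extra library. A minimal sketch, using a smaller shape so it runs quickly; the fallback branch covers the case where numba is not available:

```python
import numpy as np
import pandas as pd

def foo(x):
    coefs = 2
    return np.sqrt(np.sum(x ** 2 * coefs))

df = pd.DataFrame(np.random.random((500, 100)))

try:
    # engine="numba" JIT-compiles foo; it requires raw=True
    res = df.rolling(250).apply(foo, raw=True, engine="numba")
except ImportError:
    # numba not installed: fall back to the default cython engine
    res = df.rolling(250).apply(foo, raw=True)
```

    The first call pays a compilation cost; subsequent calls with the same function reuse the cached compiled version.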
    
    

    You can also easily parallelize your calculations using the parallel-pandas library. Only two additional lines of code!

    # pip install parallel-pandas
    import pandas as pd
    import numpy as np
    from parallel_pandas import ParallelPandas
    
    # initialize parallel-pandas
    ParallelPandas.initialize(n_cpu=8, disable_pr_bar=True)
    
    def foo(x):
        coefs = 2
        return np.sqrt(np.sum(x ** 2 * coefs))    
    
    df = pd.DataFrame(np.random.random((500, 10000)))
    
    # p_apply is the parallel analogue of the apply method
    %%time
    res = df.rolling(250).p_apply(foo, raw=True, executor='processes')
    
    Wall time: 2.2 s
    

    With engine='numba':

    %%time
    res = df.rolling(250).p_apply(foo, raw=True, executor='processes', engine='numba')
    
    Wall time: 1.2 s
    
    

    Total speedup: 359 / 1.2 ≈ 300×!
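
    Finally, since the function here is just the square root of a weighted sum of squares, the rolling apply can be bypassed entirely and vectorized with `numpy.lib.stride_tricks.sliding_window_view`. A sketch, where the scalar `coefs=2` stands in for whatever coefficients you actually use (a length-`window` array broadcasts the same way):

```python
import numpy as np
import pandas as pd
from numpy.lib.stride_tricks import sliding_window_view

def rolling_foo(df: pd.DataFrame, window: int, coefs=2) -> pd.DataFrame:
    a = df.to_numpy()
    # windows has shape (n_rows - window + 1, n_cols, window)
    windows = sliding_window_view(a, window, axis=0)
    out = np.sqrt(np.sum(windows ** 2 * coefs, axis=-1))
    # pad the first window-1 rows with NaN to match rolling().apply()
    res = np.full(a.shape, np.nan)
    res[window - 1:] = out
    return pd.DataFrame(res, index=df.index, columns=df.columns)
```

    This avoids any per-window Python call at all, at the cost of materializing the window view's sums in memory; for very wide frames it can be chunked by columns.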