
How to calculate the Relative Strength Index (RSI) through record iterations in pandas dataframe


I have created a pandas dataframe as follows:

import pandas as pd
import numpy as np
    
ds = { 'trend' : [1,1,1,1,2,2,3,3,3,3,3,3,4,4,4,4,4], 'price' : [23,43,56,21,43,55,54,32,9,12,11,12,23,3,2,1,1]}

df = pd.DataFrame(data=ds)

The dataframe looks as follows:

display(df)

   trend    price
0   1        23
1   1        43
2   1        56
3   1        21
4   2        43
5   2        55
6   3        54
7   3        32 
8   3         9
9   3        12
10  3        11
11  3        12
12  4        23
13  4         3
14  4         2
15  4         1
16  4         1

I have saved the dataframe to a .csv file called df.csv:

df.to_csv("df.csv", index = False)

I have then created a function that calculates the Relative Strength Index (RSI - see: https://www.investopedia.com/terms/r/rsi.asp):

def get_RSI(df, column, time_window):
    """Return the RSI indicator for the specified time window."""
    diff = df[column].diff(1)

    # This preserves the dimensions of the diff values.
    up_chg = 0 * diff
    down_chg = 0 * diff

    # Up change is equal to the positive difference, otherwise equal to zero.
    up_chg[diff > 0] = diff[diff > 0]

    # Down change is equal to the negative difference, otherwise equal to zero.
    down_chg[diff < 0] = diff[diff < 0]

    # We set com = time_window-1 so we get decay alpha=1/time_window.
    up_chg_avg = up_chg.ewm(com=time_window - 1,
                            min_periods=time_window).mean()
    down_chg_avg = down_chg.ewm(com=time_window - 1,
                                min_periods=time_window).mean()

    RS = abs(up_chg_avg / down_chg_avg)
    df['RSI'] = 100 - 100 / (1 + RS)
    df = df[['RSI']]
    return df

I need to create a new field called RSI which:

  1. iterates through each and every record of the dataframe
  2. calculates the RSI by considering the price observed at each iteration and the last prices (RSI length is 3 in this example) observed in the previous trends.

For example:

  • I iterate at record 0 and the RSI is NaN (missing).
  • I iterate at record 1 and the RSI is still NaN (missing).
  • I iterate at record 12 and the RSI is 47.667343 (it considers the price at record 3, the price at record 5, the price at record 11 and the price at record 12).
  • I iterate at record 13 and the RSI is 28.631579 (it considers the price at record 3, the price at record 5, the price at record 11 and the price at record 13).
  • I iterate at record 15 and the RSI is 27.586207 (it considers the price at record 3, the price at record 5, the price at record 11 and the price at record 15).
  • and so on.
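
As a cross-check of the figures above, here is a minimal pure-Python sketch of the arithmetic. It assumes the prices fed to the RSI at record 12 are the last prices of trends 1 to 3 (records 3, 5 and 11) plus the price at record 12, which is what `groupby(...).agg({'price': 'last'})` returns for the first 13 rows. The helper names `ewm_weighted_sum` and `rsi_from_prices` are made up for this illustration. Because the up and down averages share the same weights, the normalising denominator of the exponentially weighted mean cancels in the ratio, so only weighted sums are needed.

```python
import math


def ewm_weighted_sum(values, alpha):
    """Sum of values weighted by (1 - alpha) ** age; the newest value has age 0."""
    n = len(values)
    return sum(v * (1 - alpha) ** (n - 1 - k) for k, v in enumerate(values))


def rsi_from_prices(prices, time_window):
    """RSI of the last price, mirroring ewm(com=time_window - 1, min_periods=time_window)."""
    alpha = 1 / time_window  # com = time_window - 1 gives alpha = 1 / (1 + com)
    diffs = [b - a for a, b in zip(prices, prices[1:])]
    if len(diffs) < time_window:
        return math.nan  # not enough observations yet
    ups = [max(d, 0) for d in diffs]
    downs = [max(-d, 0) for d in diffs]
    up_sum = ewm_weighted_sum(ups, alpha)
    down_sum = ewm_weighted_sum(downs, alpha)
    if down_sum == 0:
        return 100.0 if up_sum > 0 else math.nan
    return 100 - 100 / (1 + up_sum / down_sum)


rsi_12 = rsi_from_prices([21, 55, 12, 23], 3)  # records 3, 5, 11, 12
rsi_13 = rsi_from_prices([21, 55, 12, 3], 3)   # records 3, 5, 11, 13
print(round(rsi_12, 6), round(rsi_13, 6))      # 47.667343 28.631579
```
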

I have then written this code:

rsi = []

for i in range(len(df)):

    ds = pd.read_csv("df.csv", nrows=i+1)
    print(ds.info())
    d = ds.groupby(['trend'], as_index=False).agg(
                                                    {'price':'last'})

    get_RSI(d,'price',3)
    rsi.append(d['RSI'].iloc[-1])

df['RSI'] = rsi

The dataset looks correct:

display(df)

  trend price   RSI
0   1   23     NaN
1   1   43     NaN
2   1   56     NaN
3   1   21     NaN
4   2   43     NaN
5   2   55     NaN
6   3   54     NaN
7   3   32     NaN
8   3   9      NaN
9   3   12     NaN
10  3   11     NaN
11  3   12     NaN
12  4   23     47.667343
13  4   3      28.631579
14  4   2      28.099174
15  4   1      27.586207
16  4   1      27.586207

The problem is that I need to process about 4 million records, and this would take approximately 60 hours.

Does anyone know how to get the same results in a quick, efficient way, please?


Solution

  • The question doesn't make completely clear what the analysis is trying to achieve; if that were understood, there is a high likelihood that a more efficient method exists.

    Using the approach below, I got the run time down from 5 hours to 35 minutes on the simulated dataset, whilst still using the same methodology.

    I simulated your 4 million-row dataset as follows:

    import random
    from tqdm import tqdm
    
    random.seed(1337)
    trends = [x for x in range(1000)]
    
    ds = {
        # sorted as I assume the 'trend' is the time indicator.
        "trend": sorted(
            [trends[int((random.random() * 1000))] for _ in tqdm(range(int(4e6)))]
        ),
        "price": [random.random() * 100 for _ in tqdm(range(int(4e6)))],
    }
    
    df = pd.DataFrame(data=ds)
    df.to_csv(
        "fake_data.csv", index=True
    )  # recommend keeping the index to keep track of the time series
    

    Before iterating over the df, I built a list of index lists. Each inner list holds the row positions of the slice of the original df for which you are calculating the RSI: the last row of each of the previous time_window trends, followed by the current row. We can then iterate over this list to retrieve each required df slice.
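
To make the shape of that list concrete, here is a small pure-Python sketch that builds it for the 17-row example from the question. It is only an illustration of the idea, not the implementation used for the timings; the names `first_pos` and `last_pos` are made up here, and it assumes the rows are sorted by trend.

```python
trend = [1, 1, 1, 1, 2, 2, 3, 3, 3, 3, 3, 3, 4, 4, 4, 4, 4]
time_window = 3

# First and last row position of every trend (rows assumed sorted by trend).
first_pos, last_pos = {}, {}
for pos, t in enumerate(trend):
    first_pos.setdefault(t, pos)
    last_pos[t] = pos
trends = list(first_pos)  # unique trends in order of appearance

idx_lst = []
for i in range(len(trends) - time_window):
    # Last row of each of the `time_window` trends preceding the current one.
    fixed = [last_pos[t] for t in trends[i : i + time_window]]
    current = trends[i + time_window]
    # One index list per row of the current trend.
    for j in range(first_pos[current], last_pos[current] + 1):
        idx_lst.append(fixed + [j])

print(idx_lst)
# [[3, 5, 11, 12], [3, 5, 11, 13], [3, 5, 11, 14], [3, 5, 11, 15], [3, 5, 11, 16]]
```

Only the rows of trend 4 get an index list here, which is why rows 0 to 11 have no RSI in the question's expected output.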

    import pandas as pd
    from copy import deepcopy
    from tqdm import tqdm
    
    df = pd.read_csv("./fake_data.csv")
    time_window = 3
    
    
    def get_trend_ranges(df: pd.DataFrame, trends: list) -> dict[int, tuple]:
        """
        trends - list of unique trend values
        returns - {trend: (start_idx, end_idx)} for each trend (assumes df is sorted by trend)
        """
        trends_ranges = {}
        for trend in trends:
            df_trend = df[df["trend"] == trend]
            start_idx = df_trend.iloc[0].name
            end_idx = df_trend.iloc[-1].name
            trends_ranges[trend] = (start_idx, end_idx)
        return trends_ranges
    
    
    def _get_idx_slices(trends: list, trends_ranges: dict, time_window) -> list[list]:
        """
        trends - list of unique trend values
        trends_ranges - {trend: (start_index, end idx)}
        returns [[idx1, idx2, ..,idxt+1], [idx11, idx22, ..., idx2t+1],..] where t is time_window
        """
        if time_window >= len(trends) or time_window <= 0:
            raise ValueError("time_window must be positive and smaller than the number of trends")
        else:
            unique_trend_analyses = len(trends) - time_window
            idx_lst = []
            for i in tqdm(range(unique_trend_analyses)):
                fixed_t = trends[i : i + time_window]
                fixed_idx = [trends_ranges[trend][1] for trend in fixed_t]
                var_t = trends[i + time_window]
                var_t_start_idx, var_t_stop_idx = (
                    trends_ranges[var_t][0],
                    trends_ranges[var_t][1] + 1,
                )
                for j in range(var_t_start_idx, var_t_stop_idx):
                    # fixed_idx holds plain integers, so list concatenation copies it safely.
                    idx_lst.append(fixed_idx + [j])
        return idx_lst
    
    
    # in order to store ranges of each trend, we assume trend is sorted.
    def get_idx_slices(df, time_window):
        trends = df["trend"].unique().tolist()
        trends_ranges = get_trend_ranges(df, trends)
        idx_slices = _get_idx_slices(trends, trends_ranges, time_window)
        return idx_slices
    

    Now we can iterate over the idx list to use the df slices that we require.

    def get_RSI(df, time_window):
        """Return the RSI for the last row of `df`, a Series of price differences."""
    
        up_chg = df.apply(lambda x: x if x > 0 else 0)
        down_chg = df.apply(lambda x: x if x < 0 else 0)
    
        # We set com = time_window-1 so we get decay alpha=1/time_window.
        up_chg_avg = up_chg.ewm(com=time_window - 1, min_periods=time_window).mean()
        down_chg_avg = down_chg.ewm(com=time_window - 1, min_periods=time_window).mean()
    
        RS = abs(up_chg_avg / down_chg_avg)
        RSI = 100 - 100 / (1 + RS)
        return RSI.iloc[-1]
    
    
    df["RSI"] = float("nan")  # numeric column; rows without enough history stay NaN
    
    idx_lst = get_idx_slices(df, time_window)
    
    for idx in tqdm(idx_lst):
        df_slice = df.iloc[idx]
        df_slice_diff = df_slice["price"].diff()
    
        RSI = get_RSI(df_slice_diff, time_window)
        last_idx = idx[-1]
    
        df.loc[last_idx, "RSI"] = RSI
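
If the per-slice DataFrame work is still too slow, one possible alternative (a different technique from the loop above, offered only as a sketch) is a single pass over the rows in plain Python. It relies on the fact that the up and down averages share the same weights, so the RSI depends only on the ratio of two weighted sums, and the part contributed by the completed trends can be computed once per trend. The function name `rsi_by_trend` is hypothetical. It assumes rows are sorted by trend and, like the code above, uses only the previous time_window trends. The question's original loop weights all earlier trends, so the two agree on the 17-row example but may differ once there are more than time_window + 1 trends.

```python
import math
from collections import deque


def rsi_by_trend(trend, price, time_window):
    """RSI per row from the last prices of the previous trends plus the row's own price."""
    decay = 1 - 1 / time_window         # alpha = 1 / time_window, as with com = time_window - 1
    closed = deque(maxlen=time_window)  # last price of each completed trend, oldest first
    fixed_up = fixed_down = 0.0         # weighted sums over diffs between completed trends
    prev_trend = prev_price = None
    out = []

    for t, p in zip(trend, price):
        if prev_trend is not None and t != prev_trend:
            # A trend just ended: store its last price and refresh the fixed sums.
            closed.append(prev_price)
            fixed_up = fixed_down = 0.0
            prices = list(closed)
            n = len(prices)
            for k in range(1, n):
                d = prices[k] - prices[k - 1]
                w = decay ** (n - k)  # the diff against the current row has age 0
                fixed_up += w * max(d, 0)
                fixed_down += w * max(-d, 0)

        if len(closed) < time_window:
            out.append(math.nan)  # not enough completed trends yet
        else:
            d = p - closed[-1]
            up = fixed_up + max(d, 0)
            down = fixed_down + max(-d, 0)
            if down == 0:
                out.append(100.0 if up > 0 else math.nan)
            else:
                out.append(100 - 100 / (1 + up / down))
        prev_trend, prev_price = t, p
    return out


trend = [1, 1, 1, 1, 2, 2, 3, 3, 3, 3, 3, 3, 4, 4, 4, 4, 4]
price = [23, 43, 56, 21, 43, 55, 54, 32, 9, 12, 11, 12, 23, 3, 2, 1, 1]
rsi = rsi_by_trend(trend, price, 3)
print([round(v, 6) for v in rsi[12:]])
```

With a DataFrame, the inputs could be passed as `df["trend"].tolist()` and `df["price"].tolist()`, and the returned list assigned back as a new column.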