I have created a pandas dataframe as follows:
import pandas as pd
import numpy as np
ds = { 'trend' : [1,1,1,1,2,2,3,3,3,3,3,3,4,4,4,4,4], 'price' : [23,43,56,21,43,55,54,32,9,12,11,12,23,3,2,1,1]}
df = pd.DataFrame(data=ds)
The dataframe looks as follows:
display(df)
    trend  price
0       1     23
1       1     43
2       1     56
3       1     21
4       2     43
5       2     55
6       3     54
7       3     32
8       3      9
9       3     12
10      3     11
11      3     12
12      4     23
13      4      3
14      4      2
15      4      1
16      4      1
I have saved the dataframe to a .csv file called df.csv:
df.to_csv("df.csv", index = False)
I have then created a function that calculates the Relative Strength Index (RSI - see: https://www.investopedia.com/terms/r/rsi.asp):
def get_RSI(df, column, time_window):
    """Return the RSI indicator for the specified time window."""
    diff = df[column].diff(1)
    # This preserves the dimensions of the diff values.
    up_chg = 0 * diff
    down_chg = 0 * diff
    # Up change is equal to the positive difference, otherwise equal to zero.
    up_chg[diff > 0] = diff[diff > 0]
    # Down change is equal to the negative difference, otherwise equal to zero.
    down_chg[diff < 0] = diff[diff < 0]
    # We set com = time_window-1 so we get decay alpha=1/time_window.
    up_chg_avg = up_chg.ewm(com=time_window - 1,
                            min_periods=time_window).mean()
    down_chg_avg = down_chg.ewm(com=time_window - 1,
                                min_periods=time_window).mean()
    RS = abs(up_chg_avg / down_chg_avg)
    df['RSI'] = 100 - 100 / (1 + RS)
    df = df[['RSI']]
    return df
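For reference, this is what get_RSI computes as I read the code, assuming pandas' default `adjust=True` for `ewm`: with window N, `com = N - 1` gives α = 1/(1 + com) = 1/N, and any value with fewer than N non-NaN differences is NaN because of `min_periods=N`.

```latex
\begin{aligned}
\Delta_t &= p_t - p_{t-1}, \qquad U_t = \max(\Delta_t, 0), \qquad D_t = \min(\Delta_t, 0),\\
\alpha &= \frac{1}{1 + \mathrm{com}} = \frac{1}{1 + (N - 1)} = \frac{1}{N},\\
\bar U_t &= \frac{\sum_{k=0}^{t-1} (1-\alpha)^k \, U_{t-k}}{\sum_{k=0}^{t-1} (1-\alpha)^k}, \qquad
\bar D_t = \frac{\sum_{k=0}^{t-1} (1-\alpha)^k \, D_{t-k}}{\sum_{k=0}^{t-1} (1-\alpha)^k},\\
RS_t &= \left| \frac{\bar U_t}{\bar D_t} \right|, \qquad RSI_t = 100 - \frac{100}{1 + RS_t}.
\end{aligned}
```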
I need to create a new field called RSI which, for each record (i.e. at each iteration), is calculated from the price observed at that iteration and the last price observed in each of the previous trends (RSI length is 3 in this example).
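For instance, at row 12 (trend 4, price 23) the series passed to the RSI is the last price of trends 1, 2 and 3 (21, 55 and 12) followed by the current price 23. A minimal standalone check of that reading, using the same `ewm` settings as get_RSI; the helper name `rsi_last` is hypothetical:

```python
import pandas as pd


def rsi_last(prices, time_window=3):
    """Hypothetical helper: RSI of the last element, same maths as get_RSI."""
    s = pd.Series(prices, dtype=float)
    diff = s.diff(1)
    up_chg = 0 * diff
    down_chg = 0 * diff
    up_chg[diff > 0] = diff[diff > 0]      # positive changes, else 0
    down_chg[diff < 0] = diff[diff < 0]    # negative changes, else 0
    up_avg = up_chg.ewm(com=time_window - 1, min_periods=time_window).mean()
    down_avg = down_chg.ewm(com=time_window - 1, min_periods=time_window).mean()
    rs = (up_avg / down_avg).abs()
    return (100 - 100 / (1 + rs)).iloc[-1]


# Row 12: last prices of trends 1, 2, 3 plus the current price of trend 4.
print(round(rsi_last([21, 55, 12, 23]), 6))  # 47.667343
# Row 13: same history, current price 3.
print(round(rsi_last([21, 55, 12, 3]), 6))   # 28.631579
```

With only three prices (two differences) `min_periods=3` is not reached, which is why every row before trend 4 is NaN.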
I have then written this code:
rsi = []
for i in range(len(df)):
    ds = pd.read_csv("df.csv", nrows=i+1)
    print(ds.info())
    d = ds.groupby(['trend'], as_index=False).agg(
        {'price':'last'})
    get_RSI(d,'price',3)
    rsi.append(d['RSI'].iloc[-1])
df['RSI'] = rsi
The dataset looks correct:
display(df)
    trend  price        RSI
0       1     23        NaN
1       1     43        NaN
2       1     56        NaN
3       1     21        NaN
4       2     43        NaN
5       2     55        NaN
6       3     54        NaN
7       3     32        NaN
8       3      9        NaN
9       3     12        NaN
10      3     11        NaN
11      3     12        NaN
12      4     23  47.667343
13      4      3  28.631579
14      4      2  28.099174
15      4      1  27.586207
16      4      1  27.586207
The problem is that I need to process about 4 million records, and with this code it would take approximately 60 hours.
Does anyone know how to get the same results in a quick, efficient way, please?
Instead of re-reading the CSV file on every iteration of the loop, it is better to slice the DataFrame that is already in memory:
ds = df.loc[:i]
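A sketch of the question's loop with that change applied. get_RSI is repeated in condensed form so the block runs on its own. This removes the file I/O, but it still regroups the growing prefix on every row, so the total work remains quadratic in the number of rows:

```python
import pandas as pd


def get_RSI(df, column, time_window):
    """Same logic as the question's function: adds df['RSI'] in place."""
    diff = df[column].diff(1)
    up_chg = 0 * diff
    down_chg = 0 * diff
    up_chg[diff > 0] = diff[diff > 0]
    down_chg[diff < 0] = diff[diff < 0]
    up_chg_avg = up_chg.ewm(com=time_window - 1, min_periods=time_window).mean()
    down_chg_avg = down_chg.ewm(com=time_window - 1, min_periods=time_window).mean()
    RS = abs(up_chg_avg / down_chg_avg)
    df['RSI'] = 100 - 100 / (1 + RS)
    return df[['RSI']]


ds = {'trend': [1, 1, 1, 1, 2, 2, 3, 3, 3, 3, 3, 3, 4, 4, 4, 4, 4],
      'price': [23, 43, 56, 21, 43, 55, 54, 32, 9, 12, 11, 12, 23, 3, 2, 1, 1]}
df = pd.DataFrame(data=ds)

rsi = []
for i in range(len(df)):
    prefix = df.loc[:i, ['trend', 'price']]  # rows 0..i, already in memory
    d = prefix.groupby('trend', as_index=False).agg({'price': 'last'})
    get_RSI(d, 'price', 3)                   # adds d['RSI'] in place
    rsi.append(d['RSI'].iloc[-1])
df['RSI'] = rsi
print(df.tail(5))
```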
Update 27.07.2024: I partially reused what dydev did with range_group, removed the function call on each row and switched to vectorized operations. As a result, 4 million rows are processed in two minutes. This works only if the trends are ordered (1, 2, 3, etc.), not, for example, 1, 2, 3, 1. The previous version has been moved down.
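Because the vectorized version assumes each trend occupies one contiguous block and the blocks appear in ascending order, a cheap precondition check is worth running first. A minimal sketch using pandas' `Series.is_monotonic_increasing` (non-strict, so repeated labels are allowed); the function name `trends_are_ordered` is hypothetical:

```python
import pandas as pd


def trends_are_ordered(df: pd.DataFrame, column: str = "trend") -> bool:
    """Hypothetical guard: True if trend labels never decrease from row to row,
    i.e. every trend is one contiguous block and the blocks are in ascending order."""
    return bool(df[column].is_monotonic_increasing)


ok = pd.DataFrame({"trend": [1, 1, 2, 3, 3], "price": [5.0, 6.0, 7.0, 8.0, 9.0]})
bad = pd.DataFrame({"trend": [1, 2, 3, 1], "price": [5.0, 6.0, 7.0, 8.0]})
print(trends_are_ordered(ok))   # True
print(trends_are_ordered(bad))  # False
```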
Dataframe generation
n = 15
trend = np.random.randint(low=1, high=7, size=(n))
price = np.random.uniform(low=1, high=100, size=(n))
df = pd.DataFrame({'trend': trend, 'price': price})
df = df.sort_values(by=['trend']).reset_index(drop=True)
The calculation code itself:
time_window = 3
trends = df["trend"].unique()
arr = df['price'].values
range_group = np.stack(
[df[df["trend"] == trend].index.values.take([0, -1]) for trend in trends]
)
price = np.full((len(df), trends.size), np.nan)
prev = arr[range_group[:time_window, 1]]
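for i in range(time_window, len(trends)):
    stop = range_group[i, 1] + 1
    # Rows of trend i: the current price goes into the last column...
    price[range_group[i, 0]:stop, -1] = arr[range_group[i, 0]:stop]
    # ...preceded by the last prices of all previous trends.
    price[range_group[i, 0]:stop, -(prev.size+1):-1] = prev
    # History for the next trend: previous trends plus the last price of trend i.
    prev = price[range_group[i, 1], -(prev.size+1):]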
price = price[range_group[time_window, 0]:]
diff = np.diff(price, axis=1)
up_chg = 0 * diff
down_chg = 0 * diff
up_chg[diff > 0] = diff[diff > 0]
down_chg[diff < 0] = diff[diff < 0]
up_chg_avg = pd.DataFrame(up_chg).T.ewm(
com=time_window - 1, min_periods=time_window).mean().iloc[-1]
down_chg_avg = pd.DataFrame(down_chg).T.ewm(
com=time_window - 1, min_periods=time_window).mean().iloc[-1]
RS = np.abs(up_chg_avg / down_chg_avg)
df.loc[range_group[time_window, 0]:, 'newRSI'] = (100 - 100 / (1 + RS)).values
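To check the vectorized version against the question's output, here is the same calculation wrapped in a function and run on the question's 17-row example. The name `vectorized_rsi` and the early return when there are too few trends (which would otherwise raise an IndexError on `range_group[time_window, 0]`) are my additions; the remaining steps follow the code above:

```python
import numpy as np
import pandas as pd


def vectorized_rsi(df, time_window=3):
    """The vectorized RSI above, wrapped in a function (hypothetical name).

    Assumes df is sorted by 'trend' and has a default RangeIndex 0..n-1.
    Returns a float Series aligned with df; rows without enough history are NaN.
    """
    trends = df["trend"].unique()
    out = pd.Series(np.nan, index=df.index)
    if trends.size <= time_window:  # added guard: no row has enough previous trends
        return out
    arr = df["price"].to_numpy(dtype=float)
    # [first_row, last_row] of each trend block
    range_group = np.stack(
        [df[df["trend"] == t].index.values.take([0, -1]) for t in trends]
    )
    price = np.full((len(df), trends.size), np.nan)
    prev = arr[range_group[:time_window, 1]]  # last prices of the first trends
    for i in range(time_window, len(trends)):
        start, stop = range_group[i, 0], range_group[i, 1] + 1
        price[start:stop, -1] = arr[start:stop]          # current price
        price[start:stop, -(prev.size + 1):-1] = prev    # previous trends' last prices
        prev = price[range_group[i, 1], -(prev.size + 1):]
    first = range_group[time_window, 0]
    diff = np.diff(price[first:], axis=1)
    up_chg = 0 * diff
    down_chg = 0 * diff
    up_chg[diff > 0] = diff[diff > 0]
    down_chg[diff < 0] = diff[diff < 0]
    up_avg = pd.DataFrame(up_chg).T.ewm(
        com=time_window - 1, min_periods=time_window).mean().iloc[-1]
    down_avg = pd.DataFrame(down_chg).T.ewm(
        com=time_window - 1, min_periods=time_window).mean().iloc[-1]
    RS = np.abs(up_avg / down_avg)
    out.iloc[first:] = (100 - 100 / (1 + RS)).to_numpy()
    return out


ds = {'trend': [1, 1, 1, 1, 2, 2, 3, 3, 3, 3, 3, 3, 4, 4, 4, 4, 4],
      'price': [23, 43, 56, 21, 43, 55, 54, 32, 9, 12, 11, 12, 23, 3, 2, 1, 1]}
df = pd.DataFrame(data=ds)
df['newRSI'] = vectorized_rsi(df, time_window=3)
print(df.tail(5))
```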
Previous version.
The ind column was created to pass the row indexes to map. An empty array arr is generated and filled row by row in func_numpy. The rest are your operations, converted to vectorized ones so that they are executed in one go instead of row by row.
I also checked that the calculated RSI values coincide with your algorithm.
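To illustrate what arr ends up holding, here is the fill step from the code below run on the question's 17-row example. It is a sketch: a plain for loop replaces map only for readability and does the same per-row work. Each row holds the last price of every trend seen so far, right-aligned, with the current row's price in the last column:

```python
import numpy as np
import pandas as pd

ds = {'trend': [1, 1, 1, 1, 2, 2, 3, 3, 3, 3, 3, 3, 4, 4, 4, 4, 4],
      'price': [23, 43, 56, 21, 43, 55, 54, 32, 9, 12, 11, 12, 23, 3, 2, 1, 1]}
df = pd.DataFrame(data=ds)

size_trend = df['trend'].unique().size
arr = np.full((len(df), size_trend), np.nan)
for x in df.index:  # same per-row work as df['ind'].map(func_numpy)
    # Last price of every trend among rows 0..x, in trend-label order.
    row_last = df.loc[:x].groupby('trend')['price'].last().values
    arr[x, -row_last.size:] = row_last  # right-align; current price in last column

print(arr[0].tolist())   # [nan, nan, nan, 23.0]
print(arr[12].tolist())  # [21.0, 55.0, 12.0, 23.0]
```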
df['ind'] = df.index
size_trend = df['trend'].unique().size
arr = np.full((len(df), size_trend), np.nan)
def func_numpy(x):
    # Last price of every trend up to row x, right-aligned in row x of arr.
    row_last = df.loc[:x].groupby('trend')['price'].last().values
    arr[x, -row_last.size:] = row_last
df['ind'].map(func_numpy)
diff = np.diff(arr, axis=1)
up_chg = 0 * diff
down_chg = 0 * diff
up_chg[diff > 0] = diff[diff > 0]
down_chg[diff < 0] = diff[diff < 0]
up_chg_avg = pd.DataFrame(up_chg).T.ewm(
com=time_window - 1, min_periods=time_window).mean().iloc[-1].values
down_chg_avg = pd.DataFrame(down_chg).T.ewm(
com=time_window - 1, min_periods=time_window).mean().iloc[-1].values
RS = np.abs(up_chg_avg / down_chg_avg)
df['newRSI'] = 100 - 100 / (1 + RS)
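Both versions transpose a matrix with one column per data row before calling `ewm`, which for 4 million rows builds a very wide DataFrame. Because the NaNs in each row of the diff matrix are only left padding, the `adjust=True` EWM of the last element can also be written directly as a weighted average with weights (1 - α)^k, α = 1/time_window, skipping NaN entries. This is a swapped-in alternative, not the method used above; the function names `ewm_last` and `rsi_from_history` are hypothetical, and it is worth comparing against the pandas result on your own data before relying on it:

```python
import numpy as np
import pandas as pd


def ewm_last(values, time_window):
    """EWM mean (pandas adjust=True, alpha = 1/time_window) of the LAST column of
    each row, skipping NaNs. Assumes NaNs only appear as left padding.
    Rows with fewer than time_window non-NaN values give NaN, like min_periods."""
    alpha = 1.0 / time_window                           # com = time_window - 1
    m = values.shape[1]
    weights = (1 - alpha) ** np.arange(m - 1, -1, -1)   # newest column has weight 1
    valid = ~np.isnan(values)
    num = np.where(valid, values, 0.0) @ weights
    den = valid.astype(float) @ weights
    with np.errstate(invalid="ignore", divide="ignore"):
        out = num / den
    out[valid.sum(axis=1) < time_window] = np.nan
    return out


def rsi_from_history(price, time_window):
    """RSI of the last price in each row of a right-aligned price history."""
    diff = np.diff(price, axis=1)
    up_chg = np.where(diff > 0, diff, 0 * diff)    # 0 * NaN keeps the padding NaN
    down_chg = np.where(diff < 0, diff, 0 * diff)
    with np.errstate(invalid="ignore", divide="ignore"):
        RS = np.abs(ewm_last(up_chg, time_window) / ewm_last(down_chg, time_window))
    return 100 - 100 / (1 + RS)


# Rows 12 and 13 of the question, plus a row with too little history.
history = np.array([[21, 55, 12, 23],
                    [21, 55, 12, 3],
                    [np.nan, 21, 55, 12]], dtype=float)
print(np.round(rsi_from_history(history, 3), 6))
```

The weighted sum is a single matrix-vector product per side, so memory stays at the size of the history matrix itself.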