At each time step t, I get a new x_t and y_t. Once there are a total of PERIOD x's and y's, it performs ordinary least squares and returns the slope coefficient. In the code, "benchmark" is the x and "asset" is the y (this is finance lingo). The online_calc method does the actual computation.
class Beta:
    """Rolling OLS beta of an asset against a benchmark.

    Keeps the last PERIOD prices of each series; once the window is full,
    every update recomputes the regression slope of asset simple returns
    on benchmark simple returns.
    """

    def __init__(self, PERIOD):
        # Local import keeps this class self-contained.
        from collections import deque

        self.PERIOD = PERIOD
        self.stat = None
        # deque(maxlen=...) evicts the oldest price in O(1); the previous
        # list-slice (`vec = vec[1:]`) copied O(PERIOD) elements per update.
        self.benchmark_vec = deque(maxlen=PERIOD)
        self.asset_vec = deque(maxlen=PERIOD)

    def update(self, new_benchmark, new_asset):
        """Record one (benchmark, asset) price pair and return the latest beta.

        Pairs where either value is None are ignored.  Returns None until
        PERIOD pairs have been seen, then the most recent OLS slope.
        """
        if new_benchmark is not None and new_asset is not None:
            self.benchmark_vec.append(new_benchmark)
            self.asset_vec.append(new_asset)
            # Both deques fill in lockstep, so one length check suffices
            # (the original checked each list twice).
            if len(self.benchmark_vec) == self.PERIOD:
                self.stat = self.online_calc()
        return self.stat

    def online_calc(self):
        """OLS slope of asset simple returns on benchmark simple returns."""
        asset_ar = np.array(self.asset_vec)
        benchmark_ar = np.array(self.benchmark_vec)
        # Simple returns: p[t] / p[t-1] - 1 (PERIOD prices -> PERIOD-1 returns).
        y = asset_ar[1:] / asset_ar[:-1] - 1
        x = benchmark_ar[1:] / benchmark_ar[:-1] - 1
        X = x.reshape(-1, 1)
        # Prepend an intercept column of ones.
        X = np.concatenate([np.ones_like(X), X], axis=1)
        # Normal equations via pseudo-inverse; b[0] is the intercept,
        # b[1] the slope (beta).
        b = np.linalg.pinv(X.T.dot(X)).dot(X.T).dot(y)
        return b[1]
###### example usage
# Sample series for the asset and its benchmark.
asset_vec = [ 0.01568653, -0.00669479, 0.01140213, -0.00107317, -0.00131155, -0.00333463,
-0.00114006, 0.00263075, -0.00507337, 0.00712401, 0.00388323]
benchmark_vec = [ 0.01150227, 0.00045742, 0.01114376, -0.00305367, 0.00388245, 0.00323491,
-0.00449446, -0.00075698, -0.01114904, 0.0147878, 0.00528754]

# Feed the pairs through the rolling estimator, collecting one result per step.
beta = Beta(4)
output = [beta.update(b, a) for b, a in zip(benchmark_vec, asset_vec)]
output
I was previously using scipy.stats.linregress to get the coefficient, but it was about 50% slower than this (I'm guessing because it also calculates the intercept, p-value, etc.). So I am wondering if there are any speed gains I'm leaving on the table.
EDIT: edited code to include example usage
Here is an attempt at an improved update. It reuses previously calculated values and thus should be faster. If memory is an issue, it could be optimized further by computing the terms on the fly rather than storing them and looking them up later:
class Beta2:
    """Rolling OLS beta using incrementally maintained sums (O(1) per update).

    Instead of re-running a regression each step, this keeps running sums of
    the returns (x, y), their squares, and their cross products; each update
    adds the newest term and subtracts the term leaving the window.

    NOTE: the per-step history lists (_x, _y, _xx, _xy, and the price lists)
    grow without bound; if memory matters they could be replaced with
    on-the-fly computation as noted in the accompanying text.
    """

    def __init__(self, PERIOD):
        self.PERIOD = PERIOD
        self.stat = None
        self.benchmark_vec = []
        self.asset_vec = []
        # Full return history, kept so that the term leaving the window can
        # be subtracted from the running sums as the window slides.
        self._x, self._y, self._xx, self._xy = [], [], [], []
        self._sum_x, self._sum_y, self._sum_xx, self._sum_xy = 0, 0, 0, 0
        self._ln = 0  # number of (benchmark, asset) pairs processed

    def update(self, new_benchmark, new_asset):
        """Record one (benchmark, asset) price pair and return the latest beta.

        Pairs where either value is None are ignored.  Returns None until
        PERIOD pairs have been seen, then the most recent OLS slope.
        """
        if new_benchmark is not None and new_asset is not None:
            try:
                x = new_benchmark / self.benchmark_vec[-1] - 1
                y = new_asset / self.asset_vec[-1] - 1
            # Was a bare `except:`, which would also swallow TypeError,
            # KeyboardInterrupt, etc. and mask real bugs.  Only two failures
            # are expected here: no previous price yet (IndexError) or a
            # zero previous price (ZeroDivisionError).
            except (IndexError, ZeroDivisionError):
                # Record a zero return; it contributes nothing to the sums.
                x, y = 0, 0
            self._x.append(x)
            self._y.append(y)
            self._xx.append(x * x)
            self._xy.append(x * y)
            self._sum_x += x
            self._sum_xx += self._xx[-1]
            self._sum_y += y
            self._sum_xy += self._xy[-1]
            self.benchmark_vec.append(new_benchmark)
            self.asset_vec.append(new_asset)
            self._ln += 1
            # Once PERIOD prices are in, compute the stat, then retire the
            # oldest in-window return from the sums for the next step.
            if (i := self._ln - self.PERIOD + 1) > 0:
                self.online_calc()
                self._sum_x -= self._x[i]
                self._sum_xx -= self._xx[i]
                self._sum_y -= self._y[i]
                self._sum_xy -= self._xy[i]
        return self.stat

    def online_calc(self):
        """OLS slope from the running sums over the PERIOD-1 in-window returns."""
        numerator = self._sum_xy - self._sum_x * self._sum_y / (self.PERIOD - 1)
        denominator = self._sum_xx - self._sum_x * self._sum_x / (self.PERIOD - 1)
        self.stat = numerator / denominator
###### example usage
# Run the incremental estimator over the same sample series.
beta2 = Beta2(4)
output2 = [beta2.update(b, a) for b, a in zip(benchmark_vec, asset_vec)]