I want to create an np.ndarray
as an input for a machine learning model:
array_X = np.array([list(w.values) for w in df[['close', 'volume']].rolling(window=20)][19:-1])
This is the standard approach in time series, where a window of past values is used as the input for predicting the next value. The resulting array has shape roughly 20,000,000 × 20 × 2. Building such an array takes a lot of time, and sometimes it fails with an error because the array consumes too much memory.
Is there any way to improve the above (time cost and memory error)?
Your original code gives me a dimension-mismatch error: the first few windows are too short because the window isn't full yet, so I changed it to discard those entries:
import numpy as np
import pandas as pd


def rolling_approach(df, window_size=3):
    # Discard the first window_size - 1 windows, which are not yet full.
    return np.array(
        [w.values for w in df[["close", "volume"]].rolling(window=window_size)][
            window_size - 1 :
        ]
    )
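A quick sanity check of the fixed version on toy data (the values here are purely illustrative, and it assumes the imports above):

toy = pd.DataFrame({"close": [1, 2, 3, 4, 5], "volume": [10, 20, 30, 40, 50]})
X = rolling_approach(toy, window_size=3)
print(X.shape)  # (3, 3, 2): one entry per full window, each window_size x 2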
pd.DataFrame.rolling can be extremely slow for these kinds of operations, whereas shift is very efficient in pandas. Here's an example for window_size=3, written out:
(
    pd.concat(
        [
            df[["close", "volume"]].shift().shift(),
            df[["close", "volume"]].shift(),
            df[["close", "volume"]],
        ],
        axis=1,
    )
    .values[2:, :]
    .reshape(-1, 3, 2)
)
We stack the shifts and then reshape the values.
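To see what the reshape is doing, here is the intermediate layout on toy data (illustrative values, reusing the imports above):

toy = pd.DataFrame({"close": [1, 2, 3, 4], "volume": [10, 20, 30, 40]})
stacked = pd.concat([toy.shift(2), toy.shift(1), toy], axis=1)
# Row t now holds [close_t-2, volume_t-2, close_t-1, volume_t-1, close_t, volume_t],
# so dropping the first two (incomplete) rows and reshaping to (-1, 3, 2)
# yields one (window, feature) block per remaining row.
print(stacked.values[2:, :].reshape(-1, 3, 2)[0])
# [[ 1. 10.]
#  [ 2. 20.]
#  [ 3. 30.]]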
Generalized to a variable window_size, we get:
def shift_approach(df, window_size=3):
    shifted_df = pd.concat(
        [df[["close", "volume"]].shift(i) for i in range(window_size - 1, -1, -1)],
        axis=1,
    )
    reshaped_array = shifted_df.values[window_size - 1 :, :].reshape(-1, window_size, 2)
    return reshaped_array
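One thing to note: shift introduces NaNs into the leading rows before they are sliced off, so the result comes back as float64 even for integer input, while the rolling version preserves the frame's original dtype. The values still agree, as a quick check on toy data (illustrative) shows:

toy = pd.DataFrame({"close": [1, 2, 3, 4, 5], "volume": [10, 20, 30, 40, 50]})
assert np.allclose(shift_approach(toy), rolling_approach(toy))
print(shift_approach(toy).dtype, rolling_approach(toy).dtype)  # float64 int64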
shift outperforms the rolling operation by almost two orders of magnitude, and it scales well into the tens of millions of rows on my MacBook:
def setup(N):
    np.random.seed(42)
    close_values = np.random.randint(1, 100, size=N)
    volume_values = np.random.randint(100, 1000, size=N)
    df = pd.DataFrame({"close": close_values, "volume": volume_values})
    return [df, 10]  # window_size=10 for the benchmark
approaches = [rolling_approach, shift_approach]

# Show correctness
for approach in approaches[1:]:
    data = setup(10)
    assert np.isclose(approach(*data), approaches[0](*data)).all()
run_performance_comparison(
    approaches,
    [
        1_000,
        3_000,
        5_000,
        10_000,
        30_000,
        100_000,
        300_000,
        500_000,
        1_000_000,
        3_000_000,
        5_000_000,
        10_000_000,
    ],
    setup=setup,
    title="Performance Comparison",
    number_of_repetitions=2,
)
Profiling code:
import timeit
from contextlib import contextmanager
from functools import partial
from typing import Callable, Dict, List

import matplotlib.pyplot as plt


@contextmanager
def data_provider(data_size, setup=lambda N: N, teardown=lambda: None):
    # Build the test data for one size and guarantee teardown runs afterwards.
    data = setup(data_size)
    try:
        yield data
    finally:
        teardown()


def run_performance_comparison(
    approaches: List[Callable],
    data_size: List[int],
    setup=lambda N: N,
    teardown=lambda: None,
    number_of_repetitions=5,
    title="Performance Comparison",
    data_name="N",
):
    approach_times: Dict[Callable, List[float]] = {approach: [] for approach in approaches}

    for N in data_size:
        with data_provider(N, setup, teardown) as data:
            for approach in approaches:
                function = partial(approach, *data)
                # Take the best of several repetitions to reduce noise.
                approach_time = min(
                    timeit.Timer(function).repeat(repeat=number_of_repetitions, number=2)
                )
                approach_times[approach].append(approach_time)

    for approach in approaches:
        plt.plot(data_size, approach_times[approach], label=approach.__name__)

    plt.yscale("log")
    plt.xscale("log")
    plt.xlabel(data_name)
    plt.ylabel("Execution Time (seconds)")
    plt.title(title)
    plt.legend()
    plt.show()