I have a light Python app which should perform a very simple task, but it keeps crashing due to OOM.
The task: load a `.parquet` file into a dataframe, compute indicators with the `stockstats` package, merge them back via `df = pd.merge(df, st, on=['datetime'])`, and write the result back to `.parquet`.
Versions used:

- Python 3.10
- pandas~=2.1.4
- stockstats~=0.4.1
- Kubernetes 1.28.2-do.0 (running in DigitalOcean)

Here is the strange thing: the dataframe is very small (`df.size` is 208446, file size is 1.00337 MB, memory usage is 1.85537 MB).
Measured like this:

```python
import os

file_stats = os.stat(filename)
file_size = file_stats.st_size / (1024 * 1024)  # 1.00337 MB

df_mem_usage = dataframe.memory_usage(deep=True)
df_mem_usage_print = round(df_mem_usage.sum() / (1024 * 1024), 6)  # 1.85537 MB

df_size = dataframe.size  # 208446
```
The app is deployed into Kubernetes using Helm, with the following resources set:
```yaml
resources:
  limits:
    cpu: 1000m
    memory: 6000Mi
  requests:
    cpu: 1000m
    memory: 1000Mi
```
I am using nodes with 4 vCPU + 8 GB memory, and the node is not under performance pressure. I have also created a dedicated node pool with 8 vCPU + 16 GB nodes, but the issue is the same.
```
kubectl top node test-pool
NAME              CPU(cores)   CPU%   MEMORY(bytes)   MEMORY%
test-pool-j8t3y   38m          0%     2377Mi          17%
```
Pod info:

```
kubectl describe pod xxx
...
    State:          Waiting
      Reason:       CrashLoopBackOff
    Last State:     Terminated
      Reason:       OOMKilled
      Exit Code:    137
      Started:      Sun, 24 Mar 2024 16:08:56 +0000
      Finished:     Sun, 24 Mar 2024 16:09:06 +0000
...
```
Here is the CPU and memory consumption from Grafana. I am aware that very short memory or CPU spikes are hard to see there, but from a long-term perspective the app does not consume a lot of RAM. On the other hand, in my experience we run the same pandas operations in containers with less RAM on much bigger dataframes, with no problems.
How should I fix this? What else should I debug in order to prevent OOM?
Original dataframe (named `df`):
```
             datetime   open   high    low  close        volume
0 2023-11-14 11:15:00  2.185  2.187  2.171  2.187  19897.847314
1 2023-11-14 11:20:00  2.186  2.191  2.183  2.184   8884.634728
2 2023-11-14 11:25:00  2.184  2.185  2.171  2.176  12106.153954
3 2023-11-14 11:30:00  2.176  2.176  2.158  2.171  22904.354082
4 2023-11-14 11:35:00  2.171  2.173  2.167  2.171   1691.211455
```
New dataframe (named `st`). Note: if `trend_orientation = 1` then `st_lower = NaN`; if `-1` then `st_upper = NaN`.
```
             datetime  supertrend_ub  supertrend_lb  trend_orientation  st_trend_segment
0 2023-11-14 11:15:00        0.21495            NaN                 -1                 1
1 2023-11-14 11:20:00        0.21495            NaN                -10                 1
2 2023-11-14 11:25:00        0.21495            NaN                -11                 1
3 2023-11-14 11:30:00        0.21495            NaN                -12                 1
4 2023-11-14 11:35:00        0.21495            NaN                -13                 1
```
Code example
```python
import pandas as pd
import multiprocessing
import numpy as np
import stockstats


def add_supertrend(market):
    try:
        # Read data from file
        df = pd.read_parquet(market, engine="fastparquet")
        # Extract the date column
        date_column = df['datetime']
        # Convert to a stockstats object
        st_a = stockstats.wrap(df.copy())
        # Generate supertrend
        st_a = st_a[['supertrend', 'supertrend_ub', 'supertrend_lb']]
        # Add back the datetime column
        st_a.insert(0, "datetime", date_column)
        # Add trend orientation using conditional columns
        conditions = [
            st_a['supertrend_ub'] == st_a['supertrend'],
            st_a['supertrend_lb'] == st_a['supertrend']
        ]
        values = [-1, 1]
        st_a['trend_orientation'] = np.select(conditions, values)
        # Remove not-required supertrend values
        st_a.loc[st_a['trend_orientation'] < 0, 'st_lower'] = np.nan
        st_a.loc[st_a['trend_orientation'] > 0, 'st_upper'] = np.nan
        # Unwrap back to a dataframe
        st = stockstats.unwrap(st_a)
        # Ensure correct data types are used
        st = st.astype({
            'supertrend': 'float32',
            'supertrend_ub': 'float32',
            'supertrend_lb': 'float32',
            'trend_orientation': 'int8'
        })
        # Add trend segments
        st_to = st[['trend_orientation']]
        st['st_trend_segment'] = st_to.ne(st_to.shift()).cumsum()
        # Remove trend value
        st.drop(columns=['supertrend'], inplace=True)
        # Merge ST with DF
        df = pd.merge(df, st, on=['datetime'])
        # Write back to parquet
        df.to_parquet(market, compression=None)
    except Exception as e:
        # Using a proper logger in the real code
        print(e)


def main():
    # Using a fixed market as an example; in the real code the market is fetched
    market = "BTCUSDT"
    # Using multiprocessing to free up memory after each iteration
    p = multiprocessing.Process(target=add_supertrend, args=(market,))
    p.start()
    p.join()


if __name__ == "__main__":
    main()
```
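A side note on measuring this: Grafana can miss very short spikes, but the parent process can log the child's true peak right after the join. A minimal sketch extending `main()` above (assumes Linux, where `ru_maxrss` is reported in kilobytes):

```python
import resource

def main():
    market = "BTCUSDT"
    p = multiprocessing.Process(target=add_supertrend, args=(market,))
    p.start()
    p.join()
    # Peak resident set size across terminated children (KB on Linux)
    peak_kb = resource.getrusage(resource.RUSAGE_CHILDREN).ru_maxrss
    print(f"Child peak RSS: {peak_kb / 1024:.1f} MiB")
```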
Dockerfile
```dockerfile
FROM python:3.10

ENV PYTHONFAULTHANDLER=1 \
    PYTHONHASHSEED=random \
    PYTHONUNBUFFERED=1 \
    PYTHONPATH=.

# Adding vim
RUN ["apt-get", "update"]

# Get dependencies
COPY requirements.txt .
RUN pip3 install -r requirements.txt

# Copy main app
ADD . .

CMD ["python", "main.py"]
```
Possible solutions / tried approaches
Use node-pressure eviction to test whether a pod can even allocate enough memory on the node. I have done this on an 8 vCPU + 16 GB RAM node with a stress container:

```yaml
...
image: "polinux/stress"
command: ["stress"]
args: ["--vm", "1", "--vm-bytes", "5G", "--vm-hang", "1"]
...
```
```
kubectl top node test-pool-j8t3y
NAME              CPU(cores)   CPU%   MEMORY(bytes)   MEMORY%
test-pool-j8t3y   694m         8%     7557Mi          54%
```
Node description:

```
Namespace    Name                                  CPU Requests  CPU Limits  Memory Requests  Memory Limits  Age
---------    ----                                  ------------  ----------  ---------------  -------------  ---
kube-system  cilium-24qxl                          300m (3%)     0 (0%)      300Mi (2%)       0 (0%)         43m
kube-system  cpc-bridge-proxy-csvvg                100m (1%)     0 (0%)      75Mi (0%)        0 (0%)         43m
kube-system  csi-do-node-tzbbh                     0 (0%)        0 (0%)      0 (0%)           0 (0%)         43m
kube-system  disable-systemd-upgrade-timer-mqjsk   0 (0%)        0 (0%)      0 (0%)           0 (0%)         43m
kube-system  do-node-agent-dv2z2                   102m (1%)     0 (0%)      80Mi (0%)        300Mi (2%)     43m
kube-system  konnectivity-agent-wq5p2              0 (0%)        0 (0%)      0 (0%)           0 (0%)         43m
kube-system  kube-proxy-gvfrv                      0 (0%)        0 (0%)      125Mi (0%)       0 (0%)         43m
scanners     data-gw-enrich-d5cff4c95-bkjkc        100m (1%)     1 (12%)     1000Mi (7%)      6000Mi (43%)   2m33s
```
The pod did not crash due to OOM, so it is very likely that the issue is somewhere inside the code.
I have inserted memory measurements at multiple points. I am measuring both dataframe size and memory usage, using `psutil`:
```python
import psutil

total = round(psutil.virtual_memory().total / 1000 / 1000, 4)
used = round(psutil.virtual_memory().used / 1000 / 1000, 4)
pct = round(used / total * 100, 1)
logger.info(f"[Current memory usage is: {used} / {total} MB ({pct} %)]")
```
Memory usage at the successive measurement points:

```
938.1929 MB
1.947708 MB
954.1181 MB
1.147757 MB
944.9226 MB
945.4223 MB
```
multiprocessing

In order to "reset" memory on every iteration, I am using `multiprocessing`. However, I wanted to be sure that this itself does not cause trouble, so I removed it and called `add_supertrend` directly (see the sketch below). It still ended in OOM, so I do not think this is the problem.
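For reference, the variant without `multiprocessing` that I tested was essentially this (a sketch; in the real code the market name is fetched, not hard-coded):

```python
def main():
    # Direct call instead of spawning a child process -- still ends in OOM
    market = "BTCUSDT"
    add_supertrend(market)
```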
As suggested by Lukasz Tracewski, I am sharing the real data that causes the OOM crash. Since it is in `parquet` format, I cannot use services like pastebin, so I am using GDrive instead. I will use this folder to share anything else related to this question/issue.
Upgrade to pandas 2.2.1

Sometimes a plain package upgrade might help, so I decided to try upgrading pandas to 2.2.1 and fastparquet to 2024.2.0 (the newer pandas requires a newer fastparquet). pyarrow was also updated, to 15.0.0.
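For completeness, the corresponding pins (my reconstruction of the requirements.txt lines from the versions above):

```
pandas~=2.2.1
fastparquet~=2024.2.0
pyarrow~=15.0.0
```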
It seemed to work for the first few iterations, but then crashed with OOM again.
I remembered that when I used to solve complex dataframe operations, I used dask, so I tried it in this case as well. Without success: OOM again. This is with `dask` 2024.3.1:
```python
import dask.dataframe as dd

# mem usage 986.452 MB
ddf1 = dd.from_pandas(df)
# mem usage 1015.37 MB
ddf2 = dd.from_pandas(st)
# mem usage 1019.50 MB
df_dask = dd.merge(ddf1, ddf2, on='datetime')
# mem usage 1021.56 MB
df = df_dask.compute()  # <- here it crashes ¯\_(ツ)_/¯
```
While investigating the data with dask, I noticed that there are duplicate records in the `datetime` column. This is definitely wrong; datetime has to be unique. I think this might cause the issue, and I will investigate further.
```
df.tail(10)
             datetime   open   high     low   close         volume
0 2024-02-26 02:55:00  0.234  0.238  0.2312  0.2347  103225.029408
0 2024-02-26 02:55:00  0.234  0.238  0.2312  0.2347  103225.029408
0 2024-02-26 02:55:00  0.234  0.238  0.2312  0.2347  103225.029408
0 2024-02-26 02:55:00  0.234  0.238  0.2312  0.2347  103225.029408
0 2024-02-26 02:55:00  0.234  0.238  0.2312  0.2347  103225.029408
0 2024-02-26 02:55:00  0.234  0.238  0.2312  0.2347  103225.029408
0 2024-02-26 02:55:00  0.234  0.238  0.2312  0.2347  103225.029408
0 2024-02-26 02:55:00  0.234  0.238  0.2312  0.2347  103225.029408
0 2024-02-26 02:55:00  0.234  0.238  0.2312  0.2347  103225.029408
0 2024-02-26 02:55:00  0.234  0.238  0.2312  0.2347  103225.029408
```
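This would also explain the OOM on a tiny file: when a merge key appears k times on both sides, `pd.merge` emits k × k rows for that key. A minimal reproducer with made-up values (small k, safe to run):

```python
import pandas as pd

# The same datetime duplicated k times on both sides of the merge
k = 3
df = pd.DataFrame({"datetime": ["2024-02-26 02:55:00"] * k, "close": 0.2347})
st = pd.DataFrame({"datetime": ["2024-02-26 02:55:00"] * k, "trend_orientation": -1})

merged = pd.merge(df, st, on=["datetime"])
print(len(merged))  # 9 == k * k
```

With a few thousand duplicated timestamps, the merged frame grows quadratically into millions of rows, which easily exceeds even a 6000Mi limit.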
I have implemented a fix that removes duplicate records in the other component, the one that prepares the data. The fix looks like this; I will monitor whether it helps.
```python
# Append gathered data to df and write to file
df = pd.concat([df, fresh_data])
# Drop duplicates
df = df.drop_duplicates(subset=["datetime"])
```
To close this question: the issue was caused by duplicated datetimes in the dataframe. On merge, every duplicated key matches every other duplicate, so the output grew quadratically and exhausted memory. I have fixed the data and it works fine now.
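For anyone hitting the same thing: pandas can also guard against this directly at the merge. A sketch using the built-in `validate` argument, which raises an error instead of silently producing a quadratic result:

```python
# Raises pandas.errors.MergeError if 'datetime' is not unique on both sides
df = pd.merge(df, st, on=["datetime"], validate="one_to_one")
```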