Tags: python, python-3.x, pandas, docker, out-of-memory

Python app keeps OOM crashing on Pandas merge


I have a light Python app which should perform a very simple task, but it keeps crashing due to OOM.

What the app should do

  1. Load data from a .parquet file into a dataframe
  2. Calculate an indicator using the stockstats package
  3. Merge the freshly calculated data into the original dataframe so that both OHLC and SUPERTREND end up in one dataframe -> here it crashes
  4. Store the dataframe as .parquet

Where it crashes

df = pd.merge(df, st, on=['datetime'])

Using

  • Python 3.10
  • pandas~=2.1.4
  • stockstats~=0.4.1
  • Kubernetes 1.28.2-do.0 (running in Digital Ocean)

Here is the strange thing: the dataframe is very small (df.size, i.e. rows × columns, is 208446; the file size is 1.00337 MB; memory usage is 1.85537 MB).

Measured

import os

file_stats = os.stat(filename)
file_size = file_stats.st_size / (1024 * 1024)  # 1.00337 MB

df_mem_usage = dataframe.memory_usage(deep=True)
df_mem_usage_print = round(df_mem_usage.sum() / (1024 * 1024), 6)  # 1.85537 MB

df_size = dataframe.size  # 208446

Deployment info

The app is deployed to Kubernetes using Helm with the following resources set:

resources:
  limits:
    cpu: 1000m
    memory: 6000Mi
  requests:
    cpu: 1000m
    memory: 1000Mi

I am using nodes with 4 vCPU + 8 GB memory, and the node is not under performance pressure. I have also created a dedicated node pool with 8 vCPU + 16 GB nodes, but the same issue occurs.

kubectl top node test-pool
NAME              CPU(cores)   CPU%   MEMORY(bytes)   MEMORY%
test-pool-j8t3y   38m          0%     2377Mi          17%

Pod info

kubectl describe pod xxx
...
    State:          Waiting
      Reason:       CrashLoopBackOff
    Last State:     Terminated
      Reason:       OOMKilled
      Exit Code:    137
      Started:      Sun, 24 Mar 2024 16:08:56 +0000
      Finished:     Sun, 24 Mar 2024 16:09:06 +0000
...

Here is the CPU and memory consumption from Grafana. I am aware that very short memory or CPU spikes will be hard to see, but from a long-term perspective the app does not consume a lot of RAM. On the other hand, in my experience we run the same pandas operations on containers with less RAM and much bigger dataframes, with no problems.

Grafana stats

How should I fix this? What else should I debug in order to prevent OOM?

Data and code example

Original dataframe (named df)

              datetime   open   high    low  close        volume
0  2023-11-14 11:15:00  2.185  2.187  2.171  2.187  19897.847314
1  2023-11-14 11:20:00  2.186  2.191  2.183  2.184   8884.634728
2  2023-11-14 11:25:00  2.184  2.185  2.171  2.176  12106.153954
3  2023-11-14 11:30:00  2.176  2.176  2.158  2.171  22904.354082
4  2023-11-14 11:35:00  2.171  2.173  2.167  2.171   1691.211455

New dataframe (named st).
Note: If trend_orientation = 1 => st_lower = NaN, if -1 => st_upper = NaN

              datetime   supertrend_ub  supertrend_lb    trend_orientation    st_trend_segment
0  2023-11-14 11:15:00   0.21495        NaN              -1                   1
1  2023-11-14 11:20:00   0.21495        NaN              -10                  1
2  2023-11-14 11:25:00   0.21495        NaN              -11                  1
3  2023-11-14 11:30:00   0.21495        NaN              -12                  1
4  2023-11-14 11:35:00   0.21495        NaN              -13                  1

Code example

import pandas as pd
import multiprocessing
import numpy as np
import stockstats


def add_supertrend(market):
    try:
        # Read data from file
        df = pd.read_parquet(market, engine="fastparquet")

        # Extract date columns
        date_column = df['datetime']

        # Convert to stockstats object
        st_a = stockstats.wrap(df.copy())
        # Generate supertrend
        st_a = st_a[['supertrend', 'supertrend_ub', 'supertrend_lb']]

        # Add back datetime columns
        st_a.insert(0, "datetime", date_column)

        # Add trend orientation using conditional columns
        conditions = [
            st_a['supertrend_ub'] == st_a['supertrend'],
            st_a['supertrend_lb'] == st_a['supertrend']
        ]
        
        values = [-1, 1]
        st_a['trend_orientation'] = np.select(conditions, values)

        # Remove not required supertrend values
        st_a.loc[st_a['trend_orientation'] < 0, 'st_lower'] = np.NaN
        st_a.loc[st_a['trend_orientation'] > 0, 'st_upper'] = np.NaN

        # Unwrap back to dataframe
        st = stockstats.unwrap(st_a)

        # Ensure correct date types are used
        st = st.astype({
            'supertrend': 'float32',
            'supertrend_ub': 'float32',
            'supertrend_lb': 'float32',
            'trend_orientation': 'int8'
        })
        # Add trend segments
        st_to = st[['trend_orientation']]
        st['st_trend_segment'] = st_to.ne(st_to.shift()).cumsum()
        
        # Remove trend value
        st.drop(columns=['supertrend'], inplace=True)

        # Merge ST with DF
        df = pd.merge(df, st, on=['datetime'])
        
        # Write back to parquet
        df.to_parquet(market, compression=None)
    except Exception as e:
        # Using proper logger in real code
        print(e)
        pass


def main():
    # Using fixed market as example, in real code market is fetched
    market = "BTCUSDT"
    # Using multiprocessing to free up memory after each iteration
    p = multiprocessing.Process(target=add_supertrend, args=(market,))
    p.start()
    p.join()


if __name__ == "__main__":
    main()

Dockerfile

FROM python:3.10

ENV PYTHONFAULTHANDLER=1 \
    PYTHONHASHSEED=random \
    PYTHONUNBUFFERED=1 \
    PYTHONPATH=.

# Update package lists
RUN ["apt-get", "update"]

# Get dependencies
COPY requirements.txt .
RUN pip3 install -r requirements.txt

# Copy main app
ADD . .
CMD main.py

Possible solutions / tried approaches

  • ❌: tried; did not work
  • 💡: an idea I am going to test
  • 😐: did not completely solve the problem, but helped towards the solution
  • ✅: working solution

Lukasz Tracewski's suggestion

Use node-pressure eviction to test whether the pod can even allocate enough memory on the nodes.

I have done:

  • created a new node pool: 8 vCPU + 16 GB RAM
  • ensured that only my pod (and some system ones) would be deployed on this node (using tolerations and affinity)
  • ran a stress test with no OOM or other errors
...
          image: "polinux/stress"
          command: ["stress"]
          args: ["--vm", "1", "--vm-bytes", "5G", "--vm-hang", "1"]
...
kubectl top node test-pool-j8t3y
NAME              CPU(cores)   CPU%   MEMORY(bytes)   MEMORY%
test-pool-j8t3y   694m         8%     7557Mi          54%

Node description

  Namespace                   Name                                   CPU Requests  CPU Limits  Memory Requests  Memory Limits  Age
  ---------                   ----                                   ------------  ----------  ---------------  -------------  ---
  kube-system                 cilium-24qxl                           300m (3%)     0 (0%)      300Mi (2%)       0 (0%)         43m
  kube-system                 cpc-bridge-proxy-csvvg                 100m (1%)     0 (0%)      75Mi (0%)        0 (0%)         43m
  kube-system                 csi-do-node-tzbbh                      0 (0%)        0 (0%)      0 (0%)           0 (0%)         43m
  kube-system                 disable-systemd-upgrade-timer-mqjsk    0 (0%)        0 (0%)      0 (0%)           0 (0%)         43m
  kube-system                 do-node-agent-dv2z2                    102m (1%)     0 (0%)      80Mi (0%)        300Mi (2%)     43m
  kube-system                 konnectivity-agent-wq5p2               0 (0%)        0 (0%)      0 (0%)           0 (0%)         43m
  kube-system                 kube-proxy-gvfrv                       0 (0%)        0 (0%)      125Mi (0%)       0 (0%)         43m
  scanners                    data-gw-enrich-d5cff4c95-bkjkc         100m (1%)     1 (12%)     1000Mi (7%)      6000Mi (43%)   2m33s

The stress pod did not crash due to OOM, so it is very likely that the issue is somewhere inside the code.

Detailed memory monitoring

I have inserted memory measurements at multiple points. I am measuring both dataframe size and overall memory usage with psutil.

import psutil

total = round(psutil.virtual_memory().total / 1000 / 1000, 4)
used = round(psutil.virtual_memory().used / 1000 / 1000, 4)
pct = round(used / total * 100, 1)
logger.info(f"[Current memory usage is: {used} / {total} MB ({pct} %)]")

Memory usage

  • prior read data from file
    • RAM: 938.1929 MB
  • after df loaded
    • df_mem_usage: 1.947708 MB
    • RAM: 954.1181 MB
  • after ST generated
    • df_mem_usage of ST df: 1.147757 MB
    • RAM: 944.9226 MB
  • line before df merge
    • df_mem_usage: 945.4223 MB

❌ Not using multiprocessing

To "reset" memory on every iteration, I am using multiprocessing. However, I wanted to be sure that this does not cause trouble, so I removed it and called add_supertrend directly. It still ended up in OOM, so I do not think this is the problem.
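
The variant without multiprocessing is simply calling the worker in the main process (a sketch, using the add_supertrend function from the code example above):

def main():
    market = "BTCUSDT"
    # Worker runs in the main process, so its memory is only reliably
    # returned to the OS when the interpreter exits.
    add_supertrend(market)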

Real data

As suggested by Lukasz Tracewski, I am sharing the real data that causes the OOM crash. Since it is in parquet format, I cannot use services like Pastebin, so I am using GDrive instead. I will use this folder to share anything else related to this question/issue.

❌ Upgrade pandas to 2.2.1

Sometimes a plain package upgrade helps, so I decided to try upgrading pandas to 2.2.1 and fastparquet to 2024.2.0 (newer pandas requires newer fastparquet). pyarrow was also updated to 15.0.0.

It seemed to work during the first few iterations, but then crashed with OOM again.
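
For reference, the pinned versions I tested correspond roughly to this requirements.txt (the exact pin style is my assumption):

pandas~=2.2.1
fastparquet~=2024.2.0
pyarrow~=15.0.0
stockstats~=0.4.1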

❌ Using Dask

I remembered that I used to reach for Dask when solving complex dataframe operations, so I tried it in this case as well. Without success: OOM again. Using dask 2024.3.1.

import dask.dataframe as dd
# mem usage 986.452 MB
ddf1 = dd.from_pandas(df)
# mem usage 1015.37 MB
ddf2 = dd.from_pandas(st)
# mem usage 1019.50 MB
df_dask = dd.merge(ddf1, ddf2, on='datetime')
# mem usage 1021.56 MB
df = df_dask.compute()  # <- here it crashes ¯\_(ツ)_/¯

💡 Duplicated datetimes

While investigating the data with Dask, I noticed that there are duplicate records in the datetime column. This is definitely wrong; datetime has to be unique. I think this might be causing the issue and will investigate it further.

df.tail(10)
             datetime   open   high     low   close         volume
0 2024-02-26 02:55:00  0.234  0.238  0.2312  0.2347  103225.029408
0 2024-02-26 02:55:00  0.234  0.238  0.2312  0.2347  103225.029408
0 2024-02-26 02:55:00  0.234  0.238  0.2312  0.2347  103225.029408
0 2024-02-26 02:55:00  0.234  0.238  0.2312  0.2347  103225.029408
0 2024-02-26 02:55:00  0.234  0.238  0.2312  0.2347  103225.029408
0 2024-02-26 02:55:00  0.234  0.238  0.2312  0.2347  103225.029408
0 2024-02-26 02:55:00  0.234  0.238  0.2312  0.2347  103225.029408
0 2024-02-26 02:55:00  0.234  0.238  0.2312  0.2347  103225.029408
0 2024-02-26 02:55:00  0.234  0.238  0.2312  0.2347  103225.029408
0 2024-02-26 02:55:00  0.234  0.238  0.2312  0.2347  103225.029408
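
A quick way to confirm and quantify the duplicates (a sketch, assuming df is the dataframe loaded from the parquet file):

import pandas as pd

df = pd.read_parquet("BTCUSDT", engine="fastparquet")
# Mark every row whose datetime appears more than once
dup_mask = df.duplicated(subset=["datetime"], keep=False)
print(dup_mask.sum(), "rows share their datetime with another row")
print(df.loc[dup_mask, "datetime"].value_counts().head())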

I have implemented a fix that removes duplicate records in the other component that prepares the data. The fix looks like this; I will monitor whether it helps.

    # Append gathered data to df and write to file
    df = pd.concat([df, fresh_data])

    # Drop duplicates
    df = df.drop_duplicates(subset=["datetime"])

Solution

  • To close this question: I figured out that the issue was caused by duplicated datetime values in the dataframe. On merge, every duplicated datetime on one side gets paired with every matching duplicate on the other side, so the result explodes in size. I have fixed the data and it works fine now.
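
For illustration, here is a minimal sketch (with made-up data, not the real dataset) of why duplicated keys blow up the merge, and how pandas' validate argument can catch the problem early instead of silently exploding:

import pandas as pd

# 1000 rows that all share one datetime on both sides of the join.
dup = pd.DataFrame({
    "datetime": pd.to_datetime(["2024-02-26 02:55:00"] * 1000),
    "close": 0.2347,
})

merged = pd.merge(dup, dup, on=["datetime"])
print(len(merged))  # 1,000,000 rows: the join is a per-key cross product

# validate="one_to_one" raises pandas.errors.MergeError on duplicated keys
# instead of producing the cross product.
try:
    pd.merge(dup, dup, on=["datetime"], validate="one_to_one")
except pd.errors.MergeError as e:
    print("merge validation failed:", e)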

