Tags: python, pandas, memory-leaks, zip

Unable to fix generator memory leak despite knowing where it's coming from


Apart from the zip files, which can be downloaded here:

https://data.binance.vision/?prefix=data/futures/um/daily/aggTrades/BTCUSDT/

here is a minimal reproducible example. First, download three or more zip files and note the path of each one. Then, at the bottom of the script below, replace my paths in file_list with yours:

from zipfile import ZipFile
import numpy as np
import pandas as pd
import psutil
import re

def print_memory_usage(msg):
    # Get memory usage information
    memory_info = psutil.virtual_memory()
    
    # Print memory usage details
    print(msg)
    print(f"Used: {memory_info.used / (1024 ** 3):.2f} GB")

def batch_generator(files: list):

    def _create_batch(file):
        return _read_file(file)

    for batch in files:
        df = _create_batch(batch)
        yield df

def _read_file(file):
    with ZipFile(file) as zipfile:
        csv_filename = re.split(r'/', file)[-1][:-4] + ".csv"
        with zipfile.open(csv_filename) as f:
            try:
                return read_aggtrades(f)
            except Exception as e:
                print(e)
                raise Exception(f"Error occurred reading file: {csv_filename}") from e

def read_aggtrades(file) -> pd.DataFrame:

    # EX: 578304464,17085,0.01449000,684164672,684164672,14,True,True
    columns = ['a', 'price', 'q', 'first_trade_id', 'last_trade_id', 't', 'was_the_buyer_maker']
    usecols = ['a', 'price', 'q', 't']
    dtype = {'a': np.int64, 'price': str, 'q': str, 't': np.int64}

    def peek_line(f):
        pos = f.tell()
        line = f.readline()
        f.seek(pos)

        # Convert bytes to str (line can be bytes or str)
        if isinstance(line, bytes):
            return line.decode()

        return line

    def _read_csv(f1):
        # Most files have no header, but a few do; if the first line is a
        # header, consume it so it isn't parsed as data.
        first_line = peek_line(f1)
        if first_line.startswith('agg_trade_id'):
            f1.readline()
        return pd.read_csv(f1,
                         sep=',',
                         header=None,
                         names=columns,
                         usecols=usecols,
                         dtype=dtype)
    print_memory_usage("Before _read_csv")
    df = _read_csv(file)
    print_memory_usage("After _read_csv")

    return df

file_list = [
    "/home/owner/Desktop/BTCUSDT/BTCUSDT-aggTrades-2019-12-31.zip",
    "/home/owner/Desktop/BTCUSDT/BTCUSDT-aggTrades-2020-01-01.zip",
    "/home/owner/Desktop/BTCUSDT/BTCUSDT-aggTrades-2020-01-02.zip",
    "/home/owner/Desktop/BTCUSDT/BTCUSDT-aggTrades-2020-01-03.zip",
]
generator = batch_generator(file_list)

i = 0
for batch in generator:
    i += 1
    print(i)

As you will see when running it, the output shows a consistent increase in used memory:

Before _read_csv
Used: 10.31 GB
After _read_csv
Used: 10.31 GB
1
Before _read_csv
Used: 10.31 GB
After _read_csv
Used: 10.32 GB
2
Before _read_csv
Used: 10.32 GB
After _read_csv
Used: 10.33 GB
3
Before _read_csv
Used: 10.33 GB
After _read_csv
Used: 10.35 GB
4

I have no idea what to do. I tried adding gc.collect() and del batch after the print statement, and neither helps.
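
Note that psutil.virtual_memory().used is machine-wide, so anything else running can move the number. A helper along these lines (an illustrative sketch, not part of the script above) separates this process's resident size from the DataFrame's own footprint:

import psutil
import pandas as pd

def report_memory(df: pd.DataFrame, label: str) -> None:
    # Resident set size of this interpreter alone, unlike
    # psutil.virtual_memory().used, which covers the whole machine.
    rss = psutil.Process().memory_info().rss / (1024 ** 3)
    # What the DataFrame itself holds; deep=True also counts the Python
    # string objects created by the dtype=str columns 'price' and 'q'.
    frame = df.memory_usage(deep=True).sum() / (1024 ** 3)
    print(f"{label}: process RSS {rss:.2f} GB, DataFrame {frame:.2f} GB")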


Solution

  • Using a different strategy for gathering the data seems to help: extract each CSV to disk before parsing it, read price and quantity as float64 rather than str, and fan the reads out over a small thread pool.

    No indication of a memory leak here.

    import zipfile
    from pathlib import Path
    import psutil
    import pandas as pd
    import numpy as np
    from concurrent.futures import ThreadPoolExecutor
    
    maxmem = float("-inf")
    minmem = float("inf")
    
    def print_memory_usage(msg):
        global maxmem, minmem
        mem = psutil.virtual_memory().used / (1024 ** 3)
        maxmem = max(mem, maxmem)
        minmem = min(mem, minmem)
        print(f"{msg}: {mem:.2f} GB")
    
    
    def read_aggtrades(file) -> pd.DataFrame:
        columns = [
            "agg_trade_id",
            "price",
            "quantity",
            "first_trade_id",
            "last_trade_id",
            "transact_time",
            "is_buyer_maker",
        ]
        usecols = ["agg_trade_id", "price", "quantity", "transact_time"]
        dtype = {
            "agg_trade_id": np.int64,
            "price": np.float64,
            "quantity": np.float64,
            "transact_time": np.int64,
        }
    
        def _read_csv(f1):
            c1, *_ = f1.readline().split(",")
            try:
                int(c1)
                f1.seek(0)
            except ValueError:
                pass
            return pd.read_csv(
                f1, sep=",", header=None, names=columns, usecols=usecols, dtype=dtype
            )
    
        print_memory_usage("Before _read_csv")
        df = _read_csv(file)
        print_memory_usage("After _read_csv")
    
        return df
    
    
    def read_file(filename):
        with zipfile.ZipFile(filename) as z:
            zf, *_ = z.filelist
            z.extract(zf, DOWNLOADS)
            with open(DOWNLOADS / zf.filename) as data:
                return read_aggtrades(data)
    
    
    def batch_generator(files):
        with ThreadPoolExecutor(8) as exe:
            yield from exe.map(read_file, files)
    
    
    DOWNLOADS = Path("/Users/SIGHUP/Downloads")
    
    dfs = []
    
    for i, df in enumerate(batch_generator(DOWNLOADS.glob("BTCUSDT*zip"))):
        print(i)
        dfs.append(df)
        print(df.head())
    
    print(f"{maxmem=:.2f}GB {minmem=:.2f}GB")
    

    Platform:

    macOS 14.4
    Python 3.12.2