Apart from the zip files, which can be downloaded here:
https://data.binance.vision/?prefix=data/futures/um/daily/aggTrades/BTCUSDT/
here is a minimal reproducible example. First, download three or more zip files and note the filepath of each. Then, at the bottom of the script below, replace my filepaths in file_list
with yours:
from zipfile import ZipFile
import numpy as np
import pandas as pd
import psutil
import re
def print_memory_usage(msg):
    """Print *msg* followed by the system-wide used memory in GB."""
    used_gb = psutil.virtual_memory().used / (1024 ** 3)
    print(msg)
    print(f"Used: {used_gb:.2f} GB")
def batch_generator(files: list):
    """Yield one DataFrame per zip path in *files*, read lazily."""
    for path in files:
        yield _read_file(path)
def _read_file(file):
    """Parse the CSV inside the zip archive at *file* into a DataFrame.

    The CSV member is assumed to share the archive's base name
    (``.../foo.zip`` contains ``foo.csv``).

    Raises:
        RuntimeError: if parsing fails; chained to the original exception
            so the real cause and traceback are preserved.
    """
    # Derive the member name from the zip's own filename.
    csv_filename = re.split(r'/', file)[-1][:-4] + ".csv"
    with ZipFile(file) as archive:
        with archive.open(csv_filename) as f:
            try:
                return read_aggtrades(f)
            except Exception as e:
                # Chain instead of print-and-rewrap: the original error is
                # kept as __cause__, so no information is lost.
                raise RuntimeError(
                    f"Error occurred reading file: {csv_filename}") from e
def read_aggtrades(file) -> pd.DataFrame:
    """Parse a Binance aggTrades CSV stream into a DataFrame.

    Only the 'a' (aggregate trade id), 'price', 'q' (quantity) and 't'
    (timestamp) columns are kept; price and q are deliberately loaded as str.

    Parameters:
        file: a readable, seekable file object — text or binary (e.g. a
            zip member stream).

    Returns:
        pd.DataFrame with columns 'a', 'price', 'q', 't'.
    """
    # EX: 578304464,17085,0.01449000,684164672,684164672,14,True,True
    columns = ['a', 'price', 'q', 'first_trade_id', 'last_trade_id', 't', 'was_the_buyer_maker']
    usecols = ['a', 'price', 'q', 't']
    dtype = {'a': np.int64, 'price': str, 'q': str, 't': np.int64}

    def peek_line(f):
        """Return the next line without consuming it."""
        pos = f.tell()
        line = f.readline()
        f.seek(pos)
        # The stream may yield bytes (zip member) or str; normalise to str.
        # isinstance, not type(...) ==, is the idiomatic check.
        if isinstance(line, bytes):
            return line.decode()
        return line

    def _read_csv(f1):
        # 99.9% of files don't have headers, some do. Discard a header line
        # if we encounter one.
        first_line = peek_line(f1)
        if first_line.startswith('agg_trade_id'):
            f1.readline()
        return pd.read_csv(f1,
                           sep=',',
                           header=None,
                           names=columns,
                           usecols=usecols,
                           dtype=dtype)

    print_memory_usage("Before _read_csv")
    df = _read_csv(file)
    print_memory_usage("After _read_csv")
    return df
# Paths to the daily aggTrades archives to process — replace with your own.
file_list = [
    "/home/owner/Desktop/BTCUSDT/BTCUSDT-aggTrades-2019-12-31.zip",
    "/home/owner/Desktop/BTCUSDT/BTCUSDT-aggTrades-2020-01-01.zip",
    "/home/owner/Desktop/BTCUSDT/BTCUSDT-aggTrades-2020-01-02.zip",
    "/home/owner/Desktop/BTCUSDT/BTCUSDT-aggTrades-2020-01-03.zip",
]

# Consume the generator, printing a 1-based batch counter.
for i, batch in enumerate(batch_generator(file_list), start=1):
    print(i)
As you will see when running it, the output shows a consistent increase in used memory:
Before _read_csv
Used: 10.31 GB
After _read_csv
Used: 10.31 GB
1
Before _read_csv
Used: 10.31 GB
After _read_csv
Used: 10.32 GB
2
Before _read_csv
Used: 10.32 GB
After _read_csv
Used: 10.33 GB
3
Before _read_csv
Used: 10.33 GB
After _read_csv
Used: 10.35 GB
4
I have no idea what to do. I tried adding gc.collect()
and del(batch)
after the print statement, but neither helps.
Using a different strategy for gathering the data seems to help.
No indication of a memory leak here.
import io
import zipfile
from concurrent.futures import ThreadPoolExecutor
from pathlib import Path

import numpy as np
import pandas as pd
import psutil
# Extremes of observed system memory usage across the whole run.
maxmem = float("-inf")
minmem = float("inf")


def print_memory_usage(msg):
    """Print *msg* with current used memory in GB and update min/max trackers."""
    global maxmem, minmem
    used = psutil.virtual_memory().used / (1024 ** 3)
    if used > maxmem:
        maxmem = used
    if used < minmem:
        minmem = used
    print(f"{msg}: {used:.2f} GB")
def read_aggtrades(file) -> pd.DataFrame:
    """Load a Binance aggTrades CSV stream, keeping id, price, quantity, time.

    Unlike the str-based variant, price and quantity are parsed straight to
    float64.
    """
    columns = [
        "agg_trade_id",
        "price",
        "quantity",
        "first_trade_id",
        "last_trade_id",
        "transact_time",
        "is_buyer_maker",
    ]
    usecols = ["agg_trade_id", "price", "quantity", "transact_time"]
    dtype = {
        "agg_trade_id": np.int64,
        "price": np.float64,
        "quantity": np.float64,
        "transact_time": np.int64,
    }

    def _read_csv(f1):
        # Probe the first field of the first line: if it parses as an int
        # there is no header, so rewind; otherwise leave the header line
        # consumed and parse from the second line on.
        first_field = f1.readline().split(",")[0]
        try:
            int(first_field)
        except ValueError:
            pass
        else:
            f1.seek(0)
        return pd.read_csv(
            f1, sep=",", header=None, names=columns, usecols=usecols, dtype=dtype
        )

    print_memory_usage("Before _read_csv")
    df = _read_csv(file)
    print_memory_usage("After _read_csv")
    return df
def read_file(filename):
    """Read the first CSV member of zip archive *filename* into a DataFrame.

    Streams the member straight out of the archive instead of extracting it
    to disk, so no leftover CSV files accumulate in DOWNLOADS and no extra
    disk I/O is done. The member stream is wrapped in a text layer with an
    explicit encoding because read_aggtrades expects str lines and uses
    seek(0) (ZipExtFile has been seekable since Python 3.7).
    """
    with zipfile.ZipFile(filename) as z:
        member, *_ = z.filelist
        with io.TextIOWrapper(z.open(member), encoding="utf-8") as data:
            return read_aggtrades(data)
def batch_generator(files):
    """Yield a DataFrame per archive, reading up to 8 files concurrently."""
    with ThreadPoolExecutor(8) as pool:
        for frame in pool.map(read_file, files):
            yield frame
DOWNLOADS = Path("/Users/SIGHUP/Downloads")

# Collect every frame, printing a 0-based counter and a preview of each.
dfs = []
i = 0
for df in batch_generator(DOWNLOADS.glob("BTCUSDT*zip")):
    print(i)
    dfs.append(df)
    print(df.head())
    i += 1
print(f"{maxmem=:.2f}GB {minmem=:.2f}GB")
Platform:
macOS 14.4
Python 3.12.2