I hit memory errors when I try to dump my Dask DataFrame into a local SQLite database:
dd_merchange.to_sql(
    "merchange",
    db,
    if_exists="replace",
    index_label="user_id",
)
so I thought I could use map_partitions instead:
def write_partition_to_sqlite(partition: pd.DataFrame) -> None:
    """Insert one partition's data into the table."""
    partition.to_sql("merchange", db, if_exists="append")

# write each partition to SQLite
dd_merchange.map_partitions(write_partition_to_sqlite).compute()
but I still hit memory limits. I have 64 GB of RAM and have already lowered the precision of the columns.
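For reference, this is the kind of downcasting I mean (a minimal sketch; the column names are just illustrative):

# illustrative only: shrink numeric dtypes before writing
dd_merchange = dd_merchange.astype(
    {"user_id": "int32", "transaction_amount": "float32"}
)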
In Polars I can do
splits = np.array_split(range(0, n_rows), n_chunks)

# For each partition in the DataFrame, convert to pandas and insert
for part in splits:
    with engine.begin() as conn:
        conn.execute(
            pl_df.insert(),
            pl_df.slice(part[0], len(part))
            .to_pandas()
            .to_dict(orient="records"),
        )
but if I do the same in Dask, that means I have to call .compute() on every chunk, which also takes a considerable amount of time (roughly the loop sketched below).
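To be concrete, this is roughly the per-chunk loop I mean (a sketch, assuming db is the same SQLite connection/URI used above):

# sketch: materialise and write one partition at a time
for delayed_part in dd_merchange.to_delayed():
    pdf = delayed_part.compute()  # one partition as an in-memory pandas DataFrame
    pdf.to_sql("merchange", db, if_exists="append", index_label="user_id")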
I have had a similar issue, and from the Dask documentation I found out that using delayed objects would solve it, and it did. Here is an example of how to apply this; you can certainly adapt it to your situation:
import dask.dataframe as dd
import numpy as np
import pandas as pd
import sqlite3
from dask.delayed import delayed
from dask import compute
import threading

# build a sample Dask DataFrame
df = pd.DataFrame({
    'user_id': np.arange(1000000),
    'transaction_amount': np.random.normal(100, 10, size=1000000)
})
dd_merchange = dd.from_pandas(df, npartitions=4)

# one SQLite connection per worker thread (sqlite3 connections cannot be shared across threads)
conn_local = threading.local()

def get_conn():
    if not hasattr(conn_local, "conn"):
        conn_local.conn = sqlite3.connect('example2.db')
    return conn_local.conn

def insert_into_sqlite(dataframe, table_name):
    conn = get_conn()
    cursor = conn.cursor()
    placeholders = ','.join('?' * len(dataframe.columns))
    insert_statement = f"INSERT INTO {table_name} VALUES ({placeholders})"
    data_to_insert = [tuple(row) for row in dataframe.values]
    cursor.execute(f"CREATE TABLE IF NOT EXISTS {table_name} ({', '.join(dataframe.columns)});")
    cursor.executemany(insert_statement, data_to_insert)
    conn.commit()

@delayed
def process_partition(partition, table_name):
    # `partition` arrives here as a plain pandas DataFrame
    insert_into_sqlite(partition, table_name)

# build one delayed task per partition, then run them all at once
delayed_objects = []
for partition in dd_merchange.to_delayed():
    delayed_objects.append(process_partition(partition, "merchange"))

compute(*delayed_objects)
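Note that SQLite only allows one writer at a time, so the worker threads effectively take turns on the inserts, but no partition ever has to be concatenated with the others in memory. After the compute finishes you can sanity-check the write with a quick row count (just a verification snippet against the same example2.db file as above):

with sqlite3.connect('example2.db') as check_conn:
    row_count = check_conn.execute("SELECT COUNT(*) FROM merchange").fetchone()[0]
    print(row_count)  # should equal the number of rows in dd_merchange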