dask, dask-distributed

How to dump a large Dask dataframe into SQL


I am hitting memory limits when I try to dump my Dask dataframe into a local SQLite database.

dd_merchange.to_sql(
    "merchange",
    db,
    if_exists="replace",
    index_label="user_id",
)

So I thought I could use map_partitions:

def write_partition_to_sqlite(partition: pd.DataFrame) -> None:
    """Insert one partition (a pandas DataFrame) into the table."""
    partition.to_sql("merchange", db, if_exists="append")

# write each partition to SQL lite
dd_merchange.map_partitions(write_partition_to_sqlite).compute()

But I still hit the memory limit. I have 64 GB of RAM and have already lowered the precision of the columns.
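
The downcasting I mean looks roughly like this (the exact dtypes are just examples):

# downcast wide numeric dtypes before writing, e.g. float64 -> float32
dd_merchange = dd_merchange.astype({"transaction_amount": "float32"})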

In Polars I can do

splits = np.array_split(range(0, n_rows), n_chunks)

# For each chunk of rows, convert that slice to pandas and insert it
for part in splits:
    with engine.begin() as conn:
        conn.execute(
            pl_df.insert(),
            pl_df.slice(part[0], len(part))
            .to_pandas()
            .to_dict(orient="records"),
        )

But if I do the same in Dask, it means I have to call .compute() on every chunk, which also takes a considerable amount of time.
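
For reference, the per-chunk version in Dask that I am trying to avoid would look roughly like this:

# one .compute() per partition, then a plain pandas to_sql append
for i in range(dd_merchange.npartitions):
    pdf = dd_merchange.get_partition(i).compute()  # one .compute() per chunk
    pdf.to_sql("merchange", db, if_exists="append", index_label="user_id")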


Solution

  • I have had a similar issue, and from the Dask documentation I found out that using delayed objects would solve it... and it did. Here is an example of how to apply this; you can certainly adapt it to your situation:

    import dask.dataframe as dd
    import numpy as np
    import pandas as pd
    import sqlite3
    from dask.delayed import delayed
    from dask import compute
    import threading
    
    # toy data with the same columns as in the question
    df = pd.DataFrame({
        'user_id': np.arange(1000000),
        'transaction_amount': np.random.normal(100, 10, size=1000000)
    })
    dd_merchange = dd.from_pandas(df, npartitions=4)
    
    # one SQLite connection per worker thread (sqlite3 connections cannot be shared across threads)
    conn_local = threading.local()
    
    def get_conn():
        if not hasattr(conn_local, "conn"):
            conn_local.conn = sqlite3.connect('example2.db')
        return conn_local.conn
    
    def insert_into_sqlite(dataframe, table_name):
        conn = get_conn()
        cursor = conn.cursor()
        # parameterised INSERT with one "?" placeholder per column
        placeholders = ','.join('?' * len(dataframe.columns))
        insert_statement = f"INSERT INTO {table_name} VALUES ({placeholders})"
        data_to_insert = [tuple(row) for row in dataframe.values]
        
        cursor.execute(f"CREATE TABLE IF NOT EXISTS {table_name} ({', '.join(dataframe.columns)});")
        
        cursor.executemany(insert_statement, data_to_insert)
        conn.commit()
    
    @delayed
    def process_partition(partition, table_name):
        insert_into_sqlite(partition, table_name)
    
    # one delayed insert task per partition; nothing is loaded until compute()
    delayed_objects = []
    for partition in dd_merchange.to_delayed():
        delayed_objects.append(process_partition(partition, "merchange"))
    
    compute(*delayed_objects)
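    
    The key point is that each partition is only materialised inside its own delayed task, so only a few partitions are in memory at once instead of the whole dataframe. Since SQLite serialises writes, you could optionally cap the number of threads as well, for example:
    
    compute(*delayed_objects, scheduler="threads", num_workers=2)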