Tags: python, sqlite, sqlalchemy, dask, dask-dataframe

Populate an SQL database with a Dask dataframe and dump it into a file


You can reproduce the error and the use case on this colab

I have multiple large tables that I read and analyze with Dask (dataframe). After the analysis, I would like to push them into a local database (in this case an SQLite engine through the SQLAlchemy package).

Here is some dummy data:

import pandas as pd
import dask.dataframe as dd

df = pd.DataFrame([{"i": i, "s": str(i) * 2} for i in range(4)])

ddf = dd.from_pandas(df, npartitions=2)

from dask.utils import tmpfile
from sqlalchemy import create_engine

with tmpfile(
    dir="/outputs/",
    extension="db",
) as f:
    print(f)

    db = f"sqlite:///{f}"

    ddf.to_sql("test_table", db)

    engine = create_engine(
        db,
        echo=False,
    )

    print(dir(engine))
    result = engine.execute("SELECT * FROM test_table").fetchall()

print(result)

However, tmpfile is temporary, so the database is not stored on my local drive. I would like to dump the database to my local drive, but I could not find any argument for tmpfile that makes it persist as a file, nor could I figure out how to dump my engine.

Update: if I use a regular file, I encounter the following error:

    return self.dbapi.connect(*cargs, **cparams)
sqlalchemy.exc.OperationalError: (sqlite3.OperationalError) unable to open database file
(Background on this error at: https://sqlalche.me/e/14/e3q8)

Here is the code:

with open(
    "/outputs/hello.db", "wb"
) as f:
    print(f)

    db = f"sqlite:///{f}"

    ddf.to_sql("test_table", db, if_exists="replace")

    engine = create_engine(
        db,
        echo=False,
    )

    print(dir(engine))
    result = engine.execute("SELECT * FROM test_table").fetchall()

print(result)
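The error in the update comes from the connection URL itself: the f-string interpolates the open file *object* (its `repr`), not its path, so SQLAlchemy is handed a URL that points at no valid file. A minimal sketch of the problem (the `hello.db` name here is just illustrative):

```python
import os
import tempfile

# Use a throwaway directory so the sketch runs anywhere.
db_path = os.path.join(tempfile.mkdtemp(), "hello.db")

with open(db_path, "wb") as f:
    # Interpolating the file object embeds its repr, not the path:
    bad_url = f"sqlite:///{f}"
    print(bad_url)  # e.g. sqlite:///<_io.BufferedWriter name='...'>

    # A valid URL needs the path string itself:
    good_url = f"sqlite:///{f.name}"
    print(good_url)
```

Also note that `open(..., "wb")` keeps the file open while SQLite tries to open it itself, so even with the correct path a plain path string (with no `open` call at all) is the right approach, as the solution below shows.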

Solution

  • If you would like to save to a regular file, there is no need to use the context manager:

    import dask.dataframe as dd
    import pandas as pd
    
    df = pd.DataFrame([{"i": i, "s": str(i) * 2} for i in range(4)])
    ddf = dd.from_pandas(df, npartitions=2)
    
    
    OUT_FILE = "test.db"
    db = f"sqlite:///{OUT_FILE}"
    
    ddf.to_sql("test_table", db)
    

    To test that the file is saved, run:

    from sqlalchemy import create_engine
    
    engine = create_engine(
        db,
        echo=False,
    )
    
    result = engine.execute("SELECT * FROM test_table").fetchall()
    
    print(result)
    # [(0, 0, '00'), (1, 1, '11'), (2, 2, '22'), (3, 3, '33')]
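As a sanity check that does not depend on SQLAlchemy's engine API (`engine.execute` exists in 1.x but was removed in SQLAlchemy 2.0 in favor of `connection.execute`), the standard library's sqlite3 module can query the file directly. This sketch creates an equivalent small table itself so it runs standalone; with the file above you would simply point `sqlite3.connect` at `test.db`. The column name `idx` stands in for the index column `to_sql` writes and is an assumption here:

```python
import os
import sqlite3
import tempfile

# Standalone stand-in for the test.db written above.
db_path = os.path.join(tempfile.mkdtemp(), "test.db")

# Create a table shaped like the one to_sql produces (index, i, s):
with sqlite3.connect(db_path) as conn:
    conn.execute("CREATE TABLE test_table (idx INTEGER, i INTEGER, s TEXT)")
    conn.executemany(
        "INSERT INTO test_table VALUES (?, ?, ?)",
        [(i, i, str(i) * 2) for i in range(4)],
    )

# Read every row back from the file on disk:
with sqlite3.connect(db_path) as conn:
    rows = conn.execute("SELECT * FROM test_table").fetchall()

print(rows)
# [(0, 0, '00'), (1, 1, '11'), (2, 2, '22'), (3, 3, '33')]
```

Seeing the same rows confirms the database really was persisted to the local drive rather than to a temporary location.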