Tags: python, mysql, pandas, sqlalchemy, gnu-parallel

Importing data to MySQL in parallel with pandas and GNU Parallel


I have several thousand directories from which I want to import data into MySQL. I've written a Python script that reads the data from a single directory and writes it to the database. Here is the part where the data is sent to the database:

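from sqlalchemy import create_engine, text

# ml, tests and data are set earlier in the script;
# CGRE/CRED/CEND are terminal color codes.
host = 'localhost'
engine = create_engine('mysql://user:pass@%s/db?charset=utf8' % host)
conn = engine.connect()
trans = conn.begin()
try:
    # Bound parameter instead of string formatting, so ml is quoted safely
    conn.execute(text('delete from tests where ml = :ml'), {'ml': ml})
    tests.to_sql(con=conn, name='tests', if_exists='append', index=False)
    data.to_sql(con=conn, name='data', if_exists='append', index=False)
    trans.commit()
    print(CGRE + ml + ': OK' + CEND)
except Exception:
    trans.rollback()
    print(CRED + ml + ': database error!' + CEND)
    raise
finally:
    conn.close()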

Single-process execution works well but is too slow:

parallel -j 1 "[[ -d {} ]] && (cd {} && data_to_db.py) || echo {} >> ~/Data/failed_db" ::: *

Now I want to launch several processes:

parallel -j 8 .........

Sometimes during execution I get this error:

sqlalchemy.exc.InternalError: (pymysql.err.InternalError) (1213, 'Deadlock found when trying to get lock; try restarting transaction')

Is there a way to increase the transaction wait time, or to solve this some other way? Without parallel execution, importing all the data will take far too long.
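Raising a timeout would probably not help here: innodb_lock_wait_timeout governs error 1205 (lock wait timeout), whereas 1213 is returned as soon as InnoDB's deadlock detection picks a transaction as the victim and rolls it back. The error text itself suggests another way out: restart the transaction. Below is a minimal sketch of that, assuming the whole delete + to_sql sequence is wrapped in one function. The names run_with_deadlock_retry, is_deadlock and import_once are hypothetical, and the error code is read the way SQLAlchemy and PyMySQL expose it (SQLAlchemy keeps the driver exception in .orig, and PyMySQL puts the MySQL error number first in args, as in the message above).

```python
import random
import time

DEADLOCK_ERRNO = 1213  # MySQL error code from the message above


def is_deadlock(exc):
    """Return True if exc looks like MySQL error 1213.

    Assumption: SQLAlchemy wraps the driver error and stores it in .orig;
    the driver (PyMySQL here) puts the numeric error code first in args.
    """
    orig = getattr(exc, 'orig', exc)
    args = getattr(orig, 'args', ())
    return bool(args) and args[0] == DEADLOCK_ERRNO


def run_with_deadlock_retry(import_once, attempts=5, base_delay=0.1):
    """Call import_once(), re-running it from scratch on deadlock.

    import_once (hypothetical) must run the complete transaction
    (begin, delete, both to_sql calls, commit), because InnoDB has
    already rolled back the transaction that lost the deadlock.
    """
    for attempt in range(1, attempts + 1):
        try:
            return import_once()
        except Exception as exc:
            # Anything other than a deadlock, or the last attempt: give up.
            if not is_deadlock(exc) or attempt == attempts:
                raise
            # Random, growing pause so the colliding workers spread out.
            time.sleep(base_delay * attempt * random.random())
```

In the question's script, import_once would open the connection, call conn.begin(), run the delete and both to_sql calls, and commit, so every retry starts a fresh transaction. Compared with LOCK TABLES, this keeps the eight writers concurrent, at the cost of occasionally redoing one directory's import.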


Solution

  • MANY thanks to @RomanPerekhrest! Here is a working solution based on the MySQL manual, using LOCK TABLES / UNLOCK TABLES.

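    from sqlalchemy import create_engine, text

    # host, ml, tests, data and the CGRE/CRED/CEND color codes are set
    # earlier in the script.
    engine = create_engine('mysql://user:pass@%s/db?charset=utf8' % host)
    conn = engine.connect()
    trans = conn.begin()
    try:
        # Order recommended by the MySQL manual for InnoDB tables: disable
        # autocommit, LOCK TABLES, do the work, COMMIT, then UNLOCK TABLES.
        conn.execute(text('set autocommit=0'))
        conn.execute(text('lock tables tests write, data write'))
        conn.execute(text('delete from tests where ml = :ml'), {'ml': ml})
        tests.to_sql(con=conn, name='tests', if_exists='append', index=False)
        data.to_sql(con=conn, name='data', if_exists='append', index=False)
        trans.commit()
        print(CGRE + ml + ': OK' + CEND)
    except Exception:
        trans.rollback()
        print(CRED + ml + ': database error!' + CEND)
        raise
    finally:
        # Release the table locks whether the import succeeded or failed.
        conn.execute(text('unlock tables'))
        conn.close()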