python, python-3.x, sqlalchemy, python-multithreading

SQLAlchemy on multiple write threads


The following code works on Python 3.6+ but not on Python 3.4.3; I'm not sure at which release the behavior changes. Why is this? I was under the impression that SQLAlchemy would handle multiple readers/writers to a file-based db, probably by serializing the calls behind the scenes. Either way, this suggests I'm not handling it correctly. How do I insert from multiple threads, or from a single thread other than the main thread, on versions before 3.6?

I attempted this at the SQLAlchemy session() level (see "sqlalchemy connection pool on multiple threads"), but was only able to get it to work with an engine, and now I find that it works only on 3.6.

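import threading

from sqlalchemy import (Column, Integer, MetaData, String, Table,
                        create_engine, select)


def insert_inventory_table(conn, table, inventory):
    # Insert a list of row dicts using the connection shared by the caller.
    conn.execute(table.insert(), inventory)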

def results_table(conn, table):
    q = select([table])
    data = conn.execute(q).fetchall()
    print('{!r}'.format(data))


def main_0():
    engine = create_engine('sqlite://', connect_args={'check_same_thread' : False})
    conn = engine.connect()

    metadata = MetaData(engine)
    table = Table('inventory',
              metadata,
              Column('item_no', Integer, primary_key=True, autoincrement=True),
              Column('desc', String(255), nullable=False),
              Column('volume', Integer, nullable=False)
              )

    metadata.create_all()

    some_inventory = [{'item_no' : 0, 'desc' : 'drone', 'volume' : 12},
                      {'item_no' : 1, 'desc' : 'puddle jumper', 'volume' : 2},
                      {'item_no' : 2, 'desc' : 'pet monkey', 'volume' : 1},
                      {'item_no' : 3, 'desc' : 'bowling ball', 'volume' : 4},
                      {'item_no' : 4, 'desc' : 'electric guitar', 'volume' : 3},
                      {'item_no' : 5, 'desc' : 'bookends', 'volume' : 2}]


    thread_0 = threading.Thread(target=insert_inventory_table, args=(conn, table, some_inventory[0:3]))
    thread_1 = threading.Thread(target=insert_inventory_table, args=(conn, table, some_inventory[3:]))

    thread_0.start()
    thread_1.start()

    return conn, table


if __name__ == '__main__':

    conn,table = main_0()
    results_table(conn, table)

Thanks.


Solution

  • As pointed out, you have to use a scoped session instead of your conn:

    from sqlalchemy.orm import sessionmaker, scoped_session
    
    db_session = scoped_session(sessionmaker(autocommit=False,
                                             autoflush=False,
                                             bind=engine))
    

    Then, whenever you access the db in a thread, remember to open and close a session. You don't want sessions to be shared between threads, because that leads to bad things. So:

    def worker(sth):
        session = db_session()
        # do sth very important
        res = session.query(YourModel).filter(YourModel.sth == sth).first()
        print(res)
        session.close()
    

    You can then use such workers in your multithreaded operations. I hope this helps.
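
For reference, here is a minimal end-to-end sketch of the scoped-session pattern applied to the inventory data from the question. It rests on a few assumptions that are not in the original post: SQLAlchemy 1.4 or later, a temporary file-based SQLite database instead of the in-memory one, and the illustrative names Inventory, insert_rows and run_demo. Each thread asks the scoped_session registry for its own session, commits, and then calls remove(); the main thread joins both workers before reading the rows back.

```python
import os
import tempfile
import threading

from sqlalchemy import Column, Integer, String, create_engine
from sqlalchemy.orm import declarative_base, scoped_session, sessionmaker

Base = declarative_base()


class Inventory(Base):
    # Hypothetical ORM model mirroring the 'inventory' table in the question.
    __tablename__ = "inventory"
    item_no = Column(Integer, primary_key=True)
    desc = Column(String(255), nullable=False)
    volume = Column(Integer, nullable=False)


def insert_rows(db_session, rows):
    # Calling the registry returns a session local to the current thread.
    session = db_session()
    try:
        session.add_all([Inventory(**row) for row in rows])
        session.commit()
    except Exception:
        session.rollback()
        raise
    finally:
        # Close this thread's session and drop it from the registry.
        db_session.remove()


def run_demo():
    rows = [
        {"item_no": 0, "desc": "drone", "volume": 12},
        {"item_no": 1, "desc": "puddle jumper", "volume": 2},
        {"item_no": 2, "desc": "pet monkey", "volume": 1},
        {"item_no": 3, "desc": "bowling ball", "volume": 4},
        {"item_no": 4, "desc": "electric guitar", "volume": 3},
        {"item_no": 5, "desc": "bookends", "volume": 2},
    ]
    with tempfile.TemporaryDirectory() as tmp:
        # Assumption: a file-based database, so every thread can open its
        # own connection to the same data.
        engine = create_engine("sqlite:///" + os.path.join(tmp, "inventory.db"))
        Base.metadata.create_all(engine)
        db_session = scoped_session(sessionmaker(bind=engine))

        threads = [
            threading.Thread(target=insert_rows, args=(db_session, rows[:3])),
            threading.Thread(target=insert_rows, args=(db_session, rows[3:])),
        ]
        for t in threads:
            t.start()
        # Wait for both writers to finish before reading the results.
        for t in threads:
            t.join()

        session = db_session()
        query = session.query(Inventory).order_by(Inventory.item_no)
        descs = [item.desc for item in query]
        db_session.remove()
        engine.dispose()
    return descs


if __name__ == "__main__":
    print(run_demo())
```

Joining the threads before querying matters here: without it, the main thread may read the table before either worker has committed.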