python, postgresql, sqlalchemy

Multithreading in ThreadPoolExecutor: couldn't see raised exceptions


I am trying to insert many entries into a Postgres database. Since each individual action is simple, I wanted to multithread the insert job to speed things up. What I can't wrap my head around is that each run through the list of 10,000 entries I want to add only inserts around 5, 10, or maybe 20 of them, and then finishes without failing or producing any error.

The code looks like this (I've left out the column and Table specifications; if they would be helpful, I can add a more detailed code example):

engine = create_engine(DATABASE_URI)
Session = scoped_session(sessionmaker(bind=engine))

def write_entry(information):
    session = Session()
    one_new_entry = ATypeOfEntry(
        a_field = information
    )
    Session.add(one_politician)
    Session.commit() 
    Session.remove()

if __name__ == "__main__":
    list_of_information = get_list_of_information()
    
    with tqdm(total=len(list_of_information)) as pbar:

        with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor:
            futures = [executor.submit(write_entry, information) for information in list_of_information]

            for future in concurrent.futures.as_completed(futures):
                pbar.update()

The SQL output then looks like this:

INSERT INTO entry_table ("information") VALUES (%(information)s) RETURNING entry.id

This is what I am looking for, except that after the run there should be as many INSERT statements as entries in the list, not, for example, the 250 from my last run. tqdm happily finishes, which means that as things stand I'd have to run the script a great number of times to catch every last entry.

I have verified that the function works when I call it individually, so it seems to have something to do with the multithreading setup. I have tried changing SQLAlchemy's connection-pooling settings and the number of workers for the ThreadPoolExecutor; if they affect it, they at least don't solve the issue.

I thought it might have something to do with my general connection settings, so I've cloned the whole DB to my hard drive, but I am seeing the same behaviour there.

I'm confused about what is happening; even advice on how to triage this further would be appreciated, since I don't know where to look.


Solution

  • I think an exception is being raised inside write_entry; if you never call result() on a future, its exception won't be re-raised. You can also call future.exception() to check whether one occurred. So everything "completed", but nothing was written, because the calls all failed with exceptions.

    In your example you create one_new_entry but then add one_politician, so I suspect it is something along those lines (i.e. an exception is raised but never checked for).

    concurrent.futures.Future.exception
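    As a minimal illustration of that behaviour, independent of SQLAlchemy or any database (the `task` function below is made up for the demo): a callable that raises inside the pool still shows up in as_completed(), and the exception only surfaces when you call result() or exception():

    ```python
    import concurrent.futures

    def task(n):
        # Simulate a worker that fails for even inputs.
        if n % 2 == 0:
            raise ValueError(f"bad input: {n}")
        return n * 10

    with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
        futures = [executor.submit(task, n) for n in range(4)]

        # Iterating as_completed() alone never re-raises a worker's
        # exception -- each future "completes" whether it raised or not.
        for future in concurrent.futures.as_completed(futures):
            exc = future.exception()  # returns the exception instead of raising it
            if exc is not None:
                print(f"failed: {exc}")
            else:
                print(f"returned: {future.result()}")
    ```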

    Here is an example that writes an integer field.

    import sys
    from sqlalchemy import (
        create_engine,
        Integer,
    )
    from sqlalchemy.schema import (
        Column,
    )
    from sqlalchemy.sql import select, func
    from sqlalchemy.orm import (
        declarative_base,
        Session as NormalSession,
        scoped_session,
        sessionmaker,
    )
    import concurrent.futures
    
    Base = declarative_base()
    
    
    username, password, db = sys.argv[1:4]
    
    
    class Entry(Base):
        __tablename__ = "entries"
        id = Column(Integer, primary_key=True)
        value = Column(Integer)
    
    
    engine = create_engine(f"postgresql+psycopg2://{username}:{password}@/{db}", echo=False)
    
    
    Base.metadata.create_all(engine)
    
    
    Session = scoped_session(sessionmaker(bind=engine))
    
    
    def write_entry(info):
        session = Session()
        entry = Entry(value=info)
        if info % 2 == 0:
            raise AssertionError("Info must be an odd number.")
        session.add(entry)
        session.commit()
        entry_id = entry.id
        Session.remove()
        return entry_id
    
    
    def get_list_of_information():
        return [i for i in range(100)]
    
    
    if __name__ == "__main__":
        list_of_information = get_list_of_information()
    
        with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor:
            future_to_info = {
                executor.submit(write_entry, info): info for info in list_of_information
            }
            for future in concurrent.futures.as_completed(future_to_info):
                info = future_to_info[future]
                try:
                    entry_id = future.result()
                except Exception as e:
                    print(e)
                    print(f"Failed to insert {info}")
                else:
                    print(f"Finished with {info} written to {entry_id}")
    
        with NormalSession(engine) as session:
            print(session.scalar(select(func.count(Entry.id))))
    

    Some of the output

    Failed to insert 94
    Finished with 73 written to 37
    Info must be an odd number.
    Failed to insert 96
    Finished with 79 written to 40
    Info must be an odd number.
    Failed to insert 98
    Finished with 81 written to 41
    Finished with 85 written to 43
    Finished with 99 written to 49
    Finished with 97 written to 48
    Finished with 93 written to 47
    Finished with 95 written to 50
    Finished with 91 written to 46
    Finished with 87 written to 44
    Finished with 89 written to 45
    Finished with 83 written to 42
    50
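
    One refinement worth making to write_entry above: when the exception fires, Session.remove() is never reached, so the thread-local session lingers in the scoped-session registry. Wrapping the body in try/finally guarantees cleanup. A sketch of that variant (it uses an in-memory SQLite engine purely so the snippet is self-contained; the pattern is the same against Postgres):

    ```python
    from sqlalchemy import create_engine, Column, Integer
    from sqlalchemy.orm import declarative_base, scoped_session, sessionmaker

    Base = declarative_base()

    class Entry(Base):
        __tablename__ = "entries"
        id = Column(Integer, primary_key=True)
        value = Column(Integer)

    # In-memory SQLite stands in for Postgres to keep the sketch runnable.
    engine = create_engine("sqlite://")
    Base.metadata.create_all(engine)
    Session = scoped_session(sessionmaker(bind=engine))

    def write_entry(info):
        session = Session()
        try:
            entry = Entry(value=info)
            if info % 2 == 0:
                raise AssertionError("Info must be an odd number.")
            session.add(entry)
            session.commit()
            return entry.id
        finally:
            # Runs even when the exception propagates into the future,
            # so the thread-local session is always cleaned up.
            Session.remove()
    ```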