I am trying to insert many entries into a Postgres database. All of the individual actions are simple, so to speed things up I wanted to multithread the insert job. What I can't wrap my head around is this: each run through the list of 10,000 entries usually adds only around 5, 10, or maybe 20 entries, yet it finishes without failing or producing any error.
The code looks like this (I've left out the column and table specifications; if they would help, I can add a more detailed code example):
```python
import concurrent.futures

from sqlalchemy import create_engine
from sqlalchemy.orm import scoped_session, sessionmaker
from tqdm import tqdm

# DATABASE_URI, ATypeOfEntry and get_list_of_information are defined elsewhere (omitted).
engine = create_engine(DATABASE_URI)
Session = scoped_session(sessionmaker(bind=engine))


def write_entry(information):
    session = Session()
    one_new_entry = ATypeOfEntry(
        a_field=information
    )
    Session.add(one_politician)
    Session.commit()
    Session.remove()


if __name__ == "__main__":
    list_of_information = get_list_of_information()
    with tqdm(total=len(list_of_information)) as pbar:
        with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor:
            futures = [executor.submit(write_entry, information) for information in list_of_information]
            for future in concurrent.futures.as_completed(futures):
                pbar.update()
```
The SQL output then looks like this:
```sql
INSERT INTO entry_table ("information") VALUES (%(information)s) RETURNING entry.id
```
This is what I am looking for, except that after the run the output should contain as many INSERT statements as there are entries in the list, not, for example, 250 like my last run. tqdm happily runs to completion, which means that, as things stand, I'd have to run the script a great number of times to catch every last entry.
I have verified that the function works when I call it on its own, so it seems the problem has something to do with the multithreading setup. I tried changing SQLAlchemy's connection-pooling settings and the number of workers for the ThreadPoolExecutor; if they have any effect, they at least do not solve the issue.
I thought it might have something to do with my general connection settings, so I cloned the whole DB to my hard drive, but I see the same behaviour there.
I'm confused about what is happening; even advice on how to triage this further would be appreciated, since I don't know where to look.
I think you have an exception in `write_entry`. If you don't call `result()` on the future, it won't be raised. You can also call `future.exception()` to check whether one occurred. So everything "completed", but nothing was written because the calls all failed with exceptions.
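A minimal, database-free sketch of this behaviour, assuming a hypothetical `flaky_write` worker as a stand-in for `write_entry`: a loop that only iterates over `as_completed()` (like the progress-bar loop in the question) finishes cleanly, and the worker's exception only surfaces once `result()` is called.

```python
import concurrent.futures


def flaky_write(value):
    # Hypothetical stand-in for write_entry: fails for every even value.
    if value % 2 == 0:
        raise ValueError(f"cannot write {value}")
    return value


def run_without_checking(values):
    """Mimics the question's loop: completions are counted, results are never inspected."""
    completed = 0
    with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
        futures = [executor.submit(flaky_write, v) for v in values]
        for _ in concurrent.futures.as_completed(futures):
            completed += 1  # counts every future, including the ones that raised
    return completed


def run_with_result(values):
    """Calls result(), which re-raises the worker's exception in this thread."""
    written, errors = [], []
    with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
        futures = [executor.submit(flaky_write, v) for v in values]
        for future in concurrent.futures.as_completed(futures):
            try:
                written.append(future.result())
            except ValueError as exc:
                errors.append(str(exc))
    return written, errors


if __name__ == "__main__":
    print(run_without_checking(range(10)))  # 10
    written, errors = run_with_result(range(10))
    print(sorted(written))  # [1, 3, 5, 7, 9]
    print(len(errors))      # 5
```

The first function reports all ten jobs as done even though half of them failed, which matches a progress bar that happily reaches 100%.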
In your example you create `one_new_entry` but then add `one_politician`, so I think it may be something along those lines (i.e. an exception is raised but never checked for).
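A small sketch of how such a typo behaves in a worker thread, assuming a hypothetical `write_entry` that builds a dict instead of an ORM object: the `NameError` does not crash the program; it is stored on each future, and `future.exception()` returns it without raising.

```python
import concurrent.futures


def write_entry(information):
    # Hypothetical reproduction of the typo: the object is built under one
    # name, but a different, undefined name is referenced afterwards.
    one_new_entry = {"a_field": information}
    return one_politician  # noqa: F821 -- raises NameError when called


def collect_exceptions(values):
    """Return the exception (or None) recorded by each future, in input order."""
    with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
        futures = [executor.submit(write_entry, v) for v in values]
        # exception() waits for the future and returns the exception instead of raising it.
        return [future.exception() for future in futures]


if __name__ == "__main__":
    for exc in collect_exceptions(range(3)):
        print(type(exc).__name__, exc)
```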
See the Python documentation for `concurrent.futures.Future.exception`.
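For triage, another option (not part of the original answer) is to log failures inside the worker itself, so a traceback appears even if the calling code never inspects the futures. This is a sketch; `log_exceptions` is a hypothetical helper, and the `write_entry` below is a hypothetical worker that rejects negative values to simulate a failing insert.

```python
import concurrent.futures
import functools
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


def log_exceptions(func):
    """Hypothetical decorator: log any worker exception with its traceback, then re-raise."""
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        try:
            return func(*args, **kwargs)
        except Exception:
            # logger.exception logs at ERROR level and appends the current traceback.
            logger.exception("%s failed for args=%r", func.__name__, args)
            raise  # keep the exception on the future for result()/exception()
    return wrapper


@log_exceptions
def write_entry(information):
    # Hypothetical worker: rejects negative values to simulate a failing insert.
    if information < 0:
        raise ValueError("negative value")
    return information


if __name__ == "__main__":
    with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
        futures = [executor.submit(write_entry, v) for v in (1, -1, 2)]
        concurrent.futures.wait(futures)
    print([f.exception() is None for f in futures])  # [True, False, True]
```

Re-raising keeps the futures' behaviour unchanged, so this combines with checking `result()` rather than replacing it.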
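Here is an example that writes an integer field. Every even value deliberately raises an exception, so you can see how failures surface through the futures.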
```python
import sys
import concurrent.futures

from sqlalchemy import (
    create_engine,
    Integer,
)
from sqlalchemy.schema import (
    Column,
)
from sqlalchemy.sql import select, func
from sqlalchemy.orm import (
    declarative_base,
    Session as NormalSession,
    scoped_session,
    sessionmaker,
)

Base = declarative_base()

# Usage: python <script> USERNAME PASSWORD DBNAME
username, password, db = sys.argv[1:4]


class Entry(Base):
    __tablename__ = "entries"
    id = Column(Integer, primary_key=True)
    value = Column(Integer)


engine = create_engine(f"postgresql+psycopg2://{username}:{password}@/{db}", echo=False)
Base.metadata.create_all(engine)
# One session per thread; Session.remove() discards the current thread's session.
Session = scoped_session(sessionmaker(bind=engine))


def write_entry(info):
    session = Session()
    try:
        entry = Entry(value=info)
        if info % 2 == 0:
            # Deliberate failure: this exception is stored on the future.
            raise AssertionError("Info must be an odd number.")
        session.add(entry)
        session.commit()
        return entry.id
    finally:
        # Release the thread-local session even when an exception is raised.
        Session.remove()


def get_list_of_information():
    return list(range(100))


if __name__ == "__main__":
    list_of_information = get_list_of_information()
    with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor:
        # Map each future back to its input so failures can be reported.
        future_to_info = {
            executor.submit(write_entry, info): info for info in list_of_information
        }
        for future in concurrent.futures.as_completed(future_to_info):
            info = future_to_info[future]
            try:
                # result() re-raises any exception from the worker thread.
                entry_id = future.result()
            except Exception as e:
                print(e)
                print(f"Failed to insert {info}")
            else:
                print(f"Finished with {info} written to {entry_id}")

    # Count the rows that were actually committed.
    with NormalSession(engine) as session:
        print(session.scalar(select(func.count(Entry.id))))
```
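When many jobs fail, printing each exception gets noisy. A sketch of one way to summarize instead: group the failed inputs by exception type, so you can quickly see whether every failure has the same cause (for example, all `NameError`). It runs without a database; the `write_entry` here is a hypothetical worker that mirrors the odd/even rule above and, purely for illustration, fails on 0 with a different error type.

```python
import collections
import concurrent.futures


def write_entry(info):
    # Hypothetical worker: odd values succeed, even values fail,
    # and 0 fails with a different error type to show the grouping.
    if info == 0:
        raise ZeroDivisionError("zero is not allowed")
    if info % 2 == 0:
        raise AssertionError("Info must be an odd number.")
    return info


def summarize_failures(values, max_workers=10):
    """Run write_entry for every value; group failed inputs by exception type name."""
    failures = collections.defaultdict(list)
    succeeded = []
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
        future_to_info = {executor.submit(write_entry, v): v for v in values}
        for future in concurrent.futures.as_completed(future_to_info):
            info = future_to_info[future]
            exc = future.exception()  # None if the call succeeded
            if exc is None:
                succeeded.append(info)
            else:
                failures[type(exc).__name__].append(info)
    return succeeded, dict(failures)


if __name__ == "__main__":
    succeeded, failures = summarize_failures(range(100))
    print(len(succeeded))  # 50
    for name, infos in failures.items():
        print(name, len(infos), sorted(infos)[:5])
```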
Some of the output:
```text
Failed to insert 94
Finished with 73 written to 37
Info must be an odd number.
Failed to insert 96
Finished with 79 written to 40
Info must be an odd number.
Failed to insert 98
Finished with 81 written to 41
Finished with 85 written to 43
Finished with 99 written to 49
Finished with 97 written to 48
Finished with 93 written to 47
Finished with 95 written to 50
Finished with 91 written to 46
Finished with 87 written to 44
Finished with 89 written to 45
Finished with 83 written to 42
50
```
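The final `50` is the row count: only the 50 odd numbers in `range(100)` pass the check. If you would rather have the run stop loudly at the first failure than report per item, one alternative (swapped in here, not used in the answer) is `executor.map`, which re-raises a worker's exception when its result is reached during iteration. This is a sketch with a hypothetical `write_entry` using the same odd/even rule and no database. Note that writes which already committed before the failure stay committed, so this is not an all-or-nothing insert.

```python
import concurrent.futures


def write_entry(info):
    # Hypothetical worker with the same rule as the example: only odd values succeed.
    if info % 2 == 0:
        raise AssertionError("Info must be an odd number.")
    return info


def write_all_or_fail(values):
    """Return all results in input order, or raise the first failure in input order."""
    with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor:
        # Iterating map()'s results calls result() on each future in order,
        # so a worker exception is re-raised here instead of being dropped.
        return list(executor.map(write_entry, values))


if __name__ == "__main__":
    print(write_all_or_fail([1, 3, 5]))  # [1, 3, 5]
    try:
        write_all_or_fail(range(10))
    except AssertionError as exc:
        print("stopped:", exc)
```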