I've been trying various methods to bulk upsert into an Azure SQL (MSSQL) database using SQLAlchemy 2.0. The source table is fairly large (~2M records) and I need to bulk upsert 100,000 records, most of which won't already be there.
NOTE: This will run as an Azure Function, so if there is a better way I'm open to it.
class issues(Base):
    __tablename__ = "issues"

    id = mapped_column('id', String(36), primary_key=True)
    created = mapped_column('created', DateTime())
    updated = mapped_column('updated', DateTime())
    status = mapped_column('status', String(50))
    severity = mapped_column('severity', String(10))
    control_id = mapped_column('control_id', String(36))
    entity_id = mapped_column('entity_id', String(36))
Example data:
issueList = [
    issues(id="1234", created=datetime.now(), updated=datetime.now(), status="Test", severity="Low8", control_id="con123", entity_id="ent123"),
    issues(id="5678", created=datetime.now(), updated=datetime.now(), status="Test", severity="Low9", control_id="con123", entity_id="ent123"),
]
Currently I'm doing session.merge(issue) for each record (roughly the loop sketched below), but it's slow and doesn't support bulk inserts.
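A minimal sketch of that merge loop, assuming issueList holds the mapped objects from above:

for issue in issueList:
    # merge() loads each row first (a SELECT per primary key) and then
    # flushes an INSERT or UPDATE, so 100,000 records means a lot of round trips
    session.merge(issue)
session.commit()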
I've looked at https://stackoverflow.com/a/69968892/1697288 but have been getting errors, as I was passing:
issueList = {
    "1234": {"id": "1234", "created": datetime.now(), "updated": datetime.now(), "status": "Test", "severity": "Low16", "control_id": "con123", "entity_id": "ent123"},
    "5678": {"id": "5678", "created": datetime.now(), "updated": datetime.now(), "status": "Test", "severity": "Low9", "control_id": "con123", "entity_id": "ent123"},
}
upsert_data(session, issueList, "issues", "id")
It seems to be expecting a model rather than text for the 3rd param, so I wasn't sure what to send.
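Presumably it wants the mapped class itself rather than the table name string, something like:

upsert_data(session, issueList, issues, "id")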
Any suggestions for a fast approach would be great. Only this application will be inserting data, so locking the DB isn't an issue as long as the lock is cleared on error.
Thanks.
I ended up writing my own function in the end:
import json
import logging

from sqlalchemy import insert, update

log = logging.getLogger(__name__)
Make sure the model is defined, as it will need to be passed in, and that entries is a list of dictionaries (make sure the dictionary keys match your database field names).
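For example, an entries payload for the issues model above would look like this (sample data only):

entries = [
    {"id": "1234", "created": datetime.now(), "updated": datetime.now(), "status": "Test", "severity": "Low8", "control_id": "con123", "entity_id": "ent123"},
    {"id": "5678", "created": datetime.now(), "updated": datetime.now(), "status": "Test", "severity": "Low9", "control_id": "con123", "entity_id": "ent123"},
]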
The function, with logging and an optional JSON dump (remove as needed):
def upsert_data(session, entries, model, key, jsonDump=False):
    # jsonDump toggles the optional JSON dump of each batch to disk
    batch_size = 1000
    if batch_size > len(entries):
        batch_size = len(entries)

    if jsonDump:
        with open("json/" + model.__tablename__ + "_entries_preprocess.json", "w") as f:
            json.dump(entries, default=str, fp=f)

    modelKey = getattr(model, key)

    for i in range(0, len(entries), batch_size):
        log.info("Working Batch " + str(i) + "-" + str(i + batch_size))
        # Get the next batch
        batch = entries[i:i + batch_size]
        entries_to_update = []
        entries_to_insert = batch
        # Extract keys from this batch
        keysinbatch = [entry.get(key) for entry in batch]
        # Fetch only the key column for rows that already exist
        existing_records = session.query(modelKey).filter(modelKey.in_(keysinbatch)).all()
        # Iterate results
        for entry in existing_records:
            dbIndex = getattr(entry, key)
            for index, x in enumerate(entries_to_insert):
                if dbIndex == x[key]:
                    # Matches an item in the DB: move it to the update list
                    # and remove it from the insert list
                    entries_to_update.append(entries_to_insert.pop(index))
                    break

        # Lists are complete; get SQLAlchemy to handle the operations
        if jsonDump:
            with open("json/" + model.__tablename__ + "_entries_insert.json", "w") as f:
                json.dump(entries_to_insert, default=str, fp=f)
            with open("json/" + model.__tablename__ + "_entries_update.json", "w") as f:
                json.dump(entries_to_update, default=str, fp=f)

        # Insert any items that are not already in the table
        if len(entries_to_insert) > 0:
            log.info("Handing over to sqlalchemy to INSERT " + str(len(entries_to_insert)) + " records")
            session.execute(insert(model), entries_to_insert)
        # Update items that already exist
        if len(entries_to_update) > 0:
            log.info("Handing over to sqlalchemy to UPDATE " + str(len(entries_to_update)) + " records")
            session.execute(update(model), entries_to_update)

        # Commit this batch
        log.info("Issuing Database Commit")
        session.commit()

    log.info("UpSert Complete")