Search code examples

SQLAlchemy MSSQL bulk upSert

I've been trying various methods to bulk upsert into an Azure SQL (MSSQL) database using SQLAlchemy 2.0. The source table is fairly large (2M records) and I need to bulk upsert 100,000 records (most of which won't be there yet).

NOTE: This will run as an Azure Function, so if there is a better way I'm open to it.

class issues(Base):
    """ORM mapping for the ``issues`` table.

    ``Base``, ``mapped_column``, ``String`` and ``DateTime`` are expected to
    be provided by the surrounding module (SQLAlchemy declarative setup).
    """

    __tablename__ = "issues"

    # Primary key, stored as a string of up to 36 characters.
    id = mapped_column('id', String(36), primary_key=True)
    # Timestamps; presumably creation / last-modification times of the
    # issue -- confirm against the data source.
    created = mapped_column('created', DateTime())
    updated = mapped_column('updated', DateTime())
    status = mapped_column('status', String(50))
    severity = mapped_column('severity', String(10))
    # 36-character string identifiers; NOTE(review): these look like
    # references to other records, but no foreign key is declared here.
    control_id = mapped_column('control_id', String(36))
    entity_id = mapped_column('entity_id', String(36))

Example data

issueList = {
        issues( "1234",, , "Test", "Low8", "con123", "ent123"),
        issues( "5678",, , "Test", "Low9", "con123", "ent123"),

Currently I'm doing session.merge(issue), but it's slow and doesn't support bulk inserts. I've also looked at an existing `upsert_data` helper, but have been getting errors, as I was passing:

issueList = {
    "1234": { id: "1234", "created":, "updated":, "status": "Test", "severity": "Low16", "control_id": "con123", "entity_id": "ent123" },
    "5678": { id: "5678", "created":, "updated":, "status": "Test", "severity": "Low9", "control_id": "con123", "entity_id": "ent123" },
upsert_data (session, issueList, "issues", "id")

It seems to be expecting a model, not text, for the 3rd parameter, so I wasn't sure what to send.

Any suggestions of a fast model would be great. Only this application will be inserting data so locking the db isn't an issue as long as the lock is cleared on error.



  • I ended up writing my own function in the end:

    import json
    import logging
    log = logging.getLogger(__name__)

    Make sure the model is defined, as it needs to be passed in. `entries` is a list of dictionaries (make sure the dictionary keys match your database field names).

    The function, with logging and an optional JSON dump (remove as needed):

    def upsert_data(session, entries, model, key, jsonDump=False):
        """Insert or update ``entries`` in the table mapped by ``model``.

        The entries are processed in batches of at most 1000. For each batch
        the keys that already exist in the database are fetched with a single
        ``IN`` query; entries whose key exists are handed to SQLAlchemy as a
        bulk UPDATE, the remaining ones as a bulk INSERT, and the batch is
        committed before the next one starts.

        Args:
            session: SQLAlchemy session used for the lookup query, the bulk
                statements and the per-batch commit.
            entries: List of dictionaries, one per row. The dictionary keys
                must match the mapped attribute names of ``model``.
            model: Mapped class (not the table name as a string).
            key: Name of the attribute / dictionary key used to decide
                whether an entry already exists. NOTE(review): the bulk
                UPDATE form used here matches rows by primary key, so this
                is presumably expected to be the primary key -- confirm.
            jsonDump: When true, write the incoming entries and the
                per-batch insert/update lists to files under ``json/`` for
                debugging. The directory must already exist; the per-batch
                files are overwritten on every batch.

        Returns:
            None.
        """
        if not entries:
            # Nothing to do. This also avoids range() being called with a
            # step of zero, which raises ValueError.
            return

        batch_size = min(1000, len(entries))
        if jsonDump:
            _dump_entries(model, "preprocess", entries)

        model_key = getattr(model, key)
        for start in range(0, len(entries), batch_size):
            log.info("Working Batch %s-%s", start, start + batch_size)
            batch = entries[start:start + batch_size]

            # One round trip per batch to learn which keys already exist.
            batch_keys = [entry.get(key) for entry in batch]
            existing_rows = (
                session.query(model_key)
                .filter(model_key.in_(batch_keys))
                .all()
            )
            # Only one column is selected, so it is the first item of a row.
            existing_keys = {row[0] for row in existing_rows}

            # Set membership keeps the split linear in the batch size.
            entries_to_update = [
                entry for entry in batch if entry.get(key) in existing_keys
            ]
            entries_to_insert = [
                entry for entry in batch if entry.get(key) not in existing_keys
            ]

            if jsonDump:
                _dump_entries(model, "insert", entries_to_insert)
                _dump_entries(model, "update", entries_to_update)

            if entries_to_insert:
                log.info(
                    "Handing over to sqlalchemy to INSERT %d records",
                    len(entries_to_insert),
                )
                session.execute(insert(model), entries_to_insert)
            if entries_to_update:
                log.info(
                    "Handing over to sqlalchemy to UPDATE %d records",
                    len(entries_to_update),
                )
                session.execute(update(model), entries_to_update)

            log.info("Issuing Database Commit")
            session.commit()

        log.info("UpSert Complete")


    def _dump_entries(model, stage, payload):
        """Write ``payload`` as JSON to ``json/<tablename>_entries_<stage>.json``."""
        path = "json/" + model.__tablename__ + "_entries_" + stage + ".json"
        with open(path, "w", encoding="utf-8") as f:
            # default=str so values such as datetimes do not break the dump.
            json.dump(payload, default=str, fp=f)