Search code examples
pythonsql-serversqlalchemyazure-functionsazure-sql-database

SQLAlchemy MSSQL bulk upSert


I've been trying various methods to bulk upSert into an Azure SQL (MSSQL) database using SQLAlchemy 2.0. The source table is fairly large (2M records), and I need to bulk upSert 100,000 records (most of which won't already be there).

NOTE: This will run as an Azure Function, so if there is a better way to do this I'm open to it.

class issues(Base):
    """ORM model mapped to the ``issues`` table."""

    __tablename__ = "issues"

    # Primary key: a string of up to 36 characters.
    id = mapped_column("id", String(36), primary_key=True)

    # Timestamps stored as DateTime columns.
    created = mapped_column("created", DateTime())
    updated = mapped_column("updated", DateTime())

    # Short free-text classification columns.
    status = mapped_column("status", String(50))
    severity = mapped_column("severity", String(10))

    # String identifiers of up to 36 characters; no foreign key is declared.
    control_id = mapped_column("control_id", String(36))
    entity_id = mapped_column("entity_id", String(36))

Example data

# Example rows. Column values are passed by keyword because the default
# declarative constructor only accepts keyword arguments
# (assumes Base is a plain declarative base -- TODO confirm).
issueList = {
    issues(
        id="1234",
        created=datetime.now(),
        updated=datetime.now(),
        status="Test",
        severity="Low8",
        control_id="con123",
        entity_id="ent123",
    ),
    issues(
        id="5678",
        created=datetime.now(),
        updated=datetime.now(),
        status="Test",
        severity="Low9",
        control_id="con123",
        entity_id="ent123",
    ),
}

Currently I'm doing session.merge(issue), but it's slow and doesn't support bulk inserts. I've looked at https://stackoverflow.com/a/69968892/1697288 but have been getting errors when passing:

# Rows keyed by their id. The "id" key is quoted (a bare `id` would use the
# builtin function as the dictionary key) and datetime.now() is called so the
# value is a timestamp rather than the method object.
issueList = {
    "1234": {
        "id": "1234",
        "created": datetime.now(),
        "updated": datetime.now(),
        "status": "Test",
        "severity": "Low16",
        "control_id": "con123",
        "entity_id": "ent123",
    },
    "5678": {
        "id": "5678",
        "created": datetime.now(),
        "updated": datetime.now(),
        "status": "Test",
        "severity": "Low9",
        "control_id": "con123",
        "entity_id": "ent123",
    },
}
# The third argument is passed as the table name string here.
upsert_data (session, issueList, "issues", "id")

It seems to be expecting a model, not text, for the third parameter, so I wasn't sure what to send.

Any suggestions of a fast model would be great. Only this application will be inserting data so locking the db isn't an issue as long as the lock is cleared on error.

Thanks.


Solution

  • I ended up writing my own function:

    import json
    import logging
    log = logging.getLogger(__name__)
    

    Make sure the model is defined, as it will need to be passed in. The entries are a list of dictionaries (make sure the dictionary keys match your database field names).

    The function, with logging and an optional JSON dump (remove as needed):

    def upsert_data(session, entries, model, key, *, batch_size=1000, jsonDump=None):
        """Insert or update ``entries`` in the table mapped by ``model``.

        The entries are processed in batches. For each batch the keys that
        already exist in the database are looked up with a single query;
        entries whose key exists are sent as a bulk UPDATE and the rest as a
        bulk INSERT. Each batch is committed on its own.

        Args:
            session: SQLAlchemy session used for the lookup, the bulk
                statements, the commit and the rollback.
            entries: Sequence of dictionaries, one per row. It is sliced and
                measured with ``len()``, so it must be a list-like sequence.
            model: Mapped class; passed to ``insert()`` / ``update()`` and
                used to resolve ``key`` to a column attribute.
            key: Name of the attribute on ``model`` (and of the dictionary
                key in each entry) that identifies a row. The bulk UPDATE is
                presumably matched by primary key, so this should be the
                primary key -- verify against the SQLAlchemy documentation.
            batch_size: Maximum number of entries per batch. Every key of a
                batch goes into one ``IN (...)`` filter, so keep this under
                the database's bound-parameter limit.
            jsonDump: When true, dump the entries to files under ``json/``
                for debugging. When left as ``None`` a module-level
                ``jsonDump`` flag is honoured if one exists, otherwise no
                dump is written.

        Raises:
            ValueError: If ``batch_size`` is smaller than 1.
            Exception: Any error raised while processing a batch is
                re-raised after the session has been rolled back. Batches
                committed before the failure stay committed.
        """
        # Same logger object as a module-level logging.getLogger(__name__).
        logger = logging.getLogger(__name__)

        if jsonDump is None:
            jsonDump = globals().get("jsonDump", False)
        if batch_size < 1:
            raise ValueError("batch_size must be at least 1")

        def dump(suffix, rows):
            # default=str keeps non-JSON values (e.g. datetimes) serializable.
            path = "json/" + model.__tablename__ + "_entries_" + suffix + ".json"
            with open(path, "w", encoding="utf-8") as f:
                json.dump(rows, default=str, fp=f)

        if jsonDump:
            dump("preprocess", entries)

        # Nothing to do; also avoids a zero step in range() below.
        if not entries:
            logger.info("UpSert Complete")
            return

        batch_size = min(batch_size, len(entries))
        model_key = getattr(model, key)

        try:
            for start in range(0, len(entries), batch_size):
                logger.info("Working Batch %s-%s", start, start + batch_size)
                batch = entries[start:start + batch_size]

                # One round trip to learn which keys of this batch exist.
                batch_keys = [entry.get(key) for entry in batch]
                existing_rows = (
                    session.query(model_key)
                    .filter(model_key.in_(batch_keys))
                    .all()
                )
                existing_keys = {getattr(row, key) for row in existing_rows}

                # Partition with a set lookup instead of scanning and popping
                # the insert list once per existing row.
                entries_to_insert = []
                entries_to_update = []
                for entry in batch:
                    if entry.get(key) in existing_keys:
                        entries_to_update.append(entry)
                    else:
                        entries_to_insert.append(entry)

                if jsonDump:
                    # These files are overwritten by every batch.
                    dump("insert", entries_to_insert)
                    dump("update", entries_to_update)

                if entries_to_insert:
                    logger.info(
                        "Handing over to sqlalchemy to INSERT %s records",
                        len(entries_to_insert),
                    )
                    session.execute(insert(model), entries_to_insert)

                if entries_to_update:
                    logger.info(
                        "Handing over to sqlalchemy to UPDATE %s records",
                        len(entries_to_update),
                    )
                    session.execute(update(model), entries_to_update)

                logger.info("Issuing Database Commit")
                session.commit()
        except Exception:
            # Release the failed transaction (and its locks) before re-raising.
            logger.exception("UpSert failed, rolling back the current batch")
            session.rollback()
            raise

        logger.info("UpSert Complete")