Tags: python, mysql, sqlalchemy, orm, upsert

Insert or update (upsert) multiple objects using ORM session


I'm trying to upsert using SQLAlchemy. Standard SQL has no UPSERT statement, but SQLAlchemy provides dialect-specific support for it. I want to do the same thing through a SQLAlchemy ORM session. My code:

from sqlalchemy.orm import sessionmaker

Session = sessionmaker(engine)
with Session() as session:
    # desired upsert functionality (pseudocode; no such method exists)
    session.insert_or_update(company)
    session.commit()

session.merge(company) works for my case, since I only need to check the primary key and no other unique values. The documentation says:

Session.merge() examines the primary key attributes of the source instance, and attempts to reconcile it with an instance of the same primary key in the session. If not found locally, it attempts to load the object from the database based on primary key, and if none can be located, creates a new instance. The state of each attribute on the source instance is then copied to the target instance. The resulting target instance is then returned by the method; the original source instance is left unmodified, and un-associated with the Session if not already.

How do I perform this for multiple objects?


Solution

  • As you have noted, Session.merge() will accomplish the task on an object-by-object basis. For example, if we have

    from sqlalchemy import String
    from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column

    class Base(DeclarativeBase):
        pass

    class Thing(Base):
        __tablename__ = "thing"
        id: Mapped[int] = mapped_column(primary_key=True, autoincrement=False)
        txt: Mapped[str] = mapped_column(String(50))

    my_thing = Thing(id=1, txt="foo")
    

    we can do

    from sqlalchemy.orm import Session

    with Session(engine) as sess:
        sess.merge(my_thing)
        sess.commit()
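
    Those merge semantics (INSERT when the primary key is absent, UPDATE when it is present) can be checked end to end with an in-memory SQLite database. This is a minimal sketch; SQLite stands in for MySQL here, and the engine/table setup is assumed:

    ```python
    from sqlalchemy import String, create_engine, select
    from sqlalchemy.orm import DeclarativeBase, Mapped, Session, mapped_column

    class Base(DeclarativeBase):
        pass

    class Thing(Base):
        __tablename__ = "thing"
        id: Mapped[int] = mapped_column(primary_key=True, autoincrement=False)
        txt: Mapped[str] = mapped_column(String(50))

    engine = create_engine("sqlite://")
    Base.metadata.create_all(engine)

    with Session(engine) as sess:
        sess.merge(Thing(id=1, txt="foo"))  # no row with id=1 yet -> INSERT
        sess.merge(Thing(id=1, txt="bar"))  # row exists now -> UPDATE
        sess.commit()
        rows = sess.execute(select(Thing.id, Thing.txt)).all()
        print(rows)
    ```

    The second merge() finds the existing row by primary key and copies the new attribute state onto it, so only one row with the updated txt survives.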
    

    However, we don't want to do that for a large number of objects, e.g.,

    import time

    rows_to_upsert = 4000
    things = [Thing(id=i, txt=f"txt_{i}") for i in range(rows_to_upsert)]

    with Session(engine, autoflush=False) as sess:
        t0 = time.perf_counter()
        for thing in things:
            sess.merge(thing)
        sess.commit()
        print(
            f"merge: {rows_to_upsert:,} rows upserted in {(time.perf_counter() - t0):0.1f} seconds"
        )
    

    because that results in 4000 round-trips to the server (a SELECT for each object), which is slow. In my testing, upserting 4000 rows took approximately 40 seconds, or about 100 rows/sec.

    Instead, we should convert the list of objects to a list of dict

    list_of_dict = [dict(id=thing.id, txt=thing.txt) for thing in things]
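
    Naming each column by hand works for two columns, but for wider models the dicts can be built generically with SQLAlchemy's runtime inspection API. A sketch (the to_dict helper is my own, not part of SQLAlchemy):

    ```python
    from sqlalchemy import String, inspect
    from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column

    class Base(DeclarativeBase):
        pass

    class Thing(Base):
        __tablename__ = "thing"
        id: Mapped[int] = mapped_column(primary_key=True, autoincrement=False)
        txt: Mapped[str] = mapped_column(String(50))

    def to_dict(obj):
        # Map every mapped column attribute of an ORM object to a dict entry.
        return {attr.key: getattr(obj, attr.key)
                for attr in inspect(obj).mapper.column_attrs}

    things = [Thing(id=i, txt=f"txt_{i}") for i in range(3)]
    list_of_dict = [to_dict(t) for t in things]
    print(list_of_dict)
    ```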
    
    

    and then use an INSERT … ON DUPLICATE KEY UPDATE statement (MySQL's native upsert)

    from sqlalchemy.dialects.mysql import insert
    
    insert_stmt = insert(Thing).values(list_of_dict)
    on_duplicate_stmt = insert_stmt.on_duplicate_key_update(
        dict(txt=insert_stmt.inserted.txt)
    )
    with Session(engine) as sess:
        t0 = time.perf_counter()
        sess.execute(on_duplicate_stmt)
        sess.commit()
        print(
            f"insert: {rows_to_upsert:,} rows upserted in {(time.perf_counter() - t0):0.1f} seconds"
        )
    

    which only took around 2 seconds, or about 2000 rows/sec.
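
    For readers not on MySQL: the same single-statement pattern exists for other dialects. A sketch using SQLite's INSERT … ON CONFLICT DO UPDATE, which also makes the upsert behavior easy to verify without a MySQL server (PostgreSQL's insert() construct offers the equivalent on_conflict_do_update() method):

    ```python
    from sqlalchemy import String, create_engine, select
    from sqlalchemy.dialects.sqlite import insert
    from sqlalchemy.orm import DeclarativeBase, Mapped, Session, mapped_column

    class Base(DeclarativeBase):
        pass

    class Thing(Base):
        __tablename__ = "thing"
        id: Mapped[int] = mapped_column(primary_key=True, autoincrement=False)
        txt: Mapped[str] = mapped_column(String(50))

    engine = create_engine("sqlite://")
    Base.metadata.create_all(engine)

    with Session(engine) as sess:
        # Seed one existing row, then upsert over it plus a new one.
        sess.add(Thing(id=1, txt="old"))
        sess.commit()

        stmt = insert(Thing).values([{"id": 1, "txt": "new"}, {"id": 2, "txt": "two"}])
        stmt = stmt.on_conflict_do_update(
            index_elements=[Thing.id], set_=dict(txt=stmt.excluded.txt)
        )
        sess.execute(stmt)
        sess.commit()
        rows = sess.execute(select(Thing.id, Thing.txt).order_by(Thing.id)).all()
        print(rows)
    ```

    The existing row (id=1) is updated in place and the new row (id=2) is inserted, all in one round-trip.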