
SQLAlchemy listens_for() listener is never triggered


I have a working listens_for function that inserts a LoadHistory object before a Load is updated:

from sqlalchemy import event, insert, select

@event.listens_for(Load, 'before_update')
def before_update_load(mapper, connection, target):
    # Copy the current (pre-update) Load row into LoadHistory.
    current_load_select = select(load_fields).where(Load.id == target.id)
    insert_history = insert(LoadHistory).from_select(load_history_fields, current_load_select)
    connection.execute(insert_history)

The task is to add a similar function that inserts LoadStopHistory objects after a LoadHistory object has been inserted. The code is:

@event.listens_for(LoadHistory, 'after_insert')
def after_insert_load_history(mapper, connection, target):
    # the real logic will go here
    print("@" * 125)
    print('trgt', target)
    print("@" * 125)

However, this function is never called. I have tried inserting a LoadHistory both by updating a Load and directly in Postgres; neither triggered it. I have also tried changing 'after_insert' to 'before_insert' and 'before_update'.


Solution

  • The impression I get from this note (from Mapper-level Flush Events):

    It is important to note that these events apply only to the session flush operation, and not to the ORM-level INSERT/UPDATE/DELETE functionality described at ORM-Enabled INSERT, UPDATE, and DELETE statements. To intercept ORM-level DML, use the SessionEvents.do_orm_execute() event.

    is that the after_insert event does not fire for an insert() statement that is executed directly; a do_orm_execute listener must be used instead. Moreover, the insert must be executed through the session rather than the connection, otherwise the ORM event machinery never sees it.

    These listeners seem to work:

    import sqlalchemy as sa
    from sqlalchemy.orm import Session
    
    ...
    
    @sa.event.listens_for(Load, 'before_update')
    def before_update_load(mapper, connection, target):
        insert_history = sa.insert(LoadHistory)
        session = sa.inspect(target).session
        session.execute(insert_history)
    
    @sa.event.listens_for(Session, 'do_orm_execute')
    def receive_orm_execute(orm_execute_state):
        # Fires for statements run through Session.execute(), not for flushes.
        if (
            orm_execute_state.is_insert
            and orm_execute_state.statement.table.name
            == LoadHistory.__table__.name
        ):
            print('*' * 10)