python mysql pyspark

Error while updating a table in MySQL: mysql.connector.errors.DatabaseError: 1412 (HY000): Table definition has changed, please retry transaction


I am working on a personal project. There are several similar questions on this forum, but each one is specific to its own case. In my case I create and load a temp table, join it with an existing table in the database, and update that existing table based on the temp table's contents. When db.execute_insert_update() runs, it throws an exception: mysql.connector.errors.DatabaseError: 1412 (HY000): Table definition has changed, please retry transaction

def _create_and_load_table(self, dataframe: DataFrame) -> str:  # first function
    self._log_entry()
    temp_table_name = self._generate_table_name()
    table_schema = "partner_staging"
    table_with_schema = f"{table_schema}.{temp_table_name}"

    # Write the DataFrame to <schema>.<temp table name>.
    self.df_mgr.jdbc_write(dataframe=dataframe, dbtable=table_with_schema,
                           check_count_before_write=True)
    self._log_exit()
    return temp_table_name

def reconcile_partner_payments(self, dataframe: DataFrame) -> None:  # second function
    self.logger.info("Start reconcile_partner_payments")
    self.logger.info(f"df_col:{dataframe.columns}")
    partner_payment_ids = dataframe.select("partner_payment_id",
                                           "effective_payroll_date").distinct()
    self.logger.info(f"head:{partner_payment_ids.head(3)}")
    self.logger.info(f"count:{partner_payment_ids.count()}")
    self.logger.info(f"c:{partner_payment_ids.columns}")
    temp_table_name = self._create_and_load_table(partner_payment_ids)
    table_schema = "partner_staging"
    # f-string, so that {temp_table_name} is actually substituted into the SQL.
    update_query = f"""UPDATE abc.partner_payment AS pp
                      JOIN partner_staging.{temp_table_name} AS t ON pp.id = t.partner_payment_id
                      SET pp.reconciled = 1, pp.modify_ts = NOW(), pp.modify_user = 'ok successfull'"""

    self.db.execute_insert_update(query_sql=update_query)
    self.db.drop_table(table_schema, temp_table_name)
    self.logger.info("End reconcile_partner_payments")


def execute_insert_update(self, query_sql, args=()) -> None:  # method used by the second function
    try:
        cursor = self.get_cursor()
        self.start_transaction()
        cursor.execute(query_sql, args)
        self._connection.handle_unread_result()
        self.commit()
    except Error as e:
        # Roll back, log the stack trace, then re-raise the original error.
        stack = traceback.format_exc()
        self.rollback()
        print(stack)
        logger_func = Cache()[keys.LOGGER].info if keys.LOGGER in Cache() else print
        logger_func(f"stack_: {stack}")
        raise e
    finally:
        self.close_cursor()

def _generate_table_name(self):  # generate a unique name
    self.logger.info("Start _generate_table_name")
    rand_str = random_str(length=4)
    self.logger.info("End _generate_table_name")
    return f"temp_{self.partner_name}_{self.company_name}_{self.payroll_name}_{rand_str}"

I tried defining the schema for the temp table explicitly, but I get the same error.


Solution

  • It looks to me like you are running more than one instance of this program at once. My guess is that the second instance starts by creating a table that you think is temporary but that is actually a permanent table with exactly the same name as the first instance's table. The second instance does a CREATE TABLE while the first one is still using the table, confusion ensues, and the DBMS gives up with "Table definition has changed, please retry transaction".

    How to fix this?

    1. Generate a random or otherwise unique name for the temporary table each time you run the program, or ...

    2. Make sure your ORM uses CREATE TEMPORARY TABLE tablename when it creates that temporary table. Temporary table names are scoped to the connection as well as to the schema, so you can use the same name from different instances.
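
The two options above can be sketched roughly as follows. This is only an illustration under stated assumptions, not the asker's actual code: the helper names unique_table_name and build_reconcile_statements are hypothetical, the BIGINT column type is a guess, and the modify_user column is left out for brevity. One point matters more than the rest: a TEMPORARY table is visible only to the connection that created it, so the CREATE, the load, and the UPDATE all have to run on the same connection. If the table is written through Spark's JDBC writer and then read through mysql.connector, those are different connections, and a temporary table would most likely not be visible to the second one.

```python
import re
import uuid
from typing import List


def unique_table_name(prefix: str) -> str:
    """Return a table name that is very unlikely to collide across runs.

    Hypothetical helper: 12 hex characters from uuid4 give far more
    entropy than a 4-character random suffix.
    """
    # Keep only characters that are safe in an unquoted identifier.
    safe_prefix = re.sub(r"[^0-9A-Za-z_]", "_", prefix)
    suffix = uuid.uuid4().hex[:12]
    # MySQL identifiers are limited to 64 characters: 51 + 1 + 12 = 64.
    return f"{safe_prefix[:51]}_{suffix}"


def build_reconcile_statements(schema: str, table: str) -> List[str]:
    """Build the statements to run, in order, on ONE connection."""
    qualified = f"`{schema}`.`{table}`"
    return [
        # Column type is an assumption; match it to abc.partner_payment.id.
        f"CREATE TEMPORARY TABLE {qualified} "
        f"(partner_payment_id BIGINT PRIMARY KEY)",
        # Rows would be loaded here on the same connection,
        # for example with cursor.executemany(...).
        f"UPDATE abc.partner_payment AS pp "
        f"JOIN {qualified} AS t ON pp.id = t.partner_payment_id "
        f"SET pp.reconciled = 1, pp.modify_ts = NOW()",
        f"DROP TEMPORARY TABLE IF EXISTS {qualified}",
    ]
```

Each statement would then be passed to cursor.execute() on a single mysql.connector connection. If the staging table has to stay a permanent table because Spark writes it, only the unique-name half of the sketch applies.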