Search code examples
pythonsqlalchemy

sqlalchemy bulk_update fail


my table user_info as follow:

class UserInfo(Base):
    __tablename__ = 'user_info'

    id = Column(INTEGER, primary_key=True)
    user_code = Column(CHAR(16), nullable=False, comment='工号')
    goal_1 = Column(CHAR(16))
    goal_2 = Column(CHAR(16))
    prem_1 = Column(CHAR(16))
    prem_2 = Column(CHAR(16))

It has about 10k datas and all of the columns 'goal' and 'prem' are NULL,
I want to update these columns with random data,
for test I just update 50 rows and my code as follow:

    def insert_prem_date(self, count=50):
        users = self.db.session.query(UserInfo).limit(count)

        user_codes = [i.user_code for i in users]
        print(user_codes)

        prem_list = []
        for _code in user_codes:
            prem_list.append(
                {"user_code": _code,
                 "prem_1": str(random.randint(500, 500000)),
                 "prem_2": str(random.randint(500, 500000)),
                 "goal_1": str(random.randint(500, 500000)),
                 "goal_2": str(random.randint(500, 500000))
                 }
            )
        self.db.session.bulk_update_mappings(
            UserInfo,
            prem_list
        )

        self.db.session.commit()

And the was ERROR:

enter image description here I am sure that the user_code are exist in the table

so how can i update it correctly?

I also try this code but it doesn't work: not ERROR but also not update data in table

        for _code in user_codes:
            prem_list.append(
                {UserInfo.user_code: _code,
                 UserInfo.prem_1: str(random.randint(500, 500000)),
                 UserInfo.prem_2: str(random.randint(500, 500000)),
                 UserInfo.goal_1: str(random.randint(500, 500000)),
                 UserInfo.goal_2: str(random.randint(500, 500000))
                 }
            )

Solution

  • It seems like there's a misunderstanding in how bulk_update_mappings works.

    It is impossible to use bulk_update_mappings by field(your unique field) which is different than primary key.

    Your mapping dictionary doesn't contain primary key. Passing with a list of parameter dictionaries with a full primary key value will invoke bulk UPDATE by primary key mode for statement, generating the WHERE criteria to match each row by primary key and using executemany.

    Approach to use bulk_update_mappings

    def insert_prem_date(self, count=50):
       # Fetch users
       users = self.db.session.query(UserInfo).limit(count)
    
       # Extract user IDs
       user_ids = [user.id for user in users]
       print(user_ids)
    
       # Generate random data for each column and prepare for bulk update
       prem_list = [
           {
               "id": user_id,
               "prem_1": str(random.randint(500, 500000)),
               "prem_2": str(random.randint(500, 500000)),
               "goal_1": str(random.randint(500, 500000)),
               "goal_2": str(random.randint(500, 500000)),
           }
           for user_id in user_ids
       ]
    
       # Perform bulk update
       self.db.session.bulk_update_mappings(
           UserInfo,
           prem_list
       )
    
       # Commit the changes
       self.db.session.commit()
    

    Original method without bulk_update

    def update_prem_data(self, count=50):
        users = self.db.session.query(UserInfo).limit(count).all()
    
        for user in users:
            user.prem_1 = str(random.randint(500, 500000))
            user.prem_2 = str(random.randint(500, 500000))
            user.goal_1 = str(random.randint(500, 500000))
            user.goal_2 = str(random.randint(500, 500000))
    
        self.db.session.commit()