python sqlite peewee

bulk_create() of two linked tables on SQLite using peewee: primary key not updated


I have two tables with a 1-n relationship and I would like to insert data into both of them using bulk_create().

Let's take the User and Tweet example.

from peewee import *

db = SqliteDatabase('my_app.db')

class BaseModel(Model):
    class Meta:
        database = db

class User(BaseModel):
    username = CharField(unique=True)

class Tweet(BaseModel):
    user = ForeignKeyField(User, backref='tweets')
    message = TextField()

I would like to create unsaved instances of User and Tweet and insert them with bulk_create(). A naive attempt would be:

db.create_tables([User, Tweet])

john = User(username='John')
mehdi = User(username='Mehdi')
users = [john, mehdi]

tweets = [Tweet(user=john, message='Hello twitter world!'),
          Tweet(user=mehdi, message='This is my first message.'),
          Tweet(user=mehdi, message='This is my second message.')]

User.bulk_create(users)
Tweet.bulk_create(tweets)

Unfortunately, this does not work: after User.bulk_create(), the primary keys of the User instances are still unset (the documentation states that bulk_create() sets them only on Postgres), so the Tweet rows have no user id to reference.

Since it seems impossible to update the primary keys of the User instances (and even if it were possible, reading them back from the database would probably be very inefficient), the only solution I can see is to define my own primary key and set it when creating each instance. That would mean giving up peewee's very convenient auto-incrementing primary key, so I would like to know whether there is an alternative before going that way.


Solution

  • This is what I came up with. I would have preferred to avoid messing with peewee's internals, but it works fine.

    from peewee import *
    
    db = SqliteDatabase('my_app.db')
    
    
    class BaseModel(Model):
        # Next id to hand out; reset from the table's MAX(id) on every bulk_create().
        id_counter = 0
        # Explicit integer primary key, so that ids assigned in Python are
        # written to the database by bulk_create().
        id = IntegerField(primary_key=True, constraints=[SQL('AUTOINCREMENT')])
    
        def _set_id(self):
            # Assign the next id only to instances that do not have one yet.
            if self.id is None:
                self.id = self.__class__.id_counter
                self.__class__.id_counter += 1
    
        @classmethod
        def bulk_create(cls, model_list, batch_size=None):
            # Continue numbering after the highest id already in the table.
            max_id = cls.select(fn.MAX(cls.id)).scalar() or 0
            cls.id_counter = max_id + 1
            for model in model_list:
                model._set_id()
                model._update_fks()
            return super(BaseModel, cls).bulk_create(model_list=model_list, batch_size=batch_size)
    
        def _update_fks(self):
            # Copy the (now assigned) id of each related instance into the raw
            # foreign key attribute, e.g. tweet.user_id = tweet.user.id.
            for field_name, field in self._meta.fields.items():
                if isinstance(field, ForeignKeyField):
                    related = getattr(self, field_name)
                    # Skip nullable foreign keys that are not set.
                    if related is not None:
                        setattr(self, field_name + '_id', related.id)
    
        class Meta:
            database = db
    
    
    class User(BaseModel):
        username = CharField(unique=True)
    
    
    class Tweet(BaseModel):
        user = ForeignKeyField(User, backref='tweets', field='id')
        message = TextField()
    
    
    db.create_tables([User, Tweet])
    
    # Creating users and tweets one by one
    sarah = User.create(username='Sarah')
    Tweet.create(user=sarah, message='First tweet in history')
    
    # Bulk user and tweet insertion
    john = User(username='John')
    mehdi = User(username='Mehdi')
    users = [john, mehdi]
    
    tweets = [Tweet(user=john, message='Hello twitter world!'),
              Tweet(user=mehdi, message='This is my first message.'),
              Tweet(user=mehdi, message='This is my second message.')]
    
    User.bulk_create(users)
    Tweet.bulk_create(tweets)
    
    # Creating users and tweets one by one after bulk insertion
    miranda = User.create(username='Miranda')
    
    Tweet.create(user=miranda, message='The last tweet')