Search code examples
pythonmultithreadingsqlitepeewee

Peewee, SQLite and threading


I am working on a threaded application where one thread will feed a Queue with objects to be modified and a number of other threads will then read from the queue, do the modifications and save the changes.

The application won't need a lot of concurrency, so I would like to stick to an SQLite database. Here is a small example illustrating the application:

import queue
import threading
import peewee as pw

db = pw.SqliteDatabase('test.db', threadlocals=True)

class Container(pw.Model):
    contents = pw.CharField(default="spam")

    class Meta:
        database = db


class FeederThread(threading.Thread):

    def __init__(self, input_queue):
        super().__init__()

        self.q = input_queue

    def run(self):
        containers = Container.select()

        for container in containers:
            self.q.put(container)


class ReaderThread(threading.Thread):

    def __init__(self, input_queue):
        super().__init__()

        self.q = input_queue

    def run(self):
        while True:
            item = self.q.get()

            with db.execution_context() as ctx:
                # Get a new connection to the container object:
                container = Container.get(id=item.id)
                container.contents = "eggs"
                container.save()

            self.q.task_done()


if __name__ == "__main__":

    db.connect()
    try:
        db.create_tables([Container,])
    except pw.OperationalError:
        pass
    else:
        [Container.create() for c in range(42)]
    db.close()

    q = queue.Queue(maxsize=10)


    feeder = FeederThread(q)
    feeder.setDaemon(True)
    feeder.start()

    for i in range(10):
        reader = ReaderThread(q)
        reader.setDaemon(True)
        reader.start()

    q.join()

Based on the peewee docs multi-threading should be supported for SQLite. However, I keep getting the infamous peewee.OperationalError: database is locked error with the error output pointing to the container.save() line.

How do I get around this?


Solution

  • I was kind of surprised to see this failing as well, so I copied your code and played around with some different ideas. What I think the problem is, is that ExecutionContext() by default will cause the wrapped block to run in a transaction. To avoid this, I passed in False in the reader threads.

    I also edited the feeder to consume the SELECT statement before putting stuff into the queue (list(Container.select())).

    The following works for me locally:

    class FeederThread(threading.Thread):
    
        def __init__(self, input_queue):
            super(FeederThread, self).__init__()
    
            self.q = input_queue
    
        def run(self):
            containers = list(Container.select())
    
            for container in containers:
                self.q.put(container.id)  # I don't like passing model instances around like this, personal preference though
    
    class ReaderThread(threading.Thread):
    
        def __init__(self, input_queue):
            super(ReaderThread, self).__init__()
    
            self.q = input_queue
    
        def run(self):
            while True:
                item = self.q.get()
    
                with db.execution_context(False):
                    # Get a new connection to the container object:
                    container = Container.get(id=item)
                    container.contents = "nuggets"
                    with db.atomic():
                        container.save()
    
                self.q.task_done()
    
    if __name__ == "__main__":
    
        with db.execution_context():
            try:
                db.create_tables([Container,])
            except OperationalError:
                pass
            else:
                [Container.create() for c in range(42)]
    
        # ... same ...
    

    I'm not wholly satisfied with this, but hopefully it gives you some ideas.

    Here's a blog post I wrote a while back that has some tips for getting higher concurrency with SQLite: http://charlesleifer.com/blog/sqlite-small-fast-reliable-choose-any-three-/