I am working on a threaded application in which one thread feeds a `Queue` with objects to be modified, and a number of other threads read from the queue, apply the modifications, and save the changes.
The application won't need much concurrency, so I would like to stick with an SQLite database. Here is a small example illustrating the application:
import queue
import threading
import peewee as pw
# peewee 2.x API: `threadlocals=True` keeps a separate connection per thread.
db = pw.SqliteDatabase('test.db', threadlocals=True)


class Container(pw.Model):
    """Minimal model whose `contents` field the worker threads overwrite."""

    contents = pw.CharField(default="spam")

    class Meta:
        database = db
class FeederThread(threading.Thread):
    """Producer thread: put every Container row onto the shared queue."""

    def __init__(self, input_queue):
        super().__init__()
        self.q = input_queue

    def run(self):
        # NOTE: the query is iterated lazily, so its SQLite read cursor stays
        # open while q.put() blocks (if the queue is bounded and full). An
        # unfinished SELECT holds a shared lock, which can stop other
        # connections from committing writes ("database is locked").
        for container in Container.select():
            self.q.put(container)
class ReaderThread(threading.Thread):
    """Worker thread: take containers off the queue, modify them, save them."""

    def __init__(self, input_queue):
        super().__init__()
        self.q = input_queue

    def run(self):
        while True:
            item = self.q.get()
            try:
                # peewee 2.x: run the block on this thread's connection; by
                # default the whole block is also wrapped in a transaction.
                with db.execution_context():
                    # Re-fetch the row through this thread's connection.
                    container = Container.get(id=item.id)
                    container.contents = "eggs"
                    container.save()
            finally:
                # Mark the item done even if saving raised, so q.join() still
                # counts a failed item as finished.
                self.q.task_done()
if __name__ == "__main__":
    # One-time setup: create the table and seed 42 rows only when the table
    # was newly created.
    db.connect()
    try:
        db.create_tables([Container])
    except pw.OperationalError:
        pass  # assume the table already exists; keep its existing rows
    else:
        for _ in range(42):
            Container.create()
    db.close()

    q = queue.Queue(maxsize=10)

    # Daemon threads: the readers loop forever, so the process must be able
    # to exit once q.join() returns.
    feeder = FeederThread(q)
    feeder.daemon = True
    feeder.start()
    for _ in range(10):
        reader = ReaderThread(q)
        reader.daemon = True
        reader.start()

    q.join()
According to the peewee docs, multi-threading should be supported for SQLite. However, I keep getting the infamous `peewee.OperationalError: database is locked` error, with the traceback pointing to the `container.save()` line.
How do I get around this?
I was somewhat surprised to see this fail as well, so I copied your code and experimented with a few different ideas. I think the problem is that `ExecutionContext()` by default runs the wrapped block inside a transaction. To avoid this, I passed in `False` in the reader threads.
I also changed the feeder to fully consume the SELECT query before putting anything into the queue (`list(Container.select())`). Otherwise the feeder keeps an unfinished SELECT cursor open while it blocks on the full queue, and that open read can prevent the reader threads from committing their writes.
The following works for me locally:
class FeederThread(threading.Thread):
    """Producer thread: put the id of every Container row onto the queue."""

    def __init__(self, input_queue):
        super().__init__()
        self.q = input_queue

    def run(self):
        # Materialize the whole result set first, so the SELECT cursor is
        # finished before q.put() has a chance to block on a full queue.
        containers = list(Container.select())
        for container in containers:
            # Pass ids rather than model instances; each reader re-fetches
            # the row itself (personal preference).
            self.q.put(container.id)
class ReaderThread(threading.Thread):
    """Worker thread: re-fetch each queued container id, modify it, save it."""

    def __init__(self, input_queue):
        super().__init__()
        self.q = input_queue

    def run(self):
        while True:
            container_id = self.q.get()
            try:
                # False: do NOT wrap the whole block in a transaction; only
                # the save below is wrapped (via db.atomic()), so the write
                # transaction stays as short as possible.
                with db.execution_context(False):
                    container = Container.get(id=container_id)
                    container.contents = "nuggets"
                    with db.atomic():
                        container.save()
            finally:
                # Mark the item done even if processing raised, so q.join()
                # still counts a failed item as finished.
                self.q.task_done()
if __name__ == "__main__":
    # Setup runs inside a peewee execution context, used here in place of the
    # explicit db.connect()/db.close() calls.
    with db.execution_context():
        try:
            db.create_tables([Container])
        except pw.OperationalError:  # only `pw` is imported, not the bare name
            pass  # assume the table already exists; keep its existing rows
        else:
            for _ in range(42):
                Container.create()
    # ... same queue / feeder / reader setup as in the question ...
I'm not wholly satisfied with this, but hopefully it gives you some ideas.
Here's a blog post I wrote a while back that has some tips for getting higher concurrency with SQLite: http://charlesleifer.com/blog/sqlite-small-fast-reliable-choose-any-three-/