With the following code, I want to show how to synchronize with a thread.
Below you will find the code for that. It gets stuck waiting for the first image.
Why is the notify_all not releasing the image_created.wait?
# Output
create new image
waiting for new image
start waiter
notify_all
wait for someone to take it
waiting for image_created
import asyncio
import random
import threading
import time
class ImageUpdater:
    """Hand images from a producer thread to an async consumer.

    ``update_image`` is meant to run in its own thread and produces images in
    an endless loop; ``image_generator`` waits for each image in a worker
    thread and yields ``self.image``.
    """

    def __init__(self):
        # Most recently produced image (a string); None until the first one.
        self.image = None
        # Set by the consumer to tell the producer the image has been taken.
        self.image_used = threading.Event()
        # Condition the producer notifies after it has stored a new image.
        self.image_created = threading.Condition()

    def update_image(self):
        """Produce images forever, pausing after each until it is taken."""
        while True:
            self.image_used.clear()
            with self.image_created:
                print("create new image")
                time.sleep(0.6)  # simulated work, done while holding the lock
                self.image = str(random.random())
                print("notify_all")
                # Wakes only threads that are already blocked in wait(); a
                # Condition does not remember notifications for later waiters.
                self.image_created.notify_all()
            print("wait for someone to take it")
            # Blocks until the consumer calls image_used.set().
            self.image_used.wait()
            print("someone took it")

    async def image_generator(self):
        """Yield ``self.image`` each time the blocking ``waiter`` returns."""

        def waiter():
            # Runs in a worker thread so the event loop is not blocked.
            print("start waiter")
            time.sleep(0.1)
            with self.image_created:
                print("waiting for image_created")
                # Releases the lock and blocks until a notify arrives *after*
                # this call has started waiting.
                self.image_created.wait()
            print("waiter finished")
            self.image_used.set()

        while True:
            print("waiting for new image")
            await asyncio.to_thread(waiter)
            yield self.image
async def main():
    """Start the producer thread and print every image the generator yields."""
    updater = ImageUpdater()
    # The producer runs in its own thread, next to the event loop.
    threading.Thread(target=updater.update_image).start()
    images = updater.image_generator()
    async for image in images:
        print(f"Received new image: {image}")
if __name__ == "__main__":
    # asyncio.run() returns the coroutine's result (None here), not an event
    # loop, so the result is not bound to a name.
    asyncio.run(main())
In your function waiter
, the __enter__
method of the context manager (with self.image_created
) acquires self.image_created
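's lock. When you call wait
the lock is released, and the thread will wait there until the other thread calls self.image_created.notify
. That never happens: as your output shows, notify_all was already called before the waiter started waiting (a Condition does not remember notifications that nobody was waiting for), and the other thread is now blocked, waiting for self.image_used
to be set. The two threads are deadlocked.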
The context manager for self.image_created
acquires the lock on entry to the with:
block and releases it on exit. So this code:
# Entering the block acquires the condition's lock; leaving it releases it.
with self.image_created:
    self.image_created.wait()
is basically equivalent to:
self.image_created.acquire()
# When you reach here, this thread owns the lock.
self.image_created.wait() # releases the lock
# ...and blocks until another thread calls notify()/notify_all();
# the lock is re-acquired before wait() returns.
# You will never get this far since the other thread is blocked.
self.image_created.release()
Instead of calling wait inside with self.image_created
, you could just process the image and call self.image_used.set()
. That eliminates the call to self.image_created.wait
, which is causing the deadlock. But in that case you aren't using the functionality of the Condition object at all - you're just using it like another threading.Event.
This simpler program, which uses two Event objects to ensure that the threads take turns, works:
import asyncio
import random
import threading
import time
class ImageUpdater:
    """Hand images from a producer thread to an async consumer, in turns.

    Two ``threading.Event`` objects make the threads alternate:
    ``image_created`` is set by the producer when a new image is ready, and
    ``image_used`` is set by the consumer once it has taken that image.

    Args:
        create_delay: Seconds the producer sleeps to simulate creating an
            image.
        start_delay: Seconds the waiter sleeps before it starts waiting.
        use_delay: Seconds the waiter sleeps to simulate using the image.
    """

    def __init__(self, *, create_delay=0.6, start_delay=0.1, use_delay=0.3):
        # Most recently produced image (a string); None until the first one.
        self.image = None
        self.image_used = threading.Event()
        self.image_created = threading.Event()
        self.create_delay = create_delay
        self.start_delay = start_delay
        self.use_delay = use_delay

    def update_image(self):
        """Produce images forever, pausing after each until it is taken.

        Intended to run in its own thread; it never returns.
        """
        while True:
            self.image_used.clear()
            print("create new image")
            time.sleep(self.create_delay)
            self.image = str(random.random())
            print("new image created")
            self.image_created.set()
            print("wait for someone to take it")
            self.image_used.wait()
            print("someone took it")

    async def image_generator(self):
        """Yield each image produced by ``update_image``, one per hand-off."""

        def waiter():
            # Runs in a worker thread so the event loop is not blocked.
            print("start waiter")
            time.sleep(self.start_delay)
            print("waiting for image_created")
            self.image_created.wait()
            # Reset the event so the next call waits for the next image.
            self.image_created.clear()
            time.sleep(self.use_delay)
            # Snapshot the image *before* releasing the producer; reading
            # self.image afterwards would race with the next image.
            image = self.image
            self.image_used.set()
            return image

        while True:
            print("waiting for new image")
            image = await asyncio.to_thread(waiter)
            yield image
async def main():
    """Start the producer thread and print every image the generator yields."""
    updater = ImageUpdater()
    # The producer runs in its own thread, next to the event loop.
    threading.Thread(target=updater.update_image).start()
    images = updater.image_generator()
    async for image in images:
        print(f"Received new image: {image}")
if __name__ == "__main__":
    # asyncio.run() returns the coroutine's result (None here), not an event
    # loop, so the result is not bound to a name.
    asyncio.run(main())
You said that you eventually wanted to have more than one thread processing the images, so perhaps a producer-consumer architecture would be cleaner in the long run. See quamrana's answer.