python, python-asyncio, python-multithreading

Why does threading.Condition.notify_all not wake up a waiting thread?


With the following code, I want to show how to synchronize with a thread.

  • I want to have a separate thread that updates an image.
  • From these images, I want to have an asynchronous generator.
  • The image should only be updated after the asynchronous generator has used it.
  • The async generator should be waiting for a new image to be created.

Below is the code for that. It gets stuck waiting for the first image.

Why does notify_all not release the image_created.wait call?

# Output
create new image
waiting for new image
start waiter
notify_all
wait for someone to take it
waiting for image_created

# Code
import asyncio
import random
import threading
import time


class ImageUpdater:
    def __init__(self):
        self.image = None
        self.image_used = threading.Event()
        self.image_created = threading.Condition()

    def update_image(self):
        while True:
            self.image_used.clear()
            with self.image_created:
                print("create new image")
                time.sleep(0.6)
                self.image = str(random.random())
                print("notify_all")
                self.image_created.notify_all()
            print("wait for someone to take it")
            self.image_used.wait()
            print("someone took it")

    async def image_generator(self):
        def waiter():
            print("start waiter")
            time.sleep(0.1)
            with self.image_created:
                print("waiting for image_created")
                self.image_created.wait()
            print("waiter finished")
            self.image_used.set()

        while True:
            print("waiting for new image")
            await asyncio.to_thread(waiter)

            yield self.image


async def main():
    updater = ImageUpdater()

    update_thread = threading.Thread(target=updater.update_image)
    update_thread.start()

    async for image in updater.image_generator():
        print(f"Received new image: {image}")


if __name__ == "__main__":
    asyncio.run(main())


Solution

  • In your function waiter, the __enter__ method of the context manager (with self.image_created) acquires self.image_created's lock. When you call wait, the lock is released and the thread blocks until another thread calls self.image_created.notify or notify_all. A Condition does not remember earlier notifications, and your output shows that notify_all ran before waiter reached wait, so that notification was lost. No further notification ever arrives because the other thread is blocked, waiting for self.image_used to be set. The two threads are deadlocked.

    The context manager for self.image_created acquires the lock on entry to the with block and releases it on exit. So this code:

    with self.image_created:
        self.image_created.wait()
    

    is basically equivalent to:

    self.image_created.acquire()  
    # when you reach here, this thread owns the lock
    self.image_created.wait()  # releases the lock
                               # waits for another thread to call notify
    # You will never get this far since the other thread is blocked
    self.image_created.release()
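
The lost notification can be reproduced in isolation. The following is a minimal sketch, not part of the original program: the function name notification_is_lost is made up for illustration, and a timeout is used only so that the demo terminates instead of hanging.

```python
import threading


def notification_is_lost() -> bool:
    """Return True if a notify_all() issued before wait() fails to wake the waiter."""
    cond = threading.Condition()

    # Notify while no thread is waiting: nothing is queued or remembered.
    with cond:
        cond.notify_all()

    # A wait() that starts afterwards cannot see the earlier notification.
    # With a timeout, wait() returns False when the timeout expired.
    with cond:
        woke_up = cond.wait(timeout=0.2)

    return not woke_up


if __name__ == "__main__":
    print(notification_is_lost())
```

Without the timeout, the second block would wait forever, which is the situation waiter ends up in.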
    

    Instead of calling wait inside with self.image_created, you could just process the image and call self.image_used.set(). That eliminates the call to self.image_created.wait, which is causing the deadlock. But in that case you aren't using the functionality of the Condition object at all - you're just using it like another threading.Event.
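
If you do want to keep a Condition, the usual pattern is to pair it with a piece of shared state and wait on a predicate, so that a notification sent before the waiter arrives is not lost. The following is a rough sketch under that assumption; ImageSlot, put, take and demo are hypothetical names, not part of your code.

```python
import threading
import time


class ImageSlot:
    """Hypothetical helper: a one-image handoff guarded by a Condition."""

    def __init__(self):
        self._cond = threading.Condition()
        self._image = None
        self._fresh = False  # True while an unconsumed image is available

    def put(self, image):
        with self._cond:
            # Block until the previous image has been taken.
            self._cond.wait_for(lambda: not self._fresh)
            self._image = image
            self._fresh = True
            self._cond.notify_all()

    def take(self):
        with self._cond:
            # wait_for checks the predicate before blocking, so it returns
            # immediately if the image was published before we got here.
            self._cond.wait_for(lambda: self._fresh)
            self._fresh = False
            self._cond.notify_all()
            return self._image


def demo(count=3):
    slot = ImageSlot()
    producer = threading.Thread(
        target=lambda: [slot.put(f"image-{i}") for i in range(count)]
    )
    producer.start()
    time.sleep(0.1)  # let the producer notify before anyone is waiting
    received = [slot.take() for _ in range(count)]
    producer.join()
    return received


if __name__ == "__main__":
    print(demo())
```

The flag carries the information that a bare notify_all cannot: whether an image is currently waiting to be taken.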

    This simpler program, which uses two Event objects to ensure that the threads take turns, works:

    import asyncio
    import random
    import threading
    import time
    
    
    class ImageUpdater:
        def __init__(self):
            self.image = None
            self.image_used = threading.Event()
            self.image_created = threading.Event()
    
        def update_image(self):
            while True:
                self.image_used.clear()
                print("create new image")
                time.sleep(0.6)
                self.image = str(random.random())
                print("new image created")
                self.image_created.set()
                print("wait for someone to take it")
                self.image_used.wait()
                print("someone took it")
    
        async def image_generator(self):
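            def waiter():
                print("start waiter")
                time.sleep(0.1)
                print("waiting for image_created")
                # An Event stays set until cleared, so an earlier set() is not missed.
                self.image_created.wait()
                self.image_created.clear()
                time.sleep(0.3)  # stand-in for processing the image
                self.image_used.set()  # let the updater create the next image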
    
            while True:
                print("waiting for new image")
                await asyncio.to_thread(waiter)
    
                yield self.image
    
    async def main():
        updater = ImageUpdater()
    
        update_thread = threading.Thread(target=updater.update_image)
        update_thread.start()
    
        async for image in updater.image_generator():
            print(f"Received new image: {image}")
    
    
    if __name__ == "__main__":
        asyncio.run(main())
    

    You said that you eventually wanted to have more than one thread processing the images, so perhaps a producer-consumer architecture would be cleaner in the long run. See quamrana's answer.
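
As a rough illustration of what a producer-consumer version might look like (this is a sketch of my own, not the code from that answer), a bounded queue.Queue can replace both Event objects. The names produce_images and collect, the None sentinel, and the fixed image count are assumptions made so the example terminates.

```python
import asyncio
import queue
import threading


def produce_images(q, count):
    """Producer thread: put() blocks while the previous image is still queued."""
    for i in range(count):
        q.put(f"image-{i}")
    q.put(None)  # sentinel telling the consumer that no more images follow


async def image_generator(q):
    """Async generator that fetches images without blocking the event loop."""
    while True:
        image = await asyncio.to_thread(q.get)
        if image is None:
            return
        yield image


async def collect(count=3):
    q = queue.Queue(maxsize=1)  # at most one unconsumed image at a time
    producer = threading.Thread(
        target=produce_images, args=(q, count), daemon=True
    )
    producer.start()
    received = [image async for image in image_generator(q)]
    producer.join()
    return received


if __name__ == "__main__":
    print(asyncio.run(collect()))
```

Note that with maxsize=1 the producer can start building the next image as soon as the current one is queued, so it may run one image ahead of the consumer. If the strict "only update after use" handshake matters, the two-Event version above is closer to your original requirement.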