python python-3.x queue python-asyncio coroutine

asyncio.Queue Stuck With 1 Coroutine Adding to Queue, 1 Coroutine Getting from Queue


In my simple asyncio code below, the app has one task, self.add_item_loop_task, that continuously adds an integer to the asyncio.Queue named self.queue, while a second task, self.get_item_loop_task, continuously waits for something to be added to the queue and prints it out.

However, this app only prints out 0 once when I run it, and gets stuck there. I believe the loop in self.get_item_loop_task is not proceeding. Why is this happening?

import asyncio

class App:
    def __init__(self):
        self.queue = asyncio.Queue()

    async def start(self):
        self.add_item_loop_task = asyncio.create_task(self.add_item_loop())
        self.get_item_loop_task = asyncio.create_task(self.get_item_loop())
        await asyncio.wait(
            [
                self.add_item_loop_task,
                self.get_item_loop_task,
            ]
        )

    async def stop(self):
        self.add_item_loop_task.cancel()
        self.get_item_loop_task.cancel()

    async def add_item_loop(self):
        i = 0
        while True:
            await self.queue.put(i)
            i += 1
            await asyncio.sleep(1)

    async def get_item_loop(self):
        while True:
            item = await self.queue.get()
            print(item)


app = App()
try:
    asyncio.run(app.start())
except KeyboardInterrupt:
    asyncio.run(app.stop())

Solution

  • This is caused by an implementation detail of asyncio. In Python versions before 3.10, self.queue = asyncio.Queue() binds the queue to the current event loop at construction time, creating one if none exists yet. Meanwhile, asyncio.run() always creates a new event loop. So if you create a queue before calling asyncio.run(), you end up with two event loops: the one the queue is bound to and the one asyncio.run() is using. The first get() returns immediately because 0 is already in the queue, but the next get() has to wait on a future that belongs to the queue's loop rather than the running one, so the consumer cannot continue.

    You can fix this by moving the creation of App into a coroutine function that you pass to asyncio.run(), as shown below. With this change, the application works as intended.

    async def main():
        app = App()
        await app.start()
    
    asyncio.run(main())
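
For a version that can be run to completion, here is a minimal sketch of the same fix with a bounded producer. The `count` parameter, the `received` list, and the `None` sentinel are assumptions added for illustration and are not part of the original code; the point is only that `App()` (and therefore `asyncio.Queue()`) is created inside the coroutine that `asyncio.run()` executes.

```python
import asyncio


class App:
    def __init__(self):
        # Constructed from inside main(), so the queue is associated with
        # the event loop that asyncio.run() created.
        self.queue = asyncio.Queue()
        self.received = []  # hypothetical: collects items instead of printing

    async def add_item_loop(self, count):
        # Bounded variant of the original infinite loop (assumption).
        for i in range(count):
            await self.queue.put(i)
            await asyncio.sleep(0.01)
        await self.queue.put(None)  # sentinel telling the consumer to stop

    async def get_item_loop(self):
        while True:
            item = await self.queue.get()
            if item is None:
                break
            self.received.append(item)

    async def start(self, count):
        # Run producer and consumer concurrently on the same loop.
        await asyncio.gather(self.add_item_loop(count), self.get_item_loop())


async def main(count=3):
    app = App()  # created while the loop is already running
    await app.start(count)
    return app.received


result = asyncio.run(main())
print(result)
```

Because the queue and both tasks now share one event loop, the consumer keeps receiving items until it sees the sentinel.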