Search code examples
pythonpython-3.xasync-awaitpython-asyncioevent-loop

Doesn't asyncio.run() use the running event_loop if it exists? (asyncio.Queue with producer consumer)


I'm trying to use a asyncio.Queue to implement a producer - consumer pattern. But I found out that

  1. if i create a Queue out side asyncio.run()
  2. and i set the maxsize of the queue larger than amount of items that the producer can produce

It makes an error.

My code

class ProducerConsumer:
    """
    Producer-Consumer pattern using asyncio.Queue.
    """

    def __init__(self, generator: Generator, consumer, max_queue_size=0, adaptor=None, adaptor_input=None):
        self.generator = generator
        self.consumer = consumer
        self.adaptor = adaptor
        self.adaptor_input = adaptor_input
        self.max_queue_size = max_queue_size
        self.queue = asyncio.Queue(maxsize=max_queue_size)

    async def _produce(self):
        for item in self.generator:
            await self.queue.put(item)
            print(f"Produced {item}")
        await self.queue.put(None)

    async def _consume(self):
        while True:
            item = await self.queue.get()
            if self.adaptor is not None:
                item = self.adaptor(item, self.adaptor_input)
            if item is None:
                break
            self.consumer(item)
            print(f"Consumed {item}")

    async def _run(self):
        await asyncio.gather(self._produce(), self._consume())
        
    def run(self):
        asyncio.run(self._run())
            

def my_generator():
    for i in range(10):
        time.sleep(0.1)
        yield i

my_consumer = print
ProducerConsumer(my_generator(), my_consumer, max_queue_size=3).run()

The error

  File "/Users/.../async_producer_consumer.py", line 22, in _produce
    await self.queue.put(item)
  File "/Users/.../.pyenv/versions/3.8.16/lib/python3.8/asyncio/queues.py", line 125, in put
    await putter
RuntimeError: Task <Task pending name='Task-2' coro=<ProducerConsumer._produce() running at /Users/.../async_producer_consumer.py:22> cb=[gather.<locals>._done_callback() at /Users/.../.pyenv/versions/3.8.16/lib/python3.8/asyncio/tasks.py:769]> got Future <Future pending> attached to a different loop

I found out that this is because I have created the queue out side the asyncio.run() so two co-existing event_loops are making an issue.

My question is,

  1. Doesn't the asyncio.run() use the running event_loop (which is initially made by the queue)?
  2. Why Does this error not occur if i set the maxsize of queue as infinite?

Thanks a lot for reading my question.


Solution

  • I can't find related documentation stating something like "be careful creating a Queue without an event loop", but I remember having had a similar problem with the Queue in my project. My answer is based mainly on the asyncio code itself.

    In versions up to 3.9, the Queue.__init__ was accepting a loop= parameter. If the loop was not set, it called get_event_loop() with the side effect of creating a new one if a current loop did not exist.

    This did not go well with the asyncio.run which is clearly documented as "always creates a new event loop and closes it at the end.". The result were two loops and an error.

    A workaround is to use the lower level entry point run_until_complete (but it does not implement a cleanup like 'run' does):

    def run(self):
        # asyncio.run(self._run())
        asyncio.get_event_loop().run_until_complete(self._run())
            
    

    Starting from 3.10, the previously deprecated loop= parameter was finally removed. The Queue.__init__ was completely rewritten and the issue is fixed since then.