I'm trying to use an asyncio.Queue to implement a producer-consumer pattern, but it raises an error.
My code:
import asyncio
import time
from typing import Generator


class ProducerConsumer:
    """
    Producer-Consumer pattern using asyncio.Queue.
    """

    def __init__(self, generator: Generator, consumer, max_queue_size=0, adaptor=None, adaptor_input=None):
        self.generator = generator
        self.consumer = consumer
        self.adaptor = adaptor
        self.adaptor_input = adaptor_input
        self.max_queue_size = max_queue_size
        # Note: the queue is created here, before asyncio.run() starts its loop.
        self.queue = asyncio.Queue(maxsize=max_queue_size)

    async def _produce(self):
        for item in self.generator:
            await self.queue.put(item)
            print(f"Produced {item}")
        # None is the sentinel that tells the consumer to stop.
        await self.queue.put(None)

    async def _consume(self):
        while True:
            item = await self.queue.get()
            if self.adaptor is not None:
                item = self.adaptor(item, self.adaptor_input)
            if item is None:
                break
            self.consumer(item)
            print(f"Consumed {item}")

    async def _run(self):
        await asyncio.gather(self._produce(), self._consume())

    def run(self):
        asyncio.run(self._run())


def my_generator():
    for i in range(10):
        time.sleep(0.1)
        yield i


my_consumer = print
ProducerConsumer(my_generator(), my_consumer, max_queue_size=3).run()
The error:
  File "/Users/.../async_producer_consumer.py", line 22, in _produce
    await self.queue.put(item)
  File "/Users/.../.pyenv/versions/3.8.16/lib/python3.8/asyncio/queues.py", line 125, in put
    await putter
RuntimeError: Task <Task pending name='Task-2' coro=<ProducerConsumer._produce() running at /Users/.../async_producer_consumer.py:22> cb=[gather.<locals>._done_callback() at /Users/.../.pyenv/versions/3.8.16/lib/python3.8/asyncio/tasks.py:769]> got Future <Future pending> attached to a different loop
I found out that this happens because I created the queue outside asyncio.run(), so two event loops coexist and conflict with each other.
My question is,
Thanks a lot for reading my question.
I can't find any documentation stating something like "be careful creating a Queue without an event loop", but I remember having a similar problem with the Queue in my project. My answer is based mainly on the asyncio code itself.
In versions up to 3.9, Queue.__init__ accepted a loop= parameter. If the loop was not set, it called get_event_loop(), with the side effect of creating a new loop if a current one did not exist.
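This did not go well with asyncio.run, which is clearly documented as "always creates a new event loop and closes it at the end." The result was two loops and an error: the queue's internal futures belonged to the first loop, while the tasks awaiting them ran on the second.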
A workaround is to use the lower-level entry point run_until_complete (although it does not perform the cleanup that asyncio.run does):
    def run(self):
        # asyncio.run(self._run())
        asyncio.get_event_loop().run_until_complete(self._run())
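Another option, which is not from the question's code but follows from the explanation above, is to keep asyncio.run and delay creating the queue until the loop is already running. The sketch below is a trimmed-down, hypothetical variant of the class (the name LazyQueueProducerConsumer is made up, and the adaptor arguments and prints are omitted for brevity); it should behave the same on 3.8/3.9 and on newer versions:

```python
import asyncio


class LazyQueueProducerConsumer:
    """Hypothetical variant: the queue is built inside the running loop."""

    def __init__(self, generator, consumer, max_queue_size=0):
        self.generator = generator
        self.consumer = consumer
        self.max_queue_size = max_queue_size
        self.queue = None  # created later, in _run()

    async def _produce(self):
        for item in self.generator:
            await self.queue.put(item)
        await self.queue.put(None)  # sentinel: no more items

    async def _consume(self):
        while True:
            item = await self.queue.get()
            if item is None:
                break
            self.consumer(item)

    async def _run(self):
        # asyncio.run() has already started its loop at this point, so a
        # Queue created now binds to that loop even on Python 3.8/3.9.
        self.queue = asyncio.Queue(maxsize=self.max_queue_size)
        await asyncio.gather(self._produce(), self._consume())

    def run(self):
        asyncio.run(self._run())


results = []
LazyQueueProducerConsumer(iter(range(5)), results.append, max_queue_size=2).run()
```

The only structural change is moving the asyncio.Queue(...) call from __init__ into _run, so the constructor no longer touches any event loop.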
Starting from 3.10, the previously deprecated loop= parameter was finally removed. Queue.__init__ was completely rewritten, and the issue has been fixed since then.
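As a rough way to check this on a given interpreter, the sketch below creates a bounded queue before asyncio.run is called and then pushes a few items through it. The helper names (drain, demo, SUPPORTS_LAZY_LOOP) are made up for illustration. On 3.10 and later this is expected to complete normally; on 3.8/3.9 it should fail with the same "attached to a different loop" RuntimeError as in the question.

```python
import asyncio
import sys

# Assumption based on the answer above: from 3.10 on, a Queue no longer
# grabs an event loop in its constructor.
SUPPORTS_LAZY_LOOP = sys.version_info >= (3, 10)


async def drain(queue):
    """Pass three items through the queue and return what was received."""
    received = []

    async def produce():
        for i in range(3):
            await queue.put(i)  # waits whenever the queue is full
        await queue.put(None)   # sentinel: no more items

    async def consume():
        while True:
            item = await queue.get()
            if item is None:
                break
            received.append(item)

    await asyncio.gather(produce(), consume())
    return received


def demo():
    queue = asyncio.Queue(maxsize=1)  # created before any loop is running
    return asyncio.run(drain(queue))
```

Calling demo() only when SUPPORTS_LAZY_LOOP is true avoids triggering the error on older interpreters.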