I have been writing a Python app with:

- an `async` function `producer`, which listens for incoming items over a websocket and puts them into a `queue = asyncio.Queue()`;
- an `async` function `consumer`, which does `queue.get()` and queries `item_details` over a different websocket connection.

The problem: on average, items are put into the `queue` much faster than the `consumer` can get them out, so the `queue` piles up after a while.
Question: What is the proper way to scale up the `consumer` without multiprocessing and without throttling the incoming connection? I am not yet very proficient with `asyncio` and threading. I thought of running the `consumer` in separate workers, but as far as I understand, asyncio's `run_in_executor` cannot be used on `async` functions, and there is also the issue that `asyncio.Queue()` is not thread-safe.
If the consumer does IO-bound work, you can simply scale the number of consumers. You don't need multi-threading for that, because `asyncio` is based on the idea of non-blocking IO and is designed to work in a single thread. (You can, or even must, use threads for blocking IO when there is no native asynchronous alternative, e.g. for file IO, but that's a separate story.)
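As a side note on that separate story: a blocking call can be offloaded to a thread with `loop.run_in_executor`, which takes a plain sync callable (not a coroutine). A minimal sketch, where `blocking_io` is a made-up placeholder for blocking work such as file IO:

```python
import asyncio
import time

def blocking_io(n: int) -> int:
    """A stand-in for blocking work with no native async alternative."""
    time.sleep(0.1)
    return n * 2

async def main():
    loop = asyncio.get_running_loop()
    # None selects the loop's default ThreadPoolExecutor; the coroutine
    # suspends here without blocking other tasks on the event loop.
    results = await asyncio.gather(
        *(loop.run_in_executor(None, blocking_io, i) for i in range(5))
    )
    print(results)

asyncio.run(main())
```

The five blocking calls run in worker threads concurrently, so the whole batch takes about 0.1 s instead of 0.5 s.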
Here is a simple example illustrating the case where the producer creates tasks faster than a single consumer can process them. I emulate the IO workload with `asyncio.sleep`:
```python
import asyncio
import itertools


async def producer(queue: asyncio.Queue):
    """Producer emulator: creates ~10 tasks per second."""
    sleep_seconds = 0.1
    counter = itertools.count(1)
    while True:
        await queue.put(next(counter))
        await asyncio.sleep(sleep_seconds)


async def consumer(queue: asyncio.Queue, index):
    """Slow IO-bound consumer emulator: processes ~5 tasks per second."""
    sleep_seconds = 0.2
    while True:
        task = await queue.get()
        print(f"consumer={index}, task={task}, queue_size={queue.qsize()}")
        await asyncio.sleep(sleep_seconds)


async def main():
    q = asyncio.Queue()
    concurrency = 2  # consumers count
    tasks = [asyncio.create_task(consumer(q, i)) for i in range(concurrency)]
    tasks += [asyncio.create_task(producer(q))]
    await asyncio.wait(tasks)


if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        pass
```
Output for a single consumer (`concurrency = 1`): the queue size grows constantly.

```
consumer=0, task=1, queue_size=0
consumer=0, task=2, queue_size=0
consumer=0, task=3, queue_size=1
consumer=0, task=4, queue_size=2
consumer=0, task=5, queue_size=3
consumer=0, task=6, queue_size=4
consumer=0, task=7, queue_size=5
consumer=0, task=8, queue_size=6
consumer=0, task=9, queue_size=7
consumer=0, task=10, queue_size=8
```
Output for two consumers (`concurrency = 2`): the queue stays empty.

```
consumer=0, task=1, queue_size=0
consumer=1, task=2, queue_size=0
consumer=0, task=3, queue_size=0
consumer=1, task=4, queue_size=0
consumer=0, task=5, queue_size=0
consumer=1, task=6, queue_size=0
consumer=0, task=7, queue_size=0
consumer=1, task=8, queue_size=0
consumer=0, task=9, queue_size=0
consumer=1, task=10, queue_size=0
```
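One possible extension, not part of the example above (which loops forever): if the producer is finite and you want a clean shutdown, `queue.task_done()` together with `queue.join()` lets you wait until every item has been processed before cancelling the then-idle consumers. A sketch, with the item count and sleep duration made up for illustration:

```python
import asyncio

async def producer(queue: asyncio.Queue, n: int):
    """Finite producer: puts n items and returns."""
    for i in range(1, n + 1):
        await queue.put(i)

async def consumer(queue: asyncio.Queue, results: list):
    while True:
        item = await queue.get()
        await asyncio.sleep(0.01)  # emulate IO-bound work
        results.append(item)
        queue.task_done()  # mark this item as fully processed

async def main() -> list:
    queue: asyncio.Queue = asyncio.Queue()
    results: list = []
    consumers = [asyncio.create_task(consumer(queue, results)) for _ in range(3)]
    await producer(queue, 10)
    await queue.join()   # blocks until task_done() was called for every item
    for c in consumers:
        c.cancel()       # consumers are idle at this point; stop them
    return results

print(sorted(asyncio.run(main())))  # all 10 items were processed
```

This way the number of consumers stays an internal tuning knob, while shutdown is driven by the queue itself rather than by counting items in each consumer.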