Tags: python, multithreading, python-asyncio, python-multithreading, producer-consumer

Python Asyncio producer-consumer workflow congestion / growing queue


I have been writing a Python app where:

  • An async function producer listens for incoming items over a websocket and puts them into a queue = asyncio.Queue().
  • An async function consumer does queue.get() and queries item_details over a different websocket connection.

The problem: on average, items are put into the queue much faster than the consumer takes them out, so the queue keeps piling up after a while.

Question: What is the proper way to scale up the consumer without multiprocessing and without throttling the incoming connection? I am not yet very proficient with asyncio and threading. I thought of running the consumer in separate workers, but as far as I understand, asyncio's run_in_executor cannot be used on async functions, and there is also the issue that asyncio.Queue() is not thread-safe.


Solution

  • If the consumer does IO-bound work, you can simply scale up its count. You don't need multithreading for that, because asyncio is built around non-blocking IO and is designed to run in a single thread. You can (or even must) use threads for blocking IO when there is no native asynchronous alternative, e.g. for file IO, but that is a separate story (see the sketch after the example output below).

    Here is a simple example that illustrates the case where the producer creates tasks faster than a single consumer can process them. The IO workload is emulated with asyncio.sleep.

    import asyncio
    import itertools
    
    async def producer(queue: asyncio.Queue):
        """producer emulator, creates ~ 10 tasks per second"""
        sleep_seconds=0.1
        counter = itertools.count(1)
        while True:
            await queue.put(next(counter))
            await asyncio.sleep(sleep_seconds)
    
    
    async def consumer(queue: asyncio.Queue, index):
        """slow io-bound consumer emulator, process ~ 5 tasks per second"""
        sleep_seconds=0.2
        while True:
            task = await queue.get()
            print(f"consumer={index}, task={task}, queue_size={queue.qsize()}")
            await asyncio.sleep(sleep_seconds)
    
    
    async def main():
        q = asyncio.Queue()
        concurrency = 2  # number of consumer tasks
        tasks = [asyncio.create_task(consumer(q, i)) for i in range(concurrency)]
        tasks += [asyncio.create_task(producer(q))]
        await asyncio.wait(tasks)
    
    
    if __name__ == "__main__":
        try:
            asyncio.run(main())
        except KeyboardInterrupt:
            pass
    
    

    Output with a single consumer: the queue size grows constantly.

    consumer=0, task=1, queue_size=0
    consumer=0, task=2, queue_size=0
    consumer=0, task=3, queue_size=1
    consumer=0, task=4, queue_size=2
    consumer=0, task=5, queue_size=3
    consumer=0, task=6, queue_size=4
    consumer=0, task=7, queue_size=5
    consumer=0, task=8, queue_size=6
    consumer=0, task=9, queue_size=7
    consumer=0, task=10, queue_size=8
    

    Output with two consumers: the queue stays empty.

    consumer=0, task=1, queue_size=0
    consumer=1, task=2, queue_size=0
    consumer=0, task=3, queue_size=0
    consumer=1, task=4, queue_size=0
    consumer=0, task=5, queue_size=0
    consumer=1, task=6, queue_size=0
    consumer=0, task=7, queue_size=0
    consumer=1, task=8, queue_size=0
    consumer=0, task=9, queue_size=0
    consumer=1, task=10, queue_size=0
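
    As an aside on the blocking-IO case mentioned above: if a consumer ever has to call truly blocking code, it can hand that call off to a worker thread instead of stalling the event loop. Below is a minimal sketch, assuming Python 3.9+ for asyncio.to_thread; process_blocking and blocking_consumer are hypothetical names used only for illustration.

    import asyncio
    import time

    def process_blocking(item: int) -> str:
        """Hypothetical synchronous call with no async alternative, e.g. file IO."""
        time.sleep(0.2)  # blocks only the worker thread, not the event loop
        return f"result-{item}"


    async def blocking_consumer(queue: asyncio.Queue, index: int):
        while True:
            item = await queue.get()
            # Hand the blocking call to a thread; the event loop stays responsive,
            # and asyncio.Queue is still only touched from the event-loop thread.
            result = await asyncio.to_thread(process_blocking, item)
            print(f"consumer={index}, item={item}, {result}")


    async def main():
        q = asyncio.Queue()
        workers = [asyncio.create_task(blocking_consumer(q, i)) for i in range(2)]
        for item in range(10):
            await q.put(item)
        await asyncio.sleep(2)  # give the workers time to drain the queue
        for w in workers:
            w.cancel()


    if __name__ == "__main__":
        asyncio.run(main())

    Under the hood, asyncio.to_thread wraps loop.run_in_executor with the default ThreadPoolExecutor; both accept only a plain synchronous callable, which is exactly why run_in_executor cannot be applied to an async function, as noted in the question.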