Search code examples
Tags: python, python-asyncio, semaphore

Using a semaphore with asyncio in Python


I am trying to limit the number of simultaneous async functions running using a semaphore, but I cannot get it to work. My code boils down to this:

import asyncio


async def send(i):
    """Print a start marker for job *i*, pause four seconds, then print an end marker."""
    pause_seconds = 4
    print(f"starting {i}")
    # Stand-in for real asynchronous work; other coroutines may run meanwhile.
    await asyncio.sleep(pause_seconds)
    print(f"ending {i}")


async def helper():
    """Run send(1) through send(10) concurrently inside one ``async with`` block.

    A new five-slot semaphore is created here and acquired exactly once, by
    helper itself, around the whole gather() call; the individual send()
    coroutines never acquire it themselves.
    """
    job_ids = range(1, 11)
    gate = asyncio.Semaphore(value=5)
    async with gate:
        await asyncio.gather(*(send(job_id) for job_id in job_ids))


if __name__ == "__main__":
    # asyncio.run() creates a fresh event loop, runs helper() to completion
    # and closes the loop afterwards. The old get_event_loop() /
    # run_until_complete() / close() pattern is deprecated when no loop is
    # current and raises RuntimeError on Python 3.14+.
    asyncio.run(helper())

The output is:

starting 1
starting 2
starting 3
starting 4
starting 5
starting 6
starting 7
starting 8
starting 9
starting 10
ending 1
ending 2
ending 3
ending 4
ending 5
ending 6
ending 7
ending 8
ending 9
ending 10

I hope and expect that only 5 will run at a time; however, all 10 start and stop at the same time. What am I doing wrong?


Solution

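  • The problem is that `helper` creates a new semaphore and acquires it only once, around the whole `gather` call, so it never limits the individual `send` calls. The semaphore has to be shared by all tasks and acquired inside each of them, as in the working example below (feel free to ask questions):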

    import asyncio
    
    
    async def send(
            i: int,
            semaphore: asyncio.Semaphore,
            *,
            delay: float = 4,
    ) -> None:
        """Simulate one job that only does its work while holding *semaphore*.

        Args:
            i: Job number used in the printed messages.
            semaphore: Semaphore shared by all jobs; its initial value is the
                maximum number of jobs allowed inside the ``async with`` block
                at the same time.
            delay: Seconds of simulated work (``asyncio.sleep``) performed while
                the semaphore is held. Defaults to 4, as in the original example.
        """
        # Printed before acquiring, to demonstrate that all tasks start nearly together.
        print(f"Hello: {i}")
        # Only as many tasks as the semaphore's initial value can run the block
        # below simultaneously; the others wait at ``async with``.
        async with semaphore:
            print(f"starting {i}")
            await asyncio.sleep(delay)
            print(f"ending {i}")
    
    
    async def async_main():
        """Launch ten send() jobs that all share one two-slot semaphore."""
        shared_gate = asyncio.Semaphore(value=2)
        jobs = (send(job, semaphore=shared_gate) for job in range(1, 11))
        await asyncio.gather(*jobs)
    
    
    if __name__ == "__main__":
        # asyncio.run() creates a fresh event loop, runs async_main() to
        # completion and closes the loop afterwards. The old get_event_loop() /
        # run_until_complete() / close() pattern is deprecated when no loop is
        # current and raises RuntimeError on Python 3.14+.
        asyncio.run(async_main())
    

    VERSION FROM 18.08.2023:

    Since many people are interested in how to use asyncio.Semaphore, I decided to extend my answer.

    The new version illustrates how to use the producer-consumer pattern with asyncio.Semaphore. If you want something very simple, the code from the original answer above is fine. If you want a more robust solution, which runs a fixed number of consumer tasks instead of creating one asyncio.Task per work item, use the version below.

    import asyncio
    from typing import List, Optional
    
    # How many consumer (worker) coroutines are started, and therefore how
    # many poison pills the producer has to enqueue.
    CONSUMERS_NUMBER = 10
    # How many work items the producer puts on the queue.
    TASKS_NUMBER = 20
    
    
    async def producer(
            tasks_to_do: List[str],
            q: asyncio.Queue,
            consumers_number: Optional[int] = None,
    ) -> None:
        """Put every item of *tasks_to_do* on *q*, then one ``None`` pill per consumer.

        Args:
            tasks_to_do: Work items to enqueue, in order. They must not be
                ``None``, because ``None`` is the consumers' stop signal.
            q: Queue shared with the consumers. On a bounded queue ``put``
                waits while the queue is full.
            consumers_number: How many poison pills to enqueue. It must match
                the number of consumers reading from *q*; with fewer pills some
                consumers would wait on the queue forever. Defaults to the
                module constant ``CONSUMERS_NUMBER`` (looked up at call time).
        """
        if consumers_number is None:
            consumers_number = CONSUMERS_NUMBER

        print("Producer started working!")
        for task in tasks_to_do:
            await q.put(task)  # put tasks to Queue

        # Poison-pill technique: each consumer stops after receiving one None.
        for _ in range(consumers_number):
            await q.put(None)

        print("Producer finished working!")
    
    
    async def consumer(
            consumer_name: str,
            q: asyncio.Queue,
            semaphore: asyncio.Semaphore,
            *,
            delay: float = 4,
    ) -> None:
        """Take items from *q* and process each one while holding *semaphore*.

        Runs until a ``None`` poison pill is received. Every item taken from the
        queue (the pill included) is marked done with ``q.task_done()``, so a
        caller can ``await q.join()`` to wait until all queued items are handled.

        Args:
            consumer_name: Label used in the printed progress messages.
            q: Queue filled by the producer; ``None`` means "stop".
            semaphore: Semaphore shared by all consumers; it limits how many
                items are processed at the same time.
            delay: Seconds of simulated work per item. Defaults to 4, as in
                the original example.
        """
        print(f"{consumer_name} started working!")
        while True:
            task = await q.get()
            try:
                if task is None:  # stop if poison pill was received
                    break

                print(f"{consumer_name} took {task} from queue!")

                # number of tasks which could be processed simultaneously
                # is limited by semaphore
                async with semaphore:
                    print(f"{consumer_name} started working with {task}!")
                    await asyncio.sleep(delay)
                    print(f"{consumer_name} finished working with {task}!")
            finally:
                # Balance the get() above (also on break) so Queue.join() can complete.
                q.task_done()

        print(f"{consumer_name} finished working!")
    
    
    async def async_main() -> None:
        """Wire one producer and CONSUMERS_NUMBER consumers together and run them.

        The queue holds at most two pending items, and the shared semaphore lets
        at most two items be processed at the same time.
        """
        work_items = [f"TheTask#{n}" for n in range(1, TASKS_NUMBER + 1)]
        queue = asyncio.Queue(maxsize=2)
        gate = asyncio.Semaphore(value=2)
        workers = [
            consumer(consumer_name=f"Consumer#{n}", q=queue, semaphore=gate)
            for n in range(1, CONSUMERS_NUMBER + 1)
        ]
        await asyncio.gather(producer(tasks_to_do=work_items, q=queue), *workers)
    
    
    if __name__ == "__main__":
        # Run the demo: asyncio.run() starts an event loop, drives
        # async_main() to completion and closes the loop afterwards.
        entry_coroutine = async_main()
        asyncio.run(entry_coroutine)