I am trying to limit the number of simultaneous async functions running using a semaphore, but I cannot get it to work. My code boils down to this:
import asyncio
async def send(i):
print(f"starting {i}")
await asyncio.sleep(4)
print(f"ending {i}")
async def helper():
async with asyncio.Semaphore(value=5):
await asyncio.gather(*[
send(1),
send(2),
send(3),
send(4),
send(5),
send(6),
send(7),
send(8),
send(9),
send(10),
])
if __name__ == "__main__":
loop = asyncio.get_event_loop()
loop.run_until_complete(helper())
loop.close()
The output is:
starting 1
starting 2
starting 3
starting 4
starting 5
starting 6
starting 7
starting 8
starting 9
starting 10
ending 1
ending 2
ending 3
ending 4
ending 5
ending 6
ending 7
ending 8
ending 9
ending 10
I hope and expect that only 5 will run at time, however all 10 start and stop at the same time. What am I doing wrong?
Please find the working example below, feel free to ask questions:
import asyncio
async def send(i: int, semaphore: asyncio.Semaphore):
# to demonstrate that all tasks start nearly together
print(f"Hello: {i}")
# only two tasks can run code inside the block below simultaneously
async with semaphore:
print(f"starting {i}")
await asyncio.sleep(4)
print(f"ending {i}")
async def async_main():
s = asyncio.Semaphore(value=2)
await asyncio.gather(*[send(i, semaphore=s) for i in range(1, 11)])
if __name__ == "__main__":
loop = asyncio.get_event_loop()
loop.run_until_complete(async_main())
loop.close()
VERSION FROM 18.08.2023:
I see that many people are interested in how to use asyncio.Semaphore
and I decided to extend my answer.
The new version illustrates how to use the producer-consumer pattern with asyncio.Semaphore
. If you want something very simple, you are fine to use code from the original answer above. If you want a more robust solution, which allows you to limit the number of asyncio.Tasks
to work with, you can use this more robust solution.
import asyncio
from typing import List
CONSUMERS_NUMBER = 10 # workers/consumer number
TASKS_NUMBER = 20 # number of tasks to do
async def producer(tasks_to_do: List[int], q: asyncio.Queue) -> None:
print(f"Producer started working!")
for task in tasks_to_do:
await q.put(task) # put tasks to Queue
# poison pill technique
for _ in range(CONSUMERS_NUMBER):
await q.put(None) # put poison pill to all worker/consumers
print("Producer finished working!")
async def consumer(
consumer_name: str,
q: asyncio.Queue,
semaphore: asyncio.Semaphore,
) -> None:
print(f"{consumer_name} started working!")
while True:
task = await q.get()
if task is None: # stop if poison pill was received
break
print(f"{consumer_name} took {task} from queue!")
# number of tasks which could be processed simultaneously
# is limited by semaphore
async with semaphore:
print(f"{consumer_name} started working with {task}!")
await asyncio.sleep(4)
print(f"{consumer_name} finished working with {task}!")
print(f"{consumer_name} finished working!")
async def async_main() -> None:
"""Main entrypoint of async app."""
tasks = [f"TheTask#{i + 1}" for i in range(TASKS_NUMBER)]
q = asyncio.Queue(maxsize=2)
s = asyncio.Semaphore(value=2)
consumers = [
consumer(
consumer_name=f"Consumer#{i + 1}",
q=q,
semaphore=s,
) for i in range(CONSUMERS_NUMBER)
]
await asyncio.gather(producer(tasks_to_do=tasks, q=q), *consumers)
if __name__ == "__main__":
asyncio.run(async_main())