Please look at the code below (for the sake of simplicity I am not using pydantic to group coroutines, retries and timeouts):
import asyncio
import typing as tp
import random
async def my_func(wait_time: int) -> str:
    """Sleep for roughly ``wait_time`` seconds and report the duration.

    A random offset in [0, 1) is either subtracted from or added to
    ``wait_time`` (each with probability 0.5) to obtain the real delay.

    :param wait_time: Nominal delay in seconds.
    :return: A message naming the nominal delay and the jittered delay.
    """
    random_number = random.random()
    random_time = wait_time - random_number if random.random() < 0.5 else wait_time + random_number
    print(f"waiting for {wait_time}{random_time:+} seconds")
    # Sleep for the jittered delay that the messages report; sleeping for the
    # plain wait_time would make the computed jitter meaningless.
    await asyncio.sleep(random_time)
    return f"waited for {wait_time}{random_time:+} seconds"
# Skeleton from the question: schedules three my_func tasks and declares the
# per-task timeouts, retry counts and the overall deadline. The orchestration
# itself is intentionally left open at the placeholder line.
async def main() -> None:
# Schedule the three workers on the running event loop.
task1 = asyncio.create_task(my_func(wait_time=1), name='task1')
task2 = asyncio.create_task(my_func(wait_time=2), name='task2')
task3 = asyncio.create_task(my_func(wait_time=3), name='task3')
# Per-attempt timeout of each task, in seconds.
task1_timeout = 1.2
task2_timeout = 2.2
task3_timeout = 3.2
# Retry count of each task.
task1_retry = 4
task2_retry = 3
task3_retry = 2
# Overall deadline, in seconds, by which main() must complete.
total_timeout = 5
<what to put here?>
# NOTE(review): the task*_result names are expected to be bound by the missing
# code above; the `-> None` annotation does not match this tuple return.
return task1_result, task2_result, task3_result
# Script entry point: run main() on a fresh event loop.
asyncio.run(main())
As you can see, I have a function my_func (in real life I will have multiple different functions). In main() I have defined 3 tasks. Each task has its own timeout and retry count. For example, task1 has a timeout of 1.2 seconds and can be tried up to 4 times.
Furthermore, I have another (global) timeout, total_timeout,
which represents the time within which main() must complete.
For example, if task1
starts running and doesn't produce a result within 1.2 seconds, we should retry it up to 4 times; even in the case where we cannot get a result at all, we are still below total_timeout
of 5 seconds (4 × 1.2 = 4.8 seconds).
For task2,
which times out after 2.2 seconds and can be tried 3 times, the second attempt finishes at 4.4 seconds; if we try it a third time, it will be cut off by total_timeout
at the 5-second mark.
For task3,
if we don't complete it on the first try, we don't have enough time for a second try (because of total_timeout
).
I would like to execute all three tasks concurrently, respecting their individual timeouts and retries, as well as total_timeout
. At the end, after at most 5 seconds, I will get a tuple of three elements, each of which is a str (the output of my_func) or None (in case all attempts failed, or the task was cut off by total_timeout
).
So output can be (str, str, str)
, (str, None, str)
or (None, None, None)
.
Can someone provide some example code that would do what I have described?
I think this is a great question. I propose this solution which combines asyncio.gather() and asyncio.wait_for().
Here the third task is asked to wait 5 seconds with a 3.2 seconds timeout (retry 2 times), and will return None, as asyncio.TimeoutError will be raised (and caught).
import asyncio
import random
import sys
# Overall deadline in seconds: the first command-line argument when one is
# given, otherwise 5.0.
if len(sys.argv) > 1:
    total_timeout = float(sys.argv[1])
else:
    total_timeout = 5.0
async def work_coro(wait_time: int) -> str:
    """Sleep for ``wait_time`` plus or minus a random offset, then report it.

    The offset is drawn from [0, 1) and is subtracted or added with equal
    probability. An offset above 0.7 aborts the run before any sleeping.

    :param wait_time: Nominal sleep time in seconds.
    :return: A message naming the nominal and the jittered sleep time.
    :raises RuntimeError: When the random offset is greater than 0.7.
    """
    jitter = random.random()
    # The direction is drawn even when the run is about to be rejected, so
    # the sequence of random draws is the same on every path.
    subtract = random.random() < 0.5
    if subtract:
        random_time = wait_time - jitter
    else:
        random_time = wait_time + jitter

    if jitter > 0.7:
        raise RuntimeError('Random sleep time too high')

    print(f"waiting for {wait_time}{random_time:+} seconds")
    await asyncio.sleep(random_time)
    return f"waited for {wait_time}{random_time:+} seconds"
async def coro_trunner(wait_time: int,
                       retry: int,
                       timeout: float) -> "str | None":
    """
    Run work_coro in a controlled timing environment

    Every attempt is bounded by ``timeout``. An attempt that times out or
    fails with an ordinary exception is retried until ``retry`` attempts have
    been made.

    :param int wait_time: Nominal sleep time handed to work_coro on each run
    :param int retry: Retry count (maximum number of attempts)
    :param float timeout: Timeout for the coroutine, applied per attempt
    :return: The worker's result, or None when every attempt failed or the
        runner was cancelled
    """
    # Take the clock from the loop that is actually running this coroutine
    # instead of relying on a module-level ``loop`` global being defined.
    clock = asyncio.get_running_loop().time
    tag = f'{work_coro}: ({wait_time}, {retry}, {timeout}): '
    for _ in range(retry):
        start_time = clock()
        print(f'{tag}spawning')
        try:
            return await asyncio.wait_for(work_coro(wait_time), timeout)
        except asyncio.TimeoutError:
            diff_time = clock() - start_time
            print(f'{tag}timeout (diff_time: {diff_time})')
        except asyncio.CancelledError:
            # Deliberately not re-raised: the caller cancels runners when the
            # global deadline expires and expects a plain None result.
            print(f'{tag}cancelled')
            break
        except Exception as err:
            # Unknown error raised in the worker: give it another chance
            print(f'{tag}error in worker: {err}')
    return None
async def main() -> list:
    """
    Run the three workers concurrently under the global deadline

    :return: One entry per task, in creation order: the value the task
        returned, or None when it was cut off by ``total_timeout``, was
        cancelled, or failed
    """
    tasks = [
        asyncio.create_task(coro_trunner(1, 2, 1.2)),
        asyncio.create_task(coro_trunner(2, 3, 2.2)),
        asyncio.create_task(coro_trunner(5, 5, 5.2)),
    ]
    gaf = asyncio.gather(*tasks)
    try:
        return await asyncio.wait_for(gaf, total_timeout)
    except asyncio.CancelledError:
        # main() itself is being cancelled: propagate instead of disguising
        # the cancellation as a normal result list.
        raise
    except Exception:
        # Total timeout reached (or a worker failed): get the results that
        # are ready.
        pass

    # Consume the gather exception so asyncio does not report it as never
    # retrieved; exception() itself raises on a pending or cancelled future.
    if gaf.done() and not gaf.cancelled():
        gaf.exception()

    results = []
    pending = []
    for task in tasks:
        if not task.done():
            # We want to know when a task yields nothing
            task.cancel()
            pending.append(task)
            results.append(None)
        elif task.cancelled() or task.exception() is not None:
            # result() would raise here; report "no result" instead
            results.append(None)
        else:
            results.append(task.result())
    if pending:
        # Let the cancelled tasks finish unwinding before returning.
        await asyncio.gather(*pending, return_exceptions=True)
    return results
print(f'Total timeout: {total_timeout}')
# Create the loop explicitly: asyncio.get_event_loop() called with no running
# loop is deprecated and raises RuntimeError on newer Python versions. The
# module-level name ``loop`` is kept, and this loop is the one that actually
# runs main().
loop = asyncio.new_event_loop()
try:
    start_time = loop.time()
    results = loop.run_until_complete(main())
    end_time = loop.time()
finally:
    # Release the loop's resources even when main() fails.
    loop.close()
print(f'{end_time - start_time} --> {results}')