
Python asyncio timeout/retry design pattern


Please look at the code below (for the sake of simplicity I am not using pydantic to group the coroutine, retries, and timeouts):

import asyncio
import typing as tp
import random

async def my_func(wait_time: int) -> str:
    random_number = random.random()
    random_time = wait_time - random_number if random.random() < 0.5 else wait_time + random_number
    print(f"waiting for {random_time:.2f} seconds")
    # Sleep for the randomized time so that a task can actually exceed its timeout
    await asyncio.sleep(random_time)
    return f"waited for {random_time:.2f} seconds"

async def main() -> None:

    task1 = asyncio.create_task(my_func(wait_time=1), name='task1')
    task2 = asyncio.create_task(my_func(wait_time=2), name='task2')
    task3 = asyncio.create_task(my_func(wait_time=3), name='task3')

    task1_timeout = 1.2
    task2_timeout = 2.2
    task3_timeout = 3.2

    task1_retry = 4
    task2_retry = 3
    task3_retry = 2

    total_timeout = 5

    <what to put here?>

    return task1_result, task2_result, task3_result

asyncio.run(main())

As you can see, I have a function my_func (in real life I will have multiple different functions). In main() I have defined 3 tasks. Each task has its own timeout and retry count. For example, task1 has a timeout of 1.2 seconds and can be tried up to 4 times.

Furthermore, I have another (global) timeout, total_timeout, which represents the time within which main() must complete.

For example, if task1 starts running and doesn't return a result within 1.2 seconds, we should retry it, up to 4 attempts in total. Even if we never get a result, 4 × 1.2 = 4.8 seconds is still below the total_timeout of 5 seconds.

For task2, which times out after 2.2 seconds and can be tried up to 3 times: once the second attempt finishes at 4.4 seconds, a third attempt will be cut off by total_timeout at the 5-second mark.

For task3, if the first attempt doesn't complete, there is not enough time left for a full second attempt (because of total_timeout).
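
The arithmetic in the three cases above can be checked with a short sketch. The helper names worst_case_seconds and attempts_that_fit are hypothetical and are not part of the code in this question; they only restate the reasoning, assuming every attempt runs until its own timeout.

```python
from typing import Dict, Tuple


def worst_case_seconds(timeout: float, attempts: int, total_timeout: float) -> float:
    """Longest time a task can take if every attempt times out,
    capped by the global limit."""
    return min(timeout * attempts, total_timeout)


def attempts_that_fit(timeout: float, attempts: int, total_timeout: float) -> int:
    """Number of attempts that can run to their full timeout
    before the global limit is reached."""
    return min(attempts, int(total_timeout // timeout))


# (per-attempt timeout, maximum attempts), taken from the question
tasks: Dict[str, Tuple[float, int]] = {
    "task1": (1.2, 4),
    "task2": (2.2, 3),
    "task3": (3.2, 2),
}

for name, (timeout, attempts) in tasks.items():
    full = attempts_that_fit(timeout, attempts, total_timeout=5)
    worst = worst_case_seconds(timeout, attempts, total_timeout=5)
    print(f"{name}: {full} full attempt(s), at most {worst:.1f} seconds")
```

Only task1 can use all of its attempts; task2 and task3 are limited by total_timeout.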

I would like to execute all three tasks concurrently, respecting their individual timeouts and retries as well as total_timeout. In the end, after at most 5 seconds, I should get a tuple of three elements, each of which is either a str (the output of my_func) or None (if all attempts failed, or the task was cut off by total_timeout). So the output can be, for example, (str, str, str), (str, None, str) or (None, None, None).

Can someone provide some example code that would do what I have described?


Solution

  • I think this is a great question. I propose this solution which combines asyncio.gather() and asyncio.wait_for().

    Here the third task is asked to wait 5 seconds with a 3.2-second timeout (2 attempts), so it will return None: each attempt either times out (asyncio.TimeoutError is raised and caught) or is cut off by the total timeout.

    import asyncio
    import random
    import sys
    import time
    from typing import List, Optional
    
    
    total_timeout = float(sys.argv[1]) if len(sys.argv) > 1 else 5.0
    
    
    async def work_coro(wait_time: int) -> str:
        random_number = random.random()
        # Sleep for wait_time plus or minus up to one second
        random_time = wait_time - random_number if \
            random.random() < 0.5 else wait_time + random_number
    
        # Simulate a worker that sometimes fails outright
        if random_number > 0.7:
            raise RuntimeError('Random sleep time too high')
    
        print(f"waiting for {random_time:.2f} seconds")
    
        await asyncio.sleep(random_time)
    
        return f"waited for {random_time:.2f} seconds"
    
    
    async def coro_trunner(wait_time: int,
                           retry: int,
                           timeout: float) -> Optional[str]:
        """
        Run work_coro in a controlled timing environment
    
        :param int wait_time: How long the coroutine will sleep on each run
        :param int retry: Maximum number of attempts
        :param float timeout: Timeout for each attempt
        :return: The worker's result, or None if every attempt failed
        """
    
        # Use the loop that is actually running this coroutine
        loop = asyncio.get_running_loop()
        label = f'{work_coro.__name__}: ({wait_time}, {retry}, {timeout})'
    
        for attempt in range(retry):
            start_time = loop.time()
            print(f'{label}: spawning (attempt {attempt + 1})')
    
            try:
                return await asyncio.wait_for(work_coro(wait_time),
                                              timeout)
            except asyncio.TimeoutError:
                diff_time = loop.time() - start_time
                print(f'{label}: timeout (diff_time: {diff_time:.2f})')
                continue
            except asyncio.CancelledError:
                # Cancelled by the total timeout: log it and let the
                # cancellation propagate instead of swallowing it
                print(f'{label}: cancelled')
                raise
            except Exception as err:
                # Unknown error raised in the worker: give it another chance
                print(f'{label}: error in worker: {err}')
                continue
    
        # Every attempt timed out or failed
        return None
    
    
    async def main() -> List[Optional[str]]:
        tasks = [
            asyncio.create_task(coro_trunner(1, 2, 1.2)),
            asyncio.create_task(coro_trunner(2, 3, 2.2)),
            asyncio.create_task(coro_trunner(5, 2, 3.2))
        ]
    
        try:
            await asyncio.wait_for(asyncio.gather(*tasks),
                                   total_timeout)
        except asyncio.TimeoutError:
            # Total timeout reached: wait_for() has cancelled the gather,
            # which in turn cancels every task that is still running
            pass
    
        results = []
    
        for task in tasks:
            if task.done() and not task.cancelled():
                results.append(task.result())
            else:
                # We want to know when a task yields nothing
                results.append(None)
                task.cancel()
    
        return results
    
    
    print(f'Total timeout: {total_timeout}')
    
    # asyncio.run() creates its own event loop, so time the whole run
    # with a monotonic clock rather than with a separate loop object
    start_time = time.monotonic()
    results = asyncio.run(main())
    end_time = time.monotonic()
    
    print(f'{end_time - start_time:.2f} --> {results}')
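
For comparison, here is a minimal sketch of the same idea built on asyncio.wait() with a timeout instead of asyncio.gather() plus asyncio.wait_for(). It is an alternative under stated assumptions, not the approach used above: the names run_with_retries, run_all, slow and demo are hypothetical, and the delays are shortened so the example finishes quickly and deterministically.

```python
import asyncio
from typing import Awaitable, Callable, List, Optional


async def run_with_retries(make_coro: Callable[[], Awaitable[str]],
                           timeout: float,
                           attempts: int) -> Optional[str]:
    """Call make_coro() up to `attempts` times, each bounded by `timeout`."""
    for _ in range(attempts):
        try:
            return await asyncio.wait_for(make_coro(), timeout)
        except asyncio.TimeoutError:
            continue  # this attempt was too slow: try again
    return None  # every attempt timed out


async def run_all(jobs: List[Awaitable[Optional[str]]],
                  total_timeout: float) -> List[Optional[str]]:
    """Run jobs concurrently; jobs unfinished after total_timeout yield None."""
    tasks = [asyncio.ensure_future(job) for job in jobs]
    _, pending = await asyncio.wait(tasks, timeout=total_timeout)
    for task in pending:
        task.cancel()
    # Let the cancelled tasks unwind before reading any results
    await asyncio.gather(*pending, return_exceptions=True)
    return [None if task.cancelled() or task.exception() else task.result()
            for task in tasks]


async def slow(delay: float) -> str:
    """Hypothetical stand-in for a real worker coroutine."""
    await asyncio.sleep(delay)
    return f"done after {delay}"


async def demo() -> List[Optional[str]]:
    return await run_all([
        # Finishes well within its timeout
        run_with_retries(lambda: slow(0.01), timeout=0.1, attempts=2),
        # Times out on both attempts
        run_with_retries(lambda: slow(1.0), timeout=0.05, attempts=2),
        # Still running when the total timeout expires
        run_with_retries(lambda: slow(1.0), timeout=1.0, attempts=3),
    ], total_timeout=0.3)


if __name__ == "__main__":
    print(asyncio.run(demo()))  # ['done after 0.01', None, None]
```

Because asyncio.wait() returns the set of pending tasks rather than raising, the results can be collected without handling an exception for the total timeout. The cancelled tasks are still awaited once so that they finish cleanly.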