Tags: python, setinterval, python-asyncio, aiohttp

asyncio aiohttp cancel an http request polling, return a result


I'm using this code to make an HTTP request every 5 seconds.

async def do_request():
    async with aiohttp.ClientSession() as session:
        async with session.get('http://localhost:8000/') as resp:
            print(resp.status)
            return await resp.text()

I didn't find a built-in scheduler, so I wrote this function (similar to JavaScript's setInterval):

async def set_interval(fn, seconds):
    while True:
        await fn()
        await asyncio.sleep(seconds)

And this is how I use it:

asyncio.ensure_future(set_interval(do_request, 5))

The code works fine, but I have two requirements:
1. How can I stop the set_interval after it is added to the event loop? (similar to javascript clearInterval())
2. Is it possible for set_interval to return the value of the function that it is wrapping? In this example I want the response text. Another pattern that accomplishes the same task would also be acceptable.


Solution

    Canceling the job

    1. How can I stop the set_interval after it is added to the event loop? (similar to javascript clearInterval())

    One option is to cancel the returned task:

    # create_task is like ensure_future, but is guaranteed to return a Task
    loop = asyncio.get_event_loop()
    task = loop.create_task(set_interval(do_request, 5))
    ...
    task.cancel()
    

    This will also cancel whatever future set_interval was awaiting at that moment. If you don't want that, and want fn() to continue in the background, use await asyncio.shield(fn()) in set_interval in place of await fn().
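
As a minimal sketch of the cancellation pattern: the coroutine fake_request and the list calls are hypothetical stand-ins for do_request, used so that the example runs without a server, and the short interval is chosen only to keep the demo fast.

```python
import asyncio

calls = []

async def fake_request():
    # Hypothetical stand-in for do_request(); records each call instead of using aiohttp.
    calls.append(len(calls))

async def set_interval(fn, seconds):
    while True:
        await fn()
        await asyncio.sleep(seconds)

async def demo():
    task = asyncio.create_task(set_interval(fake_request, 0.01))
    await asyncio.sleep(0.05)   # let a few iterations run
    task.cancel()               # plays the role of clearInterval()
    try:
        await task              # wait until the cancellation has been processed
    except asyncio.CancelledError:
        pass
    return task.cancelled(), len(calls)

cancelled, n_calls = asyncio.run(demo())
```

Awaiting the task after cancel() is optional, but it lets the caller know that the loop has actually stopped before moving on.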

    Distributing values generated by the function

    2. Is it possible for set_interval to return the value of the function that it is wrapping? In this example I want the response text. Another pattern that accomplishes the same task would also be acceptable.

    Since set_interval is running in an infinite loop, it cannot return anything - a return would terminate the loop.

    Exposing values via async iteration

    If you need the values from the function, one option is to redesign set_interval as an async generator. The caller would obtain the values using async for, which is quite readable, but also very different from JavaScript's setInterval:

    async def iter_interval(fn, seconds):
        while True:
            yield await fn()
            await asyncio.sleep(seconds)
    

    Usage example:

    import asyncio
    import time
    
    async def x():
        print('x')
        return time.time()
    
    async def main():
        async for obj in iter_interval(x, 1):
            print('got', obj)
    
    # or asyncio.get_event_loop().run_until_complete(main())
    asyncio.run(main())
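
Because the generator never finishes on its own, it is up to the consumer to decide when to stop. A small sketch of one way to do that, assuming a hypothetical helper take (not part of asyncio) that collects a fixed number of values and then closes the generator:

```python
import asyncio
import time

async def iter_interval(fn, seconds):
    while True:
        yield await fn()
        await asyncio.sleep(seconds)

async def x():
    return time.time()

async def take(n, agen):
    # Hypothetical helper: collect the first n values, then close the generator.
    results = []
    try:
        async for obj in agen:
            results.append(obj)
            if len(results) >= n:
                break
    finally:
        await agen.aclose()   # finalize the generator promptly
    return results

stamps = asyncio.run(take(3, iter_interval(x, 0.01)))
```

Leaving the async for loop with break is what stands in for clearInterval() here; the explicit aclose() just avoids relying on garbage collection to clean up the suspended generator.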
    

    Broadcasting values via a future

    Another approach is for each pass of the loop to broadcast the generated value to a global future which other coroutines can await; something along the lines of:

    next_value = None
    
    async def set_interval(fn, seconds):
        global next_value
        loop = asyncio.get_event_loop()
        while True:
            next_value = loop.create_task(fn())
            await next_value
            await asyncio.sleep(seconds)
    

    With usage like this:

    # def x() as above
    
    async def main():
        asyncio.create_task(set_interval(x, 1))
        while True:
            await asyncio.sleep(0)
            obj = await next_value
            print('got', obj)
    

    The simple implementation above has a major issue, though: once a value is provided, next_value is not replaced by a new task immediately, but only after the sleep. This means that main() keeps printing "got" with the same timestamp in a tight loop until a new timestamp arrives. It also means that removing await asyncio.sleep(0) would break it entirely: awaiting an already finished task does not suspend, so the only await in the loop would never yield to the event loop and set_interval would no longer get a chance to run.

    This is clearly not intended. We would like the loop in main() to wait for the next value, even after an initial value is obtained. To do so, set_interval must be a bit smarter:

    next_value = None
    
    async def set_interval(fn, seconds):
        global next_value
        loop = asyncio.get_event_loop()
    
        next_value = loop.create_task(fn())
        await next_value
    
        async def iteration_pass():
            await asyncio.sleep(seconds)
            return await fn()
    
        while True:
            next_value = loop.create_task(iteration_pass())
            await next_value
    

    This version ensures that next_value is reassigned as soon as the previous value has been awaited. To do so, it uses a helper coroutine, iteration_pass, which serves as a convenient task to put into next_value before fn() is actually ready to run. With that in place, main() can look like this:

    async def main():
        asyncio.create_task(set_interval(x, 1))
        await asyncio.sleep(0)
        while True:
            obj = await next_value
            print('got', obj)
    

    main() is no longer busy-looping and produces the expected output of exactly one timestamp per second. However, we still need the initial asyncio.sleep(0), because next_value is not yet set when create_task(set_interval(...)) returns: a task scheduled with create_task only starts running once control returns to the event loop. Omitting the sleep(0) would raise a TypeError along the lines of "object NoneType can't be used in 'await' expression".
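
The scheduling behaviour described above can be observed directly. A minimal sketch, where worker and log are hypothetical names used only for illustration, and the default (non-eager) task factory is assumed:

```python
import asyncio

log = []

async def worker():
    log.append("worker started")

async def main():
    task = asyncio.create_task(worker())
    log.append("after create_task")   # the task has been scheduled, but has not run yet
    await asyncio.sleep(0)            # yield to the event loop once
    log.append("after sleep(0)")      # by now the worker has had its first turn
    await task

asyncio.run(main())
```

After the run, log contains "after create_task", "worker started" and "after sleep(0)", in that order, which is why next_value is still None at the point where main() first tries to await it.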

    To resolve this, set_interval can be split into a regular def which schedules the initial loop iteration and immediately initializes next_value. The function instantiates and immediately returns a coroutine object that does the rest of the work.

    next_value = None
    
    def set_interval(fn, seconds):
        global next_value
        loop = asyncio.get_event_loop()
    
        next_value = loop.create_task(fn())
    
        async def iteration_pass():
            await asyncio.sleep(seconds)
            return await fn()
    
        async def interval_loop():
            global next_value
            while True:
                next_value = loop.create_task(iteration_pass())
                await next_value
    
        return interval_loop()
    

    Now main() can be written in the obvious way:

    async def main():
        asyncio.create_task(set_interval(x, 1))
        while True:
            obj = await next_value
            print('got', obj)
    

    Compared to async iteration, the advantage of this approach is that multiple listeners can observe the values.
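
To illustrate the multiple-listener case, here is a sketch that reuses the final set_interval and attaches two listeners. The names listener and counter are hypothetical, a counter replaces the timestamp so the values are easy to compare, and asyncio.get_running_loop() is used on the assumption that set_interval is always called from inside a running event loop:

```python
import asyncio
import itertools

next_value = None
counter = itertools.count()

async def x():
    # Hypothetical value source: a counter instead of a timestamp.
    return next(counter)

def set_interval(fn, seconds):
    global next_value
    loop = asyncio.get_running_loop()   # assumes the caller is inside a running loop
    next_value = loop.create_task(fn())

    async def iteration_pass():
        await asyncio.sleep(seconds)
        return await fn()

    async def interval_loop():
        global next_value
        while True:
            next_value = loop.create_task(iteration_pass())
            await next_value

    return interval_loop()

async def listener(n):
    # Collect n broadcast values by repeatedly awaiting the shared task.
    seen = []
    while len(seen) < n:
        seen.append(await next_value)
    return seen

async def main():
    ticker = asyncio.create_task(set_interval(x, 0.01))
    results = await asyncio.gather(listener(3), listener(3))
    ticker.cancel()                     # stop the interval once the listeners are done
    try:
        await ticker
    except asyncio.CancelledError:
        pass
    return results

results = asyncio.run(main())
```

Both listeners await the same task object, so each one receives every value produced while it is listening. Note that a listener only sees values generated after it starts awaiting; in this sketch the listeners start as separate tasks, so they may miss the value from the very first call.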