I'm using this code to create an http request every 5 seconds.
async def do_request():
async with aiohttp.ClientSession() as session:
async with session.get('http://localhost:8000/') as resp:
print(resp.status)
return await resp.text()
Didn't find a bulit-in scheduler, so I wrote this function (similar to javascript):
async def set_interval(fn, seconds):
while True:
await fn()
await asyncio.sleep(seconds)
And this is how I use it:
asyncio.ensure_future(set_interval(do_request, 5))
The code works fine, but I have two requirements:
1. How can I stop the set_interval after it is added to the event loop? (similar to javascript clearInterval()
)
2. Is it possible that set_interval will return the value of the function that it is wrapping? In this example I will want the response text. Other pattern that will do the same task will be accepted.
- How can I stop the set_interval after it is added to the event loop? (similar to javascript
clearInterval()
)
One option is to cancel the returned task:
# create_task is like ensure_future, but guarantees to return a task
task = loop.create_task(set_interval(do_request, 5))
...
task.cancel()
This will also cancel whatever future set_interval
was awaiting. If you don't want that, and want fn()
to continue in the background instead, use await asyncio.shield(fn())
instead.
- Is it possible that
set_interval
will return the value of the function that it is wrapping? In this example I will want the response text. Other pattern that will do the same task will be accepted.
Since set_interval
is running in an infinite loop, it cannot return anything - a return would terminate the loop.
If you need the values from the function, one option is to redesign set_interval
as a generator. The caller would obtain the values using async for
, which is quite readable, but also very different from JavaScript's setInterval
:
async def iter_interval(fn, seconds):
while True:
yield await fn()
await asyncio.sleep(seconds)
Usage example:
async def x():
print('x')
return time.time()
async def main():
async for obj in iter_interval(x, 1):
print('got', obj)
# or asyncio.get_event_loop().run_until_complete(main())
asyncio.run(main())
Another approach is for each pass of the loop to broadcast the generated value to a global future which other coroutines can await; something along the lines of:
next_value = None
async def set_interval(fn, seconds):
global next_value
loop = asyncio.get_event_loop()
while True:
next_value = loop.create_task(fn())
await next_value
await asyncio.sleep(seconds)
With usage like this:
# def x() as above
async def main():
asyncio.create_task(set_interval(x, 1))
while True:
await asyncio.sleep(0)
obj = await next_value
print('got', obj)
The above simple implementation has a major issue, though: Once the next value is provided, next_value
is not immediately replaced by a new Future
, but only after the sleep. This means that main()
prints "got " in a tight loop, until a new timestamp arrives. It also means removing await asyncio.sleep(0)
would actually break it, because the only await
in the loop would never suspend and set_interval
would no longer get a chance to run.
This is clearly not intended. We would like the loop in main()
to wait for the next value, even after an initial value is obtained. To do so, set_interval
must be a bit smarter:
next_value = None
async def set_interval(fn, seconds):
global next_value
loop = asyncio.get_event_loop()
next_value = loop.create_task(fn())
await next_value
async def iteration_pass():
await asyncio.sleep(seconds)
return await fn()
while True:
next_value = loop.create_task(iteration_pass())
await next_value
This version ensures that the next_value
is assigned to as soon as the previous one has been awaited. To do so, it uses a helper iteration_pass
coroutine which serves as a convenient task to put into next_value
before fn()
is actually ready to run. With that in place, main()
can look like this:
async def main():
asyncio.create_task(set_interval(x, 1))
await asyncio.sleep(0)
while True:
obj = await next_value
print('got', obj)
main()
is no longer busy-looping and has the expected output of exactly one timestamp per second. However, we still need the initial asyncio.sleep(0)
because next_value
is simply not available when we just call create_task(set_interval(...))
. This is because a task scheduled with create_task
is only run after returning to the event loop. Omitting the sleep(0)
would result in an error along the lines of "objects of type NoneType
cannot be awaited".
To resolve this, set_interval
can be split into a regular def
which schedules the initial loop iteration and immediately initializes next_value
. The function instantiates and immediately returns a coroutine object that does the rest of the work.
next_value = None
def set_interval(fn, seconds):
global next_value
loop = asyncio.get_event_loop()
next_value = loop.create_task(fn())
async def iteration_pass():
await asyncio.sleep(seconds)
return await fn()
async def interval_loop():
global next_value
while True:
next_value = loop.create_task(iteration_pass())
await next_value
return interval_loop()
Now main()
can be written in the obvious way:
async def main():
asyncio.create_task(set_interval(x, 1))
while True:
obj = await next_value
print('got', obj)
Compared to async iteration, the advantage of this approach is that multiple listeners can observe the values.