Using Python's asyncio
module, how do I select the first result from multiple coroutines?
I might want to implement a timeout on waiting on a queue:
result = yield from select(asyncio.sleep(1),
queue.get())
This would be similar to Go's select
or Clojure's core.async.alt!
. It is something like the converse of asyncio.gather
(gather is like all
, select would be like any
.)
You can implement this using both asyncio.wait
and asyncio.as_completed
:
import asyncio
async def ok():
await asyncio.sleep(1)
return 5
async def select1(*futures, loop=None):
if loop is None:
loop = asyncio.get_event_loop()
return (await next(asyncio.as_completed(futures)))
async def select2(*futures, loop=None):
if loop is None:
loop = asyncio.get_event_loop()
done, running = await asyncio.wait(futures,
return_when=asyncio.FIRST_COMPLETED)
result = done.pop()
return result.result()
async def example():
queue = asyncio.Queue()
result = await select1(ok(), queue.get())
print('got {}'.format(result))
result = await select2(queue.get(), ok())
print('got {}'.format(result))
if __name__ == "__main__":
loop = asyncio.get_event_loop()
loop.run_until_complete(example())
Output:
got 5
got 5
Task was destroyed but it is pending!
task: <Task pending coro=<get() done, defined at /usr/lib/python3.4/asyncio/queues.py:170> wait_for=<Future pending cb=[Task._wakeup()]> cb=[as_completed.<locals>._on_completion() at /usr/lib/python3.4/asyncio/tasks.py:463]>
Task was destroyed but it is pending!
task: <Task pending coro=<get() done, defined at /usr/lib/python3.4/asyncio/queues.py:170> wait_for=<Future pending cb=[Task._wakeup()]>>
Both implementations return the value awaited from first completed Future
, but you can easily tweak it to return the Future
itself, instead. Note that because the other Future
passed to each select
implementation is never awaited, a warning gets raised when the process exits.