Tags: python, python-asyncio

Select first result from two coroutines in asyncio


Question

Using Python's asyncio module, how do I select the first result from multiple coroutines?

Example

For example, I might want to implement a timeout while waiting on a queue:

result = yield from select(asyncio.sleep(1),
                           queue.get())

Analogous Operations

This would be similar to Go's select or Clojure's core.async.alt!. It is roughly the converse of asyncio.gather: gather is like all, whereas select would be like any.


Solution

  • You can implement this using either asyncio.wait or asyncio.as_completed:

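    import asyncio


    async def ok():
        # Stand-in for real work: finishes after one second.
        await asyncio.sleep(1)
        return 5

    async def select1(*futures):
        # as_completed() yields awaitables in completion order;
        # awaiting the first one gives the earliest result.
        return await next(asyncio.as_completed(futures))

    async def select2(*futures):
        # asyncio.wait() rejects bare coroutines on Python 3.11+,
        # so wrap each argument in a Task first.
        tasks = [asyncio.ensure_future(f) for f in futures]
        done, running = await asyncio.wait(tasks,
                                           return_when=asyncio.FIRST_COMPLETED)
        # `running` holds the tasks that have not finished yet.
        result = done.pop()
        return result.result()

    async def example():
        queue = asyncio.Queue()  # never filled, so queue.get() stays pending
        result = await select1(ok(), queue.get())
        print('got {}'.format(result))
        result = await select2(queue.get(), ok())
        print('got {}'.format(result))

    if __name__ == "__main__":
        # An explicit loop (rather than asyncio.run, which cancels leftover
        # tasks on exit) leaves the pending queue.get() tasks uncancelled.
        loop = asyncio.new_event_loop()
        loop.run_until_complete(example())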
    

    Output:

    got 5
    got 5
    Task was destroyed but it is pending!
    task: <Task pending coro=<get() done, defined at /usr/lib/python3.4/asyncio/queues.py:170> wait_for=<Future pending cb=[Task._wakeup()]> cb=[as_completed.<locals>._on_completion() at /usr/lib/python3.4/asyncio/tasks.py:463]>
    Task was destroyed but it is pending!
    task: <Task pending coro=<get() done, defined at /usr/lib/python3.4/asyncio/queues.py:170> wait_for=<Future pending cb=[Task._wakeup()]>>
    

    Both implementations return the result of the first Future to complete, but you can easily tweak them to return the Future itself instead. Note that the other Future passed to each select implementation is never awaited or cancelled, so asyncio logs a "Task was destroyed but it is pending!" warning for it when the process exits.
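
If the pending-task warning is a concern, one possible variation is to cancel whatever did not finish first. The following is only a sketch under a few assumptions: `select_first` and `main` are hypothetical names that do not come from the answer above, and `asyncio.run` is used, which requires Python 3.7 or later. It reuses the question's timeout-on-a-queue scenario, with `asyncio.sleep` acting as the timeout.

```python
import asyncio


async def select_first(*aws):
    """Return the result of whichever awaitable finishes first; cancel the rest."""
    # Hypothetical helper, not part of asyncio. Wrap everything in Tasks,
    # since asyncio.wait() rejects bare coroutines on Python 3.11+.
    tasks = [asyncio.ensure_future(aw) for aw in aws]
    try:
        done, _ = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
        # If several finished at the same moment, an arbitrary one is picked.
        return done.pop().result()
    finally:
        # Cancel the losers (a no-op for tasks that are already done) and
        # wait for the cancellations to be processed, so that no pending
        # Task is left behind.
        for task in tasks:
            task.cancel()
        await asyncio.gather(*tasks, return_exceptions=True)


async def main():
    queue = asyncio.Queue()  # stays empty, so queue.get() never completes
    # asyncio.sleep() returns its `result` argument once the delay elapses.
    result = await select_first(asyncio.sleep(0.1, result="timed out"),
                                queue.get())
    print("got {}".format(result))
    return result


if __name__ == "__main__":
    asyncio.run(main())
```

For the specific case of a timeout on a single awaitable, the standard library's `asyncio.wait_for(queue.get(), timeout=1)` may be simpler; it raises a timeout error instead of returning a sentinel value.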