Search code examples
pythonasynchronousasync-awaitpython-trio

Python: await to read from socket OR shut down on event


Given this function:

async def read_data(self, stream: UStreams) -> None:
    while True:
        read_bytes = await stream.read(MAX)
        #handle the bytes  

This however will keep the function running forever, of course.

I'd like to have this function do this, but also shutdown (therefore exit the function) in case an event occurs:

async def read_data(self, stream: UStreams, evt: trio.Event) -> None:
    while True:
        read_bytes = await stream.read(MAX) || await evt.wait() # <- how do I do this?
        #handle the bytes or exit

I can't simply put evt.wait() as first statement in the loop, as the stream would never be read (and the other way around also doesn't work, as then the event would never be awaited if there is no stuff to read). gather type of code also doesn't help, I don't want to wait for both: only to the first one occurring (in go this would be solved with a select statement).


Solution

  • You can use one of the waiting primitives, such as asyncio.wait, or asyncio.as_completed.

    import asyncio
    
    async def select(*awaitables):
        return await next(asyncio.as_completed(awaitables))
    

    Example:

    async def fast():
        await asyncio.sleep(0.2)
        return 1
    
    async def slow():
       await asyncio.sleep(1)
       return 2
    
    async def main():
        result = await select(fast(), slow())
        print(result)
    
    asyncio.run(main())
    # 1
    

    In trio, you can also accomplish this with a supervisor. From the docs:

    here’s a function that takes a list of functions, runs them all concurrently, and returns the result from the one that finishes first:

    async def race(*async_fns):
        if not async_fns:
            raise ValueError("must pass at least one argument")
    
        winner = None
    
        async def jockey(async_fn, cancel_scope):
            nonlocal winner
            winner = await async_fn()
            cancel_scope.cancel()
    
        async with trio.open_nursery() as nursery:
            for async_fn in async_fns:
                nursery.start_soon(jockey, async_fn, nursery.cancel_scope)
    
        return winner
    

    This works by starting a set of tasks which each try to run their function. As soon as the first function completes its execution, the task will set the nonlocal variable winner from the outer scope to the result of the function, and cancel the other tasks using the passed in cancel scope. Once all tasks have been cancelled (which exits the nursery block), the variable winner will be returned.