Search code examples
pythonpython-asyncio

Async [TCP] writer close to avoid resource leaks


I'm implementing a TCP client with asyncio Streams. Typical example code is:

reader, writer = await asyncio.open_connection(
        '127.0.0.1', 8888)
...
writer.close()
await writer.wait_closed()

The ... is a non-trivial piece of async/await code in my case. To prevent resource leaks (e.g. fds), I believe I need to call that close() function, so I really should put it inside a try/finally. Is this correct or Python somehow magically handles resource cleanup, like it finalizes asynchronous generators, when async loop ends?

If not and manual cleanup is required, is there a more canonical/Pythonic way to implement this than defining a new function with @contextlib.asynccontextmanager?

I tried contextlib.closing() and contextlib.aclosing() but that doesn't work, since asyncio.open_connection() returns tuple and writer doesn't have aclose(), just close().


Solution

  • You always have to close streams when done.

    You can use try/finally

    async def main_try_catch():
        writer: Optional[StreamWriter] = None
        try:
            print("Opening")
            reader, writer = await asyncio.open_connection("www.google.com", 443, ssl=True)
            print("Opened")
            data = await reader.readline()
            print(f"Received: {data.decode()!r}")
        except:
            print("Caught exception")
        finally:
            if writer is not None:
                print("Closing")
                writer.close()
                await writer.wait_closed()
                print("Closed")
    

    Or you can use @contextlib.asynccontextmanager:

    @contextlib.asynccontextmanager
    async def main_context_manager() -> Tuple[StreamReader, StreamWriter]:
        #  __aenter__
        print("Opening")
        reader, writer = await asyncio.open_connection("www.google.com", 443, ssl=True)
    
        yield reader, writer
    
        #  __aexit__
        if writer is not None:
            print("Closing")
            writer.close()
            await writer.wait_closed()
            print("Closed")
    

    And use it like:

    import asyncio
    import contextlib
    from asyncio import StreamReader, StreamWriter
    from typing import Optional, Tuple
    
    async def main():
        # await main_try_catch()
    
        # OR
        try:
            async with main_context_manager() as (reader, writer):
                print("Opened")
                data = await reader.readline()
                print(f"Received: {data.decode()!r}")
        except:
            print("Caught exception")
    
    
    if __name__ == '__main__':
        asyncio.run(main())
    
    
    # SUCCESS CASE
    # Opening
    # Opened
    # Received: '...'
    # Closing
    # Closed
    
    # ERROR CASE
    # Opening
    # Caught exception