
Python: Reading long lines with asyncio.StreamReader.readline()


The asyncio version of readline() (1) allows reading a single line from a stream asynchronously. However, if it encounters a line that is longer than a limit, it will raise an exception (2). It is unclear how to resume reading after such an exception is raised.

I would like a function similar to readline() that simply discards the parts of lines that exceed the limit and continues reading until the stream ends. Does such a method exist? If not, how can I write one?

1: https://docs.python.org/3/library/asyncio-stream.html#streamreader

2: https://github.com/python/cpython/blob/3.12/Lib/asyncio/streams.py#L549


Solution

  • The standard library implementation of asyncio.StreamReader.readline is almost what you want. That function raises a ValueError in the event of a buffer overrun, but it first removes a chunk of data from the buffer and discards it. If you catch the ValueError and keep calling readline, the stream would keep going. But you would lose the first chunk of data, which is not what you want.
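
    The behaviour described above can be reproduced without a network connection. The sketch below is only an illustration: the function name readline_overrun_demo and the limit of 16 are made up for the demo, and it constructs a StreamReader by hand and pre-loads it with feed_data(), which real code would not normally do (a reader usually comes from open_connection).

```python
import asyncio

async def readline_overrun_demo():
    # Demo-only setup: a hand-built reader with a tiny limit and pre-loaded data.
    reader = asyncio.StreamReader(limit=16)
    reader.feed_data(b"A" * 40 + b"\nshort\n")
    reader.feed_eof()

    results = []
    for _ in range(2):
        try:
            results.append(await reader.readline())
        except ValueError:
            # readline() has already dropped the oversized data at this point.
            results.append("ValueError")
    return results

if __name__ == "__main__":
    print(asyncio.run(readline_overrun_demo()))
```

    The first call raises ValueError and the 40-byte line is gone; the second call returns the short line that followed it.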

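    To achieve what you want, you can call the slightly lower-level method StreamReader.readuntil(separator). Calling reader.readuntil(b'\n') behaves like reader.readline() for lines that fit within the limit (note that the separator must be a bytes object, not a str). On an overrun this method raises an asyncio.LimitOverrunError, which has an instance attribute named consumed. This is an integer giving the number of buffered bytes that belong to the oversized line so far: the bytes preceding the separator if it has already arrived, otherwise the current length of the buffer contents.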
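
    As a small offline illustration of readuntil, the sketch below reports either the line that was read or the consumed value from the exception. The helper name probe_readuntil and the limit of 16 are assumptions for this demo, and the reader is again built by hand and filled with feed_data() purely so the example runs without a connection.

```python
import asyncio

async def probe_readuntil(data: bytes, limit: int = 16):
    """Return the line read, or the `consumed` value if the limit is overrun."""
    # Demo-only setup: build a reader by hand and pre-load its buffer.
    reader = asyncio.StreamReader(limit=limit)
    reader.feed_data(data)
    try:
        # The separator has to be a bytes object.
        return await reader.readuntil(b"\n")
    except asyncio.LimitOverrunError as e:
        return e.consumed

if __name__ == "__main__":
    print(asyncio.run(probe_readuntil(b"short\n")))         # fits within the limit
    print(asyncio.run(probe_readuntil(b"A" * 40 + b"\n")))  # overruns the limit
```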

    There are a couple of strange details here. Despite the attribute's name, these bytes have not been "consumed": they are still in the buffer. To keep the stream going you have to consume them yourself, which you can do by calling StreamReader.readexactly. The second strange thing is that the value of LimitOverrunError.consumed can be greater than the value of the keyword argument limit= that you originally passed to asyncio.open_connection. That argument was supposed to set the size of the input buffer, but it appears that it merely sets the threshold for raising the LimitOverrunError. Consequently, the buffer might already contain the separator, but fortunately the byte count in LimitOverrunError.consumed does not include the separator. So a call to readexactly will not consume the separator.
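
    Both details can be checked with a short experiment. This is a sketch under the same demo-only assumptions as before (a hand-built StreamReader filled via feed_data(), a limit of 16, and a hypothetical function name skip_overrun_demo).

```python
import asyncio

async def skip_overrun_demo():
    # Demo-only setup: tiny limit, 40-byte line followed by a normal line.
    reader = asyncio.StreamReader(limit=16)
    reader.feed_data(b"B" * 40 + b"\nnext\n")

    consumed = 0
    try:
        await reader.readuntil(b"\n")
    except asyncio.LimitOverrunError as e:
        consumed = e.consumed  # larger than the limit of 16

    # The oversized bytes are still buffered; read them out explicitly.
    head = await reader.readexactly(consumed)
    # The separator was not counted, so it is still waiting in the buffer.
    rest_of_line = await reader.readuntil(b"\n")
    following = await reader.readuntil(b"\n")
    return consumed, head, rest_of_line, following

if __name__ == "__main__":
    print(asyncio.run(skip_overrun_demo()))
```

    Here consumed comes back as 40 even though the limit is 16, readexactly returns the 40 data bytes, and the next readuntil returns only the leftover separator.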

    You can achieve exactly what you want by calling readuntil, with added logic to handle LimitOverrunError. In the exception handler you save the first chunk of data and continue to read from the stream. When you finally encounter the end of the line you return the first chunk that you saved.

    Here is a full working demo program. To test the code I set limit=512 in asyncio.open_connection. The server transmits a line that's longer than this followed by some shorter lines. The client reads each of the lines without any exception being raised.

    The function you are asking for is readline_no_limit.

    import asyncio
    
    _PORT = 54242
    
    MY_LIMIT = 512
    
    async def readline_no_limit(reader):
        """ Return a bytes object.  If there has not been a buffer
        overrun the returned value will end with the line terminator,
        otherwise not.
    
        The length of the returned value may be greater than the limit
        specified in the original call to open_connection."""
    
        discard = False
        first_chunk = b''
        while True:
            try:
                chunk = await reader.readuntil(b'\n') 
                if not discard:
                    return chunk
                break
            except asyncio.LimitOverrunError as e:
                print(f"Overrun detected, buffer length now={e.consumed}")
                chunk = await reader.readexactly(e.consumed)
                if not discard:
                    first_chunk = chunk
                discard = True
        return first_chunk
    
    async def client():
        await asyncio.sleep(1.0)
        reader, writer = await asyncio.open_connection(host='localhost', 
            port=_PORT, limit=MY_LIMIT)
        writer.write(b"Hello\n")
        while True:
            line = await readline_no_limit(reader)
            print(f"Received {len(line)} bytes, first 40: {line[:40]}")
            if line == b"End\n":
                break
        writer.write(b"Quit\n")
    
    async def got_cnx(reader, writer):
        while True:
            msg = await reader.readline()
            if msg == b"Quit\n":
                break
            if msg != b"Hello\n":
                continue
            long_line = ' '.join([format(x, "x") for x in range(1000)])
            writer.write(bytes(long_line, encoding='utf8'))
            writer.write(b"\n")
            writer.write(b"Short line\n")
            writer.write(b"End\n")
    
    async def main():
        def shutdown(_future):
            print("\nClient quit, server shutdown")
            server.close()
        client_task = asyncio.create_task(client())
        server = await asyncio.start_server(got_cnx, host='localhost', port=_PORT,
            start_serving = False)
        client_task.add_done_callback(shutdown)
        try:
            await server.serve_forever()
        except asyncio.CancelledError:
            print("Server closed normally")
    
    if __name__ == "__main__":
        asyncio.run(main())
    

    Output:

    Overrun detected, buffer length now=3727
    Received 3727 bytes, first 40: b'0 1 2 3 4 5 6 7 8 9 a b c d e f 10 11 12'
    Received 11 bytes, first 40: b'Short line\n'
    Received 4 bytes, first 40: b'End\n'
    
    Client quit, server shutdown
    Server closed normally
    

    Python 3.11, Ubuntu
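
    The demo stops when it sees the End line. For the "continues reading until the stream ends" part of the question, one possible variant also catches asyncio.IncompleteReadError, which readuntil raises when the stream ends before a separator arrives. This is a sketch rather than part of the tested program above: the names readline_keep_first and collect_lines are hypothetical, and the hand-fed StreamReader is used only so that the example runs without a server.

```python
import asyncio

async def readline_keep_first(reader):
    """Return the next line; for an oversized line, return only its first chunk.

    Returns b"" once the stream has ended and nothing is left to read.
    """
    kept = None  # first chunk of an oversized line, if any
    while True:
        try:
            chunk = await reader.readuntil(b"\n")
        except asyncio.LimitOverrunError as e:
            # Pull the overrunning bytes out of the buffer ourselves.
            chunk = await reader.readexactly(e.consumed)
            if kept is None:
                kept = chunk  # keep the first chunk, drop later ones
            continue
        except asyncio.IncompleteReadError as e:
            # End of stream before a newline: hand back what we have.
            return e.partial if kept is None else kept
        return chunk if kept is None else kept

async def collect_lines(data, limit=16):
    # Demo-only setup: build a reader by hand and pre-load its buffer.
    reader = asyncio.StreamReader(limit=limit)
    reader.feed_data(data)
    reader.feed_eof()
    lines = []
    while line := await readline_keep_first(reader):
        lines.append(line)
    return lines

if __name__ == "__main__":
    print(asyncio.run(collect_lines(b"A" * 40 + b"\nshort\ntail")))
```

    With a limit of 16, the oversized first line comes back without its terminator, the short line comes back intact, and the unterminated tail is returned as the final partial line before the loop ends.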