python-3.x tornado

Correct way of handling Tornado read_bytes method


I am working on a Python app with Tornado, in which I want to continuously listen for data sent by the client. Here is my code:

async def handle_stream(self, stream, address):
    try:
        while True:
            data = stream.read_bytes(1024, callback = self._on_read, partial = True)
            print(data)
            stream.write(data)
    except StreamClosedError:
        logger.error("%s disconnected", address)

But I am facing the following problems:

  • When I send data for the first time, the _on_read function is invoked, but the second time the data is not processed.
  • stream.write(data) gives the following error:

tornado.application:Exception in callback functools.partial(.null_wrapper at 0x7fd3ddf2ce18>, exception=AssertionError('Already reading',)>)
Traceback (most recent call last):
  File "/venv/lib/python3.6/site-packages/tornado/ioloop.py", line 758, in _run_callback
    ret = callback()
  File "/venv/lib/python3.6/site-packages/tornado/stack_context.py", line 300, in null_wrapper
    return fn(*args, **kwargs)
  File "/server.py", line 143, in <lambda>
    lambda f: f.result())
  File "/server.py", line 90, in handle_stream
    data = stream.read_bytes(1024, callback = self._on_read, partial = True)
  File "/venv/lib/python3.6/site-packages/tornado/iostream.py", line 432, in read_bytes
    future = self._set_read_callback(callback)
  File "/venv/lib/python3.6/site-packages/tornado/iostream.py", line 859, in _set_read_callback
    assert self._read_callback is None, "Already reading"
AssertionError: Already reading


Solution

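  • You are trying to get the data in two incompatible ways at once: through the callback and through the return value. When a callback is passed, read_bytes delivers the data to the callback instead of returning it, so the loop does not wait and immediately calls read_bytes again while the first read is still pending. The error therefore does not come from stream.write; it is raised by that second stream.read_bytes call, which fails the "Already reading" assertion.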

    Since the callback argument is deprecated (and was removed in Tornado 6.0), it is best to await the Future that read_bytes returns. That is, something like this:

    async def handle_stream(self, stream, address):
        try:
            while True:
                data = await stream.read_bytes(1024, partial = True)
                print(data)
                # stream.write(data)
        except StreamClosedError:
            logger.error("%s disconnected", address)
    

    Also, stream.write(data) writes the data back to the same stream. Is that really what you want to do?
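
For completeness, here is a minimal self-contained sketch of the await-based loop, assuming an echo server really is what is wanted and that Tornado 6 is installed. The names EchoServer and echo_once are hypothetical and not part of the question's code; echo_once only exists to exercise the server from a local client on a free port.

```python
import asyncio
import socket

from tornado.iostream import IOStream, StreamClosedError
from tornado.netutil import bind_sockets
from tornado.tcpserver import TCPServer


class EchoServer(TCPServer):
    """Hypothetical server that echoes every chunk back to its sender."""

    async def handle_stream(self, stream, address):
        try:
            while True:
                # Awaiting the Future suspends here until 1..1024 bytes arrive,
                # so only one read is ever pending on the stream.
                data = await stream.read_bytes(1024, partial=True)
                # Awaiting write() waits until the data has been written out.
                await stream.write(data)
        except StreamClosedError:
            # The client disconnected; leave the loop quietly.
            pass


async def echo_once(payload: bytes) -> bytes:
    """Start EchoServer on a free local port, send payload, return the reply."""
    sockets = bind_sockets(0, address="127.0.0.1")  # port 0: let the OS pick
    port = sockets[0].getsockname()[1]
    server = EchoServer()
    server.add_sockets(sockets)

    client = IOStream(socket.socket(socket.AF_INET, socket.SOCK_STREAM))
    try:
        await client.connect(("127.0.0.1", port))
        await client.write(payload)
        # Read exactly as many bytes as were sent.
        return await client.read_bytes(len(payload))
    finally:
        client.close()
        server.stop()


if __name__ == "__main__":
    print(asyncio.run(echo_once(b"ping")))
```

Because each read is awaited before the next one starts, the "Already reading" assertion cannot be triggered by this loop. If the server should only receive data, drop the await stream.write(data) line.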