Search code examples
pythontcpstreampython-asyncio

Asyncio Streams automatically restarts to read data even after writer.close() is run


I followed the tutorial in https://docs.python.org/3/library/asyncio-stream.html to read data from a sensor (TCP protocol, PC as the server):

import datetime
import asyncio
import socket

async def connect_to_sq():
      _ip = "0.0.0.0"
      _port = 45454

      # Create a TCP server (socket type: SOCK_STREAM)
      server = await asyncio.start_server(handle_server, _ip, _port,family=socket.AF_INET, reuse_address=True)
      async with server: 
          await server.serve_forever()  

async def handle_server(reader, writer):
      # read data
      print("start reading")
      init_time = datetime.datetime.now()
      end_time = init_time + datetime.timedelta(seconds=1)
      while datetime.datetime.now() < end_time:
          raw_data_stream = await reader.readexactly(200)
          print("time:", datetime.datetime.now()-init_time)

      # close connection
      writer.close()
      await writer.wait_closed()
      print("Connection closed")

if __name__ == "__main__":
    asyncio.run(connect_to_sq())

The program is supposed to finish after 1 second data transmission. However, the output is:

start reading
time: 0:00:00.495863
time: 0:00:00.594812
time: 0:00:00.695760
time: 0:00:00.794883
time: 0:00:00.895336
time: 0:00:00.995024
time: 0:00:01.095308
Connection closed
start reading
time: 0:00:00.647908
time: 0:00:00.750355
time: 0:00:00.848436
......

It repeated automatically and infinitely. What is the reason for this and How could I solve it?


Solution

  • Problem

    serve_forever() listens to the port indefinitely. So, even though the individual connections close after 1 second, the server keeps accepting new connections. In this case, your sensor (client) seems to be creating a new connection after an old connection is closed, and since the server still accepts them, handle_server runs again from the top.

    Solution

    Maybe not the best way™, but one possible solution is to use a Future so that handle_server can signal the main code upon connection completion. The main code can then stop the server, avoiding new connections. This is how it can be done:

    import datetime
    import asyncio
    import socket
    from functools import partial
    
    async def connect_to_sq():
          _ip = "0.0.0.0"
          _port = 45454
    
          # Create a TCP server (socket type: SOCK_STREAM)
          first_connection_completion = asyncio.Future()
          server = await asyncio.start_server(
            partial(handle_server, first_connection_completion=first_connection_completion),
            _ip,
            _port,family=socket.AF_INET,
            reuse_address=True)
          async with server: 
              server_task = asyncio.create_task(server.serve_forever())
              await first_connection_completion
              server_task.cancel()
    
    async def handle_server(reader, writer, first_connection_completion=None):
          # read data
          print("start reading")
          init_time = datetime.datetime.now()
          end_time = init_time + datetime.timedelta(seconds=1)
          while datetime.datetime.now() < end_time:
              raw_data_stream = await reader.readexactly(200)
              print("time:", datetime.datetime.now()-init_time)
    
          # close connection
          writer.close()
          await writer.wait_closed()
          print("Connection closed")
          first_connection_completion.set_result(None)
    
    if __name__ == "__main__":
        asyncio.run(connect_to_sq())
    
    

    Couple of notes:

    • handle_server now takes one extra argument, first_connection_completion which is the future used to send signal from the function to the main code. The argument has been binded to the function using functools.partial. Within the function, set_result has been used to mark the future as completed.

    • serve_forever() is now wrapped by a create_task call. This is because we can't await on it.

    The new code looks messy, but you can always refactor. So a cleaner version would be:

    import datetime
    import asyncio
    import socket
    
    async def listen_once(handler, *server_args, **server_kwargs):
        first_connection_completion = asyncio.Future()
        async def wrapped_handler(*args):
            await handler(*args)
            first_connection_completion.set_result(None)
        server = await asyncio.start_server(wrapped_handler, *server_args, **server_kwargs)
        async with server:
            server_task = asyncio.create_task(server.serve_forever())
            await first_connection_completion
            server_task.cancel()
    
    async def connect_to_sq():
          _ip = "0.0.0.0"
          _port = 45454
    
          # Create a TCP server (socket type: SOCK_STREAM)
          await listen_once(handle_server, _ip, _port, family=socket.AF_INET, reuse_address=True)
    
    async def handle_server(reader, writer):
          # read data
          print("start reading")
          init_time = datetime.datetime.now()
          end_time = init_time + datetime.timedelta(seconds=1)
          while datetime.datetime.now() < end_time:
              raw_data_stream = await reader.readexactly(200)
              print("time:", datetime.datetime.now()-init_time)
    
          # close connection
          writer.close()
          await writer.wait_closed()
          print("Connection closed")
    
    if __name__ == "__main__":
        asyncio.run(connect_to_sq())