Tags: python, tcp, python-asyncio

How do you detect when an asyncio TCP connection is gone?


I apologize if this is a repeat question; I have looked and haven't found one that answers it. I have a Python script that allows my computer to connect to a piece of hardware using a static IP address and port. This piece of hardware only allows one connection at a time on this port. My first issue is that asyncio.open_connection() returns a successful connection status even if there is already another "user" connected to the device. When a true connection happens, the hardware sends a connection status message which, in my case, I do not receive until after the other "user" disconnects. While annoying, I can work around this issue by waiting for the status update message after "connecting" before allowing my script to proceed.

My bigger issue is that I do not have a way of knowing when my physical connection has been removed. For instance, I am connected to the hardware using a USB connection. The hardware requires that I send a keep alive message every 5 seconds but it does not send a response to the keep alive messages. If I pull the USB cable out of the device I would expect to receive errors when writing the keep alive message but I do not.

My script involves multiple concurrent asyncio tasks, but this simplified example should suffice. I would expect to receive an error when calling self.writer.write() or self.writer.drain() after I yank out the USB cable but I receive no indication of any change in the connection. My code just eats it and continues to send keep alive messages. What am I missing?

```python
import asyncio
import logging
from typing import TypeVar

logger = logging.getLogger(__name__)
host = '169.254.13.95'
port = 51717
timeout_sec = 10
lock = asyncio.Lock()

# On Python 3.11+ this can be replaced by `from typing import Self`
Self = TypeVar("Self", bound="TcpConnection")


class TcpConnection:
    """A sample TCP connection class to demonstrate my point"""

    def __init__(self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter) -> None:
        self.reader: asyncio.StreamReader = reader
        self.writer: asyncio.StreamWriter = writer

    @classmethod
    async def connect(cls: type[Self], host: str, port: int) -> Self | None:
        logger.info(f'Connecting to {host}:{port}')
        try:
            reader, writer = await asyncio.open_connection(host=host, port=port)
        except ConnectionRefusedError:
            logger.info(f'Connect call refused ({host}:{port})')
            return None
        except OSError:
            logger.info(f'Connect call failed ({host}:{port})')
            return None
        except Exception as e:
            logger.warning(f'Unknown exception caught:\n{e}')
            return None
        # No `return` inside `finally`: that would swallow every exception,
        # including asyncio.CancelledError.
        logger.info('Connected')
        return cls(reader, writer)

    def is_connected(self) -> bool:
        return not self.writer.is_closing()

    async def keep_alive(self) -> None:
        logger.info('Starting keep alive task')
        keep_alive_msg = b'\x00'
        while self.is_connected():
            async with lock:
                self.writer.write(keep_alive_msg)
                await self.writer.drain()
                logger.debug('Sent keep alive message')
            await asyncio.sleep(4.5)  # don't wait the full 5 seconds just in case
        logger.info('Terminating keep alive task')


async def main() -> None:
    while True:
        tcp = await TcpConnection.connect(host, port)

        if tcp and tcp.is_connected():
            try:
                # create a task to run the keep alive message
                keep_alive_task = asyncio.create_task(tcp.keep_alive())
                await keep_alive_task
            except ConnectionError:
                logger.info('Client disconnected')
            finally:
                tcp.writer.close()  # release the old socket before reconnecting
        logger.info(f'Waiting {timeout_sec} seconds before trying to reconnect')
        await asyncio.sleep(timeout_sec)


if __name__ == '__main__':
    logging.basicConfig(format='%(asctime)s,%(msecs)d %(name)s %(levelname)s %(message)s',
                        datefmt='%Y-%m-%d %H:%M:%S',
                        level=logging.DEBUG)
    try:
        logger.info('Starting application')
        asyncio.run(main())
    except KeyboardInterrupt:
        logger.info('Exiting application')
```

Solution

> My first issue is that asyncio.open_connection() returns a successful connection status even if there is already another "user" connected to the device.

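The TCP handshake is completed by the OS kernel, not by the application. The kernel can complete handshakes for several connections in parallel and queue them (the listen backlog) even if the user-space application on the device handles only one connection at a time. A successful connect on your side therefore only means that the device's kernel accepted the connection, not that the application is serving you. There is no way around this at the TCP level.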
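Since a successful connect says nothing about the application, the workaround described in the question (wait for the device's status message before proceeding) can be written as a connect-then-wait step. This is a minimal sketch, not the asker's code: `connect_and_wait_for_status`, the 1024-byte read, the timeout, and the `b'status: connected'` message in the demo are all hypothetical placeholders, since the question does not give the device's message format. Two local servers stand in for an idle device and a busy one whose kernel accepts the handshake but never answers.

```python
import asyncio
from typing import Optional, Tuple


async def connect_and_wait_for_status(
    host: str, port: int, timeout: float = 5.0
) -> Optional[Tuple[asyncio.StreamReader, asyncio.StreamWriter, bytes]]:
    """Open a connection, but only treat it as usable once the device has
    sent its first bytes (assumed here to be the status message)."""
    reader, writer = await asyncio.open_connection(host, port)
    try:
        # The handshake already succeeded in the kernel; now wait for the
        # application on the device to actually talk to us.
        status = await asyncio.wait_for(reader.read(1024), timeout)
    except asyncio.TimeoutError:
        status = b''
    if not status:  # timed out, or EOF before any status arrived
        writer.close()
        return None
    return reader, writer, status


async def demo() -> Tuple[bool, bool]:
    async def talks(reader: asyncio.StreamReader, writer: asyncio.StreamWriter) -> None:
        writer.write(b'status: connected')  # hypothetical status message
        await writer.drain()
        await reader.read()  # hold the connection until the client leaves
        writer.close()

    async def busy(reader: asyncio.StreamReader, writer: asyncio.StreamWriter) -> None:
        await reader.read()  # kernel accepted us, but the app never speaks
        writer.close()

    outcomes = []
    for handler in (talks, busy):
        server = await asyncio.start_server(handler, '127.0.0.1', 0)
        port = server.sockets[0].getsockname()[1]
        result = await connect_and_wait_for_status('127.0.0.1', port, timeout=0.5)
        outcomes.append(result is not None)
        if result is not None:
            result[1].close()
        server.close()
        await server.wait_closed()
    return outcomes[0], outcomes[1]
```

Returning `None` rather than raising fits the retry loop in the question's `main()`, which already treats a `None` connection as "wait and try again".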

> The hardware requires that I send a keep alive message every 5 seconds but it does not send a response to the keep alive messages.

TCP is designed for reliability. When sent data is not acknowledged, the kernel retransmits it at increasing intervals and only gives up after a while; on Linux with default settings (net.ipv4.tcp_retries2 = 15) this takes at least about 15 minutes. TCP does not react to a broken link immediately, because it may not notice the break at all, and because the link may come back in time for a retransmission to succeed.
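If waiting for the default retransmission timeout is too long, one option on Linux (not mentioned in the answer, so treat it as a swapped-in technique) is the `TCP_USER_TIMEOUT` socket option from RFC 5482, available since Linux 2.6.37: the kernel aborts the connection when written data stays unacknowledged longer than the given number of milliseconds. Because the question's keep-alive writes every 4.5 s, a pulled cable leaves data unacknowledged and the timer can fire. This is a sketch under assumptions: `set_user_timeout` is a hypothetical helper, the 10 s value is arbitrary, and the resulting error should surface from a later `drain()` or read as an `OSError`, typically `TimeoutError` (ETIMEDOUT), which is not a `ConnectionError`, so a loop that only catches `ConnectionError` (like `main()` in the question) would miss it.

```python
import asyncio
import socket
from typing import Optional


def set_user_timeout(writer: asyncio.StreamWriter, timeout_ms: int) -> bool:
    """Ask the kernel to abort the connection when written data stays
    unacknowledged for longer than timeout_ms (Linux-only option)."""
    if not hasattr(socket, 'TCP_USER_TIMEOUT'):
        return False  # option not exposed on this platform
    sock = writer.get_extra_info('socket')
    sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_USER_TIMEOUT, timeout_ms)
    return True


async def demo() -> Optional[int]:
    async def handler(reader: asyncio.StreamReader,
                      writer: asyncio.StreamWriter) -> None:
        writer.close()

    # Local server only so the demo has a real socket to configure.
    server = await asyncio.start_server(handler, '127.0.0.1', 0)
    port = server.sockets[0].getsockname()[1]
    reader, writer = await asyncio.open_connection('127.0.0.1', port)
    try:
        # 10 s: roughly two missed keep-alive intervals (arbitrary choice)
        if not set_user_timeout(writer, 10_000):
            return None
        sock = writer.get_extra_info('socket')
        # Read the option back to confirm the kernel accepted it.
        return sock.getsockopt(socket.IPPROTO_TCP, socket.TCP_USER_TIMEOUT)
    finally:
        writer.close()
        server.close()
        await server.wait_closed()
```

In the question's code, `set_user_timeout(writer, 10_000)` would go right after `asyncio.open_connection()` in `TcpConnection.connect()`, together with catching `OSError` instead of only `ConnectionError` in `main()`.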

If you want to notice a failure quickly, the peer has to send some feedback that it received your data, and you react when that feedback does not arrive in time. That is not how the keep-alive seems to be designed in your case: it only keeps the connection alive (for example, so that firewalls do not drop the state of an idle connection), not to detect broken links quickly.
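As an illustration of detection through feedback, here is a minimal sketch of a probe that sends a message and waits for a reply with a timeout. It assumes a hypothetical protocol in which the peer answers each probe with one byte; the device in the question does not do this, so the idea only applies if its protocol has some request that gets a response. `probe` and both demo servers are illustrative names, not part of asyncio.

```python
import asyncio
from typing import Tuple


async def probe(reader: asyncio.StreamReader, writer: asyncio.StreamWriter,
                msg: bytes = b'\x00', timeout: float = 2.0) -> bool:
    """Send msg and wait for a one-byte reply (hypothetical protocol).

    Returns False if no reply arrives within `timeout` or the connection fails.
    """
    try:
        writer.write(msg)
        await writer.drain()
        await asyncio.wait_for(reader.readexactly(1), timeout)
        return True
    except (asyncio.TimeoutError, asyncio.IncompleteReadError, OSError):
        return False


async def demo() -> Tuple[bool, bool]:
    async def echo(reader: asyncio.StreamReader, writer: asyncio.StreamWriter) -> None:
        # Responsive peer: echo every byte back.
        while data := await reader.read(1):
            writer.write(data)
            await writer.drain()
        writer.close()

    async def silent(reader: asyncio.StreamReader, writer: asyncio.StreamWriter) -> None:
        # Unresponsive peer: reads but never answers.
        while await reader.read(1024):
            pass
        writer.close()

    results = []
    for handler in (echo, silent):
        server = await asyncio.start_server(handler, '127.0.0.1', 0)
        port = server.sockets[0].getsockname()[1]
        reader, writer = await asyncio.open_connection('127.0.0.1', port)
        results.append(await probe(reader, writer, timeout=0.5))
        writer.close()
        await writer.wait_closed()
        server.close()
        await server.wait_closed()
    return results[0], results[1]
```

The timeout on the reply, not the write itself, is what detects the dead link: a silent peer and a pulled cable look the same from the client, which is exactly why the question's write-only keep-alive never reports anything.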

> I would expect to receive an error when calling self.writer.write()

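write() just copies the data into the local socket send buffer, so it cannot tell you whether delivery to the peer succeeded. At the socket level, an error is only reported once the socket has been marked as broken, which happens after retransmission of previously written data has ultimately failed, i.e. some time after that data was written. In asyncio, StreamWriter.write() does not raise for this at all; the error shows up in a later drain(). And drain() only waits until the local write buffer is below its high-water mark, not until the peer has acknowledged the data.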
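The delay between a peer disappearing and the error surfacing can be observed with a small local experiment (an illustration, not from the answer): a local server that closes the connection immediately stands in for a device that has gone away. After the client has seen the close, its first `write()`/`drain()` still completes because the byte only lands in the local send buffer; the failure appears on a later `drain()`. Caveat: a closed peer on localhost answers with an RST, so here the error appears within a couple of writes; with a pulled cable no RST ever arrives and the error only appears once retransmissions time out. `writes_before_error` is a hypothetical helper name.

```python
import asyncio


async def writes_before_error(max_writes: int = 50) -> int:
    """Count keep-alive writes that still 'succeed' after the peer is gone."""

    async def close_immediately(reader: asyncio.StreamReader,
                                writer: asyncio.StreamWriter) -> None:
        # Simulated device that drops the connection right away.
        writer.close()
        await writer.wait_closed()

    server = await asyncio.start_server(close_immediately, '127.0.0.1', 0)
    port = server.sockets[0].getsockname()[1]
    reader, writer = await asyncio.open_connection('127.0.0.1', port)
    await reader.read()  # returns b'' once the peer's FIN has arrived

    succeeded = 0
    try:
        for _ in range(max_writes):
            writer.write(b'\x00')      # only copies into the local send buffer
            await writer.drain()       # raises only once the failure is known
            succeeded += 1
            await asyncio.sleep(0.05)  # give the peer's RST time to arrive
    except ConnectionError:
        pass  # ConnectionResetError or BrokenPipeError, depending on timing
    finally:
        writer.close()
        server.close()
        await server.wait_closed()
    return succeeded
```

On a typical Linux machine the loop stops after one or two successful writes, even though the peer had already closed before the first one.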