Implementing and testing WebSocket server connection timeout


I am implementing a WebSockets server in Tornado 3.2. The client connecting to the server won't be a browser.

For cases in which there is back-and-forth communication between server and client, I would like to set a maximum time the server will wait for a client response before closing the connection.

This is roughly what I've been trying:

import datetime
import tornado.ioloop
from tornado.websocket import WebSocketHandler

class WSHandler(WebSocketHandler):

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.timeout = None

    def _close_on_timeout(self):
        if self.ws_connection:
            self.close()

    def open(self):
        initialize()

    def on_message(self, message):
        # Remove previous timeout, if one exists.
        if self.timeout:
            tornado.ioloop.IOLoop.instance().remove_timeout(self.timeout)
            self.timeout = None

        if is_last_message:
            self.write_message(message)
            self.close()
        else:
            # Add a new timeout.
            self.timeout = tornado.ioloop.IOLoop.instance().add_timeout(
                datetime.timedelta(milliseconds=1000), self._close_on_timeout)
            self.write_message(message)

Am I being a klutz and is there a much simpler way of doing this? I can't even seem to schedule a simple print statement via add_timeout above.

I also need some help testing this. This is what I have so far:

from tornado.websocket import websocket_connect
from tornado.testing import AsyncHTTPTestCase, gen_test
import time

class WSTests(AsyncHTTPTestCase):

    @gen_test
    def test_long_response(self):
        ws = yield websocket_connect('ws://address', io_loop=self.io_loop)

        # First round trip.
        ws.write_message('First message.')
        result = yield ws.read_message()
        self.assertEqual(result, 'First response.')

        # Wait longer than the timeout.
        # The test is in its own IOLoop, so a blocking sleep should be okay?
        time.sleep(1.1)

        # Expect either write or read to fail because of a closed socket.
        ws.write_message('Second message.')
        result = yield ws.read_message()

        self.assertNotEqual(result, 'Second response.')

The client has no problem writing to and reading from the socket. This is presumably because the add_timeout isn't firing.

Does the test need to yield somehow to allow the timeout callback on the server to run? I would have thought not since the docs say the tests run in their own IOLoop.

Edit

This is the working version, per Ben's suggestions.

import datetime
import tornado.ioloop
from tornado.websocket import WebSocketHandler

class WSHandler(WebSocketHandler):

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.timeout = None

    def _close_on_timeout(self):
        if self.ws_connection:
            self.close()

    def open(self):
        initialize()

    def on_message(self, message):
        # Remove previous timeout, if one exists.
        if self.timeout:
            tornado.ioloop.IOLoop.current().remove_timeout(self.timeout)
            self.timeout = None

        if is_last_message:
            self.write_message(message)
            self.close()
        else:
            # Add a new timeout.
            self.timeout = tornado.ioloop.IOLoop.current().add_timeout(
                datetime.timedelta(milliseconds=1000), self._close_on_timeout)
            self.write_message(message)

The test:

import datetime
from tornado import gen
from tornado.websocket import websocket_connect
from tornado.testing import AsyncHTTPTestCase, gen_test

class WSTests(AsyncHTTPTestCase):

    @gen_test
    def test_long_response(self):
        ws = yield websocket_connect('ws://address', io_loop=self.io_loop)

        # First round trip.
        ws.write_message('First message.')
        result = yield ws.read_message()
        self.assertEqual(result, 'First response.')

        # Wait a little more than the timeout.
        yield gen.Task(self.io_loop.add_timeout, datetime.timedelta(seconds=1.1))

        # Expect either write or read to fail because of a closed socket.
        ws.write_message('Second message.')
        result = yield ws.read_message()
        self.assertEqual(result, None)

Solution

  • The timeout-handling code in your first example looks correct to me.

    For testing, each test case gets its own IOLoop, but the test and everything it runs (including the server under test) share that one IOLoop. A blocking time.sleep() therefore keeps the server's timeout callback from running, so the test must also wait with add_timeout instead of time.sleep().
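
The shared-loop effect can be reproduced without Tornado. The following is a minimal sketch that uses the standard library's asyncio as a stand-in for Tornado's IOLoop (an assumption made for illustration; it is not the code from the question, and the function names are hypothetical). A callback is scheduled for 0.1 s, and the coroutine then waits 0.3 s either by blocking or by yielding to the loop.

```python
import asyncio
import time


async def wait_and_check(blocking):
    """Schedule a callback for 0.1 s, wait 0.3 s, and report whether it ran."""
    loop = asyncio.get_running_loop()
    fired = []
    # Stand-in for the server's add_timeout(..., self._close_on_timeout).
    loop.call_later(0.1, fired.append, True)

    if blocking:
        # Blocks the only thread, so the loop cannot run the callback.
        time.sleep(0.3)
    else:
        # Hands control back to the loop, which runs the callback when due.
        await asyncio.sleep(0.3)

    return bool(fired)


def timeout_fired_during_wait(blocking):
    """Run the check on a fresh event loop (hypothetical helper)."""
    return asyncio.run(wait_and_check(blocking))


if __name__ == "__main__":
    print("blocking sleep:    ", timeout_fired_during_wait(blocking=True))
    print("non-blocking sleep:", timeout_fired_during_wait(blocking=False))
```

With the blocking wait, the callback has not run by the time the coroutine resumes, which matches the original test, where the server never got a chance to close the connection. With the non-blocking wait, the callback runs on schedule.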