python amazon-web-services websocket aws-appsync

mqtt+AWS AppSync: WebsocketConnectionError: WebSocket handshake error, connection not upgraded


I am using the paho-mqtt Python library to implement subscriptions against AWS AppSync.

Here is the code for the subscription:


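        # Module-level imports this excerpt relies on:
        #   import json, logging
        #   import requests
        #   import paho.mqtt.client as mqtt
        #   from urllib.parse import urlparse
        postHeaders = {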
            'Content-Type': 'application/json',
            **self._headers
        }

        payload = {
            "operationName": "onCommand",
            "query": """subscription onCommand($deviceId: ID!) {
                  onCommandCreated(deviceId: $deviceId) {
                    commandId
                    deviceId
                    command
                    arguments
                    status
                  }
                }
            """,
            "variables": {"deviceId": self._device_id}
        }

        r = requests.post(_ROOT_URL, headers=postHeaders, json=payload)
        try:
            r.raise_for_status()
        except Exception:
            self._exception('Could not subscribe to device commands, status %s, response %s', r.status_code, r.content)
            raise
        data = r.json()
        self._logger.info('Subscription response: %s', json.dumps(data, indent=3, sort_keys=True))

        # if data.get('errors'):
        #     raise ValueError('Error subscribing: {}'.format(data))
        # Reuse the already-parsed response instead of calling r.json() again
        mqtt_connection = data['extensions']['subscription']['mqttConnections'][0]
        client_id = mqtt_connection['client']
        ws_url = mqtt_connection['url']
        topic = mqtt_connection['topics'][0]

        # noinspection PyUnusedLocal
        def on_connect(_client, _userdata, flags, rc):
            self._debug('Connected to subscription topic')
            client.subscribe(topic)
        urlparts = urlparse(ws_url)

        headers = {
            "Host": "{0:s}".format(urlparts.netloc),
        }

        client = mqtt.Client(client_id=client_id, transport="websockets")
        client.enable_logger(logging.getLogger('paho-mqtt'))
        client.on_connect = on_connect
        client.on_message = ...
        client.on_log = lambda *a, **kw: self._logger.info('On log', exc_info=True)
        client.ws_set_options(path="{}?{}".format(urlparts.path, urlparts.query), headers=headers)
        client.tls_set()

        self._debug('trying to connect now....')
        client.connect(urlparts.netloc, 443)
        try:
            client.loop_forever()
        except Exception:
            self._logger.exception('Error looping forever')
            raise
        finally:
            client.disconnect()

It works well for about 24 hours, after which it starts failing and never recovers, despite its attempts to reconnect. I managed to capture the exception that causes the failure:

May 01 20:39:21 ip-10-0-1-121 test_chattyraspi[8022]:   File "/home/ubuntu/chattyraspi/lib/python3.6/site-packages/paho/mqtt/clie
May 01 20:39:21 ip-10-0-1-121 test_chattyraspi[8022]:     self.reconnect()
May 01 20:39:21 ip-10-0-1-121 test_chattyraspi[8022]:   File "/home/ubuntu/chattyraspi/lib/python3.6/site-packages/paho/mqtt/clie
May 01 20:39:21 ip-10-0-1-121 test_chattyraspi[8022]:     self._websocket_path, self._websocket_extra_headers)
May 01 20:39:21 ip-10-0-1-121 test_chattyraspi[8022]:   File "/home/ubuntu/chattyraspi/lib/python3.6/site-packages/paho/mqtt/clie
May 01 20:39:21 ip-10-0-1-121 test_chattyraspi[8022]:     self._do_handshake(extra_headers)
May 01 20:39:21 ip-10-0-1-121 test_chattyraspi[8022]:   File "/home/ubuntu/chattyraspi/lib/python3.6/site-packages/paho/mqtt/clie
May 01 20:39:21 ip-10-0-1-121 test_chattyraspi[8022]:     "WebSocket handshake error, connection not upgraded")
May 01 20:39:21 ip-10-0-1-121 test_chattyraspi[8022]: paho.mqtt.client.WebsocketConnectionError: WebSocket handshake error, conne
May 01 20:39:21 ip-10-0-1-121 test_chattyraspi[8022]: 2020-05-01 20:39:21,194 - paho-mqtt - DEBUG - Connection failed, retrying
May 01 20:39:23 ip-10-0-1-121 test_chattyraspi[8022]: 2020-05-01 20:39:23,207 - client.raspi.alexa.mirko.io - INFO - On log
May 01 20:39:23 ip-10-0-1-121 test_chattyraspi[8022]: Traceback (most recent call last):
May 01 20:39:23 ip-10-0-1-121 test_chattyraspi[8022]:   File "/home/ubuntu/chattyraspi/lib/python3.6/site-packages/paho/mqtt/clie
May 01 20:39:23 ip-10-0-1-121 test_chattyraspi[8022]:     self.reconnect()
May 01 20:39:23 ip-10-0-1-121 test_chattyraspi[8022]:   File "/home/ubuntu/chattyraspi/lib/python3.6/site-packages/paho/mqtt/clie
May 01 20:39:23 ip-10-0-1-121 test_chattyraspi[8022]:     self._websocket_path, self._websocket_extra_headers)
May 01 20:39:23 ip-10-0-1-121 test_chattyraspi[8022]:   File "/home/ubuntu/chattyraspi/lib/python3.6/site-packages/paho/mqtt/clie
May 01 20:39:23 ip-10-0-1-121 test_chattyraspi[8022]:     self._do_handshake(extra_headers)
May 01 20:39:23 ip-10-0-1-121 test_chattyraspi[8022]:   File "/home/ubuntu/chattyraspi/lib/python3.6/site-packages/paho/mqtt/clie
May 01 20:39:23 ip-10-0-1-121 test_chattyraspi[8022]:     "WebSocket handshake error, connection not upgraded")

If I restart the script, it immediately starts receiving notifications again.

Here you can find the complete code.

I have added some more logging to dig into the issue. It seems that AWS AppSync responds with a Connection: keep-alive header, so the connection is never upgraded to a WebSocket, which causes the failure.


Solution

  • AWS IoT closes the WebSocket connection and requires re-authentication every 24 hours. You will need to redo the subscribe step (that is, repeat the authentication request and connect to the newly issued pre-signed URL) every 24 hours.
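
One way to act on this is to wrap the whole "subscribe, then connect" sequence in an outer loop, so that every new connection starts from a fresh pre-signed URL. The following is only a minimal sketch under stated assumptions: `run_with_resubscribe`, `parse_connection_info` and `MqttConnectionInfo` are hypothetical names introduced here, `subscribe` stands for the `requests.post(...)` call from the question (returning the parsed JSON), and `run_session` stands for the paho-mqtt setup from the question, written so that it returns or raises once the connection drops.

```python
import time
from typing import Callable, NamedTuple, Optional


class MqttConnectionInfo(NamedTuple):
    """Values taken from 'mqttConnections' in the subscription response."""
    client_id: str
    ws_url: str
    topic: str


def parse_connection_info(data: dict) -> MqttConnectionInfo:
    """Extract client id, pre-signed URL and topic, as the question's code does."""
    conn = data['extensions']['subscription']['mqttConnections'][0]
    return MqttConnectionInfo(conn['client'], conn['url'], conn['topics'][0])


def run_with_resubscribe(
    subscribe: Callable[[], dict],
    run_session: Callable[[MqttConnectionInfo], None],
    max_sessions: Optional[int] = None,
    retry_delay: float = 5.0,
    sleep: Callable[[float], None] = time.sleep,
) -> int:
    """Redo the subscription request before every connection attempt.

    subscribe:    hypothetical callable that POSTs the GraphQL subscription
                  and returns the parsed JSON response.
    run_session:  hypothetical callable that connects with the given info and
                  blocks until the connection ends (by returning or raising).
    max_sessions: None means loop forever; a number is handy for testing.
    Returns the number of sessions that were started.
    """
    sessions = 0
    while max_sessions is None or sessions < max_sessions:
        # A fresh request yields a fresh pre-signed URL and client id.
        info = parse_connection_info(subscribe())
        try:
            run_session(info)
        except Exception as exc:  # e.g. a failed WebSocket handshake
            print('Session ended with error: {!r}'.format(exc))
        sessions += 1
        sleep(retry_delay)  # brief pause before asking for a new URL
    return sessions
```

For this to help, `run_session` must not keep retrying with the stale URL on its own. One option (an assumption, not something the answer spells out) is to drive `client.loop()` manually and return as soon as it reports an error, instead of calling `client.loop_forever()`.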