python-3.x asynchronous websocket client tornado

Tornado websocket client: how to async on_message? (coroutine was never awaited)


How can I make the on_message function work asynchronously in my Tornado WebSocketClient?

I guess I need to await the on_message function, but I don't know how.

Or is there a fundamental misconception in how I am trying to implement an asynchronous WebSocketClient?

import tornado.websocket
from tornado.queues import Queue
from tornado import gen
import json


q = Queue()

class WebsocketClient():

    def __init__(self, url, connections):
        self.url = url
        self.connections = connections
        print("CLIENT started")
        print("CLIENT initial connections: ", len(self.connections))

    async def send_message(self):
        async for message in q:
            try:
                msg = json.loads(message)
                print(message)
                await gen.sleep(0.001)
            finally:
                q.task_done()

    async def update_connections(self, connections):
        self.connections = connections
        print("CLIENT updated connections: ", len(self.connections))

    async def on_message(self, message):
        await q.put(message)
        await gen.sleep(0.001)

    async def connect(self):
        client = await tornado.websocket.websocket_connect(url=self.url, on_message_callback=self.on_message)

Running this produces the following warning:

RuntimeWarning: coroutine 'WebsocketClient.on_message' was never awaited
  self._on_message_callback(message)
RuntimeWarning: Enable tracemalloc to get the object allocation traceback

Solution

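  • on_message_callback is supposed to be a regular function, not a coroutine. Tornado calls it synchronously, as self._on_message_callback(message), so an async def callback only creates a coroutine object that nothing awaits, which is exactly what the warning reports. The callback exists for old-style code that used callbacks instead of coroutines.

    In newer async-style code you don't need this callback. Leave it out and read the messages in a loop: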

    async def connect(self):
        client = await tornado.websocket.websocket_connect(url=self.url)
    
        while True:
            message = await client.read_message()
    
            if message is None:
                # None message means the connection was closed
                break
    
            print("Message received:", message)
            await q.put(message)
            await gen.sleep(0.001)