Search code examples
pythonpython-3.xwebsocketrabbitmqtornado

Tornado PeriodicCallback does not works when using larger callback_time (ie: 200ms)


I have a websocket server application which sends messages to each websocket clients using Tornado PeriodicCallback.

ioloop.PeriodicCallback(dispatch, 10).start()
ioloop.IOLoop.instance().start()

The dispatch() function has a loop to consume RabbitMQ messages then forward them to each websocket clients.

def dispatch():

    global channel, queue
    time_start = time.time()
    while True:
        try:
            method_frame, header_frame, body = channel.basic_get(queue)
            if method_frame:
                message = json.loads(body.decode('utf-8'))
                if 'websocket_uri' in message:
                    websocket_uri = message['websocket_uri']
                    uri = urlparse(websocket_uri)
                    path = uri.path
                else:
                    path = ''
                if 'payload' in message:
                    payload = json.dumps(message['payload'])
                else:
                    payload = ''
                for client in clients:
                    if client.path == path:
                        client.write_message(payload)
                        logger.info('WRITE: %s: %s' % (client.path, payload))
                channel.basic_ack(method_frame.delivery_tag)
        except Exception as e:
            logger.exception(str(e))
            channel.basic_nack(method_frame.delivery_tag)
        finally:
            time_end = time.time()

        if time_end - time_start > 1:
            break;

    return

Somehow, when i use a larger callback_time value like 100ms or 200ms, not all messages forwarded to websocket clients. But when i use smaller value like 10ms or 1ms, the function works.

How the PeriodicCallback actually works? How to make sure the dispatch() function always called by Tornado?

Thanks


Solution

  • I found the solution. I replaced the PeriodicCallback with add_callback:

    app.listen(9001)
    mainloop = ioloop.IOLoop.current()
    mainloop.add_callback(dispatch)
    mainloop.start()
    

    Then use add_callback at the end of dispatch() function so the dispatch() function will be called on the next I/O iteration.

    def dispatch():
    
        global channel, queue
        while True:
            try:
                method_frame, header_frame, body = channel.basic_get(queue)
                if method_frame:
                    message = json.loads(body.decode('utf-8'))
                    if 'websocket_uri' in message:
                        websocket_uri = message['websocket_uri']
                        uri = urlparse(websocket_uri)
                        path = uri.path
                    else:
                        path = ''
                    if 'payload' in message:
                        payload = json.dumps(message['payload'])
                        logger.info('Payload: %s' % payload)
                    else:
                        payload = ''
                    for client in clients:
                        logger.info('Path: %s' % client.path)
                        if client.path == path:
                            client.write_message(payload)
                            logger.info('WRITE: %s: %s' % (client.path, payload))
                    channel.basic_ack(method_frame.delivery_tag)
                else:
                    break;
            except Exception as e:
                logger.exception(str(e))
                channel.basic_nack(method_frame.delivery_tag)
    
        ioloop.IOLoop.current().add_callback(dispatch)
        return