I have a WebSocket server application that sends messages to each WebSocket client using Tornado's `PeriodicCallback`:
# Schedule dispatch() with a callback_time of 10 (milliseconds), then start
# the IOLoop that drives both the periodic callback and the websockets.
ioloop.PeriodicCallback(dispatch, 10).start()
ioloop.IOLoop.instance().start()
The `dispatch()` function has a loop that consumes RabbitMQ messages and forwards them to each WebSocket client:
def dispatch():
    """Drain pending RabbitMQ messages and forward them to matching clients.

    Each message body is decoded as UTF-8 JSON. Its optional
    ``websocket_uri`` selects the recipients (every client whose ``path``
    equals the path component of that URI; ``''`` when the key is absent).
    Its optional ``payload`` is re-serialized to JSON (``''`` when absent)
    and written to each recipient. A message that is handled without error
    is acked; a message whose handling raises is nacked.

    The pass ends as soon as the queue is empty, after any failure, or once
    more than one second has elapsed, so that control goes back to the
    caller (the IOLoop) instead of polling an empty queue.
    """
    global channel, queue
    time_start = time.time()
    while True:
        # Reset on every iteration so the error handler can tell whether a
        # message was actually fetched; otherwise a failing basic_get() would
        # hit an unbound name or nack the previous, already-acked delivery.
        method_frame = None
        try:
            method_frame, _header_frame, body = channel.basic_get(queue)
            if not method_frame:
                # Nothing left to deliver: stop now rather than spinning on
                # an empty queue until the one-second budget runs out.
                break

            message = json.loads(body.decode('utf-8'))
            if 'websocket_uri' in message:
                path = urlparse(message['websocket_uri']).path
            else:
                path = ''
            if 'payload' in message:
                payload = json.dumps(message['payload'])
            else:
                payload = ''

            for client in clients:
                if client.path == path:
                    client.write_message(payload)
                    logger.info('WRITE: %s: %s', client.path, payload)
            channel.basic_ack(method_frame.delivery_tag)
        except Exception as e:
            logger.exception(str(e))
            if method_frame:
                channel.basic_nack(method_frame.delivery_tag)
            # End this pass after a failure: a nacked message may be put
            # back on the queue (TODO confirm requeue default of the client
            # library), and re-fetching it immediately would loop hot.
            break

        # Time budget check lives outside ``finally`` so it can never
        # swallow an exception that is propagating out of the handler.
        if time.time() - time_start > 1:
            break
Somehow, when I use a larger `callback_time` value such as 100 ms or 200 ms, not all messages are forwarded to the WebSocket clients. But when I use a smaller value such as 10 ms or 1 ms, the function works.
How does `PeriodicCallback` actually work? How can I make sure the `dispatch()` function is always called by Tornado?
Thanks
I found the solution. I replaced the `PeriodicCallback` with `add_callback`:
# Listen on port 9001, queue the first dispatch() call on the current
# IOLoop, and start that loop; dispatch() re-queues itself after each pass.
app.listen(9001)
mainloop = ioloop.IOLoop.current()
mainloop.add_callback(dispatch)
mainloop.start()
Then call `add_callback` at the end of the `dispatch()` function so that `dispatch()` is called again on the next I/O loop iteration:
def dispatch():
    """Forward queued RabbitMQ messages to clients, then reschedule itself.

    Each message body is decoded as UTF-8 JSON. Its optional
    ``websocket_uri`` selects the recipients (every client whose ``path``
    equals the path component of that URI; ``''`` when the key is absent).
    Its optional ``payload`` is re-serialized to JSON (``''`` when absent)
    and written to each recipient. A message that is handled without error
    is acked; a message whose handling raises is nacked.

    The pass ends when the queue is empty or after any failure. The function
    then registers itself with ``add_callback`` on the current IOLoop so it
    runs again.
    """
    global channel, queue
    while True:
        # Reset on every iteration so the error handler can tell whether a
        # message was actually fetched; otherwise a failing basic_get() would
        # hit an unbound name (skipping the reschedule below) or nack the
        # previous, already-acked delivery.
        method_frame = None
        try:
            method_frame, _header_frame, body = channel.basic_get(queue)
            if not method_frame:
                # Queue drained for now.
                break

            message = json.loads(body.decode('utf-8'))
            if 'websocket_uri' in message:
                path = urlparse(message['websocket_uri']).path
            else:
                path = ''
            if 'payload' in message:
                payload = json.dumps(message['payload'])
                logger.info('Payload: %s', payload)
            else:
                payload = ''

            for client in clients:
                logger.info('Path: %s', client.path)
                if client.path == path:
                    client.write_message(payload)
                    logger.info('WRITE: %s: %s', client.path, payload)
            channel.basic_ack(method_frame.delivery_tag)
        except Exception as e:
            logger.exception(str(e))
            if method_frame:
                channel.basic_nack(method_frame.delivery_tag)
            # Leave the loop after a failure: with no time limit here, a
            # message that keeps failing (and may be requeued by the nack --
            # TODO confirm client-library default) would otherwise be
            # re-fetched forever and the IOLoop would never run again.
            break

    # Run again on the next IOLoop iteration.
    ioloop.IOLoop.current().add_callback(dispatch)