Search code examples
pythonpython-3.xwebsockettornado

How to send a message from the server to the client in a web socket


Well I'm doing a little program that aims to receive messages on rabbitmq and send to clients connected to the websocket that has the same index, but having a problem when sending to the client, I reused a code that had to work in 3.6 .9 and I don't remember the version of the tornado (websocket library that I use) but I changed the pc and I'm able to install it again, now I have the newest versions of the library and python.

I'll post my old code because it's easier to understand because of the msm error

import tornado.websocket
import tornado.ioloop
import threading
import pika
import json

def verificar_novo(se):
    for i in range(0,len(conexao_lista)):
        if se == conexao_lista[i]["endereco"]:
            return 0
    return 1

def excluir_conexao(endereco):
    for i in range(0,len(conexao_lista)):
        if conexao_lista[i]["endereco"] == endereco:
            del(conexao_lista[i])
            break

""" Função para procurar mensagens no rabbit e retornar para os clientes"""
def callback(ch, method, properties, body):
    menssagem_rabbit = json.loads(body)
    threading.Lock()
    for i in range(0, len(conexao_lista)):
        if (conexao_lista[i]["configuracao"]["ras_eve_id_indice"]) == (menssagem_rabbit["ras_eve_id_indice"]):
            conexao_lista[i]["endereco"].write_message(menssagem_rabbit)
            break
    threading.RLock()



""" Classe de conexao com cliente"""
class WebSocketHandler(tornado.websocket.WebSocketHandler):
    def open(self):
        print("Novo cliente conectado")


    def on_close(self):
        print("Cliente desconectado")
        excluir_conexao(self)

    def on_message(self, message):

        n = verificar_novo(self)
        if n == 0:
            self.write_message(u"Sua menssagem: " + message)
        else:
            dados_json = json.loads(message)
            conexao_dicionario["endereco"] = self
            conexao_dicionario["configuracao"] = dados_json
            conexao_lista.append(conexao_dicionario.copy())
            self.write_message(u"Usuario conectado " + dados_json["id_usuario"])


    def check_origin(self, origin):
        return True

"""Função que a thread ficara rodando para consumir as mensagem em segundo plano"""
def procurar_mensagens():
    connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
    channel = connection.channel()
    channel.basic_consume(queue='testerenan', on_message_callback=callback, auto_ack=True)
    channel.start_consuming()

"""Variaveis"""
conexao_lista = []
conexao_dicionario = {"endereco": "","configuracao":""}


"""Chamando a Thread"""
threading.Thread(target=procurar_mensagens, args=()).start()

"""Conexão do WebSocket"""
application = tornado.web.Application([(r"/", WebSocketHandler),])


if __name__ == "__main__":
    application.listen(8888)
    tornado.ioloop.IOLoop.instance().start()

The error that appears:

Exception in thread Thread-1: 
Traceback (most recent call last):
  File "/usr/lib/python3.8/threading.py", line 932, in _bootstrap_inner
    self.run() 
  File "/usr/lib/python3.8/threading.py", line 870, in run
    self._target(*self._args, **self._kwargs) 
  File "/home/renan/Área de Trabalho/Projeto-WebSocket/Servidor.py", line 63, in procurar_mensagens
    channel.start_consuming()
  File "/usr/local/lib/python3.8/dist-packages/pika/adapters/blocking_connection.py", line 1866, in start_consuming
    self._process_data_events(time_limit=None) 
  File "/usr/local/lib/python3.8/dist-packages/pika/adapters/blocking_connection.py", line 2027, in _process_data_events
    self.connection.process_data_events(time_limit=time_limit)
  File "/usr/local/lib/python3.8/dist-packages/pika/adapters/blocking_connection.py", line 834, in process_data_events
    self._dispatch_channel_events()
  File "/usr/local/lib/python3.8/dist-packages/pika/adapters/blocking_connection.py", line 566, in _dispatch_channel_events
    impl_channel._get_cookie()._dispatch_events() 
  File "/usr/local/lib/python3.8/dist-packages/pika/adapters/blocking_connection.py", line 1493, in _dispatch_events
    consumer_info.on_message_callback(self, evt.method, 
  File "/home/renan/Área de Trabalho/Projeto-WebSocket/Servidor.py", line 26, in callback
    conexao_lista[i]["endereco"].write_message(menssagem_rabbit) 
  File "/home/renan/.local/lib/python3.8/site-packages/tornado/websocket.py", line 342, in write_message
    return self.ws_connection.write_message(message, binary=binary)
  File "/home/renan/.local/lib/python3.8/site-packages/tornado/websocket.py", line 1098, in write_message
    fut = self._write_frame(True, opcode, message, flags=flags) 
  File "/home/renan/.local/lib/python3.8/site-packages/tornado/websocket.py", line 1075, in _write_frame
    return self.stream.write(frame)
  File "/home/renan/.local/lib/python3.8/site-packages/tornado/iostream.py", line 555, in write
    future = Future() # type: Future[None] 
  File "/usr/lib/python3.8/asyncio/events.py", line 639, in get_event_loop
    raise RuntimeError('There is no current event loop in thread %r.' 
RuntimeError: There is no current event loop in thread 'Thread-1'.

I'll leave the project for download if anyone wants to take a look: https://github.com/Renan-Sacca/Projeto-WebSocket


Solution

  • In general, you have to be very careful when mixing threads and Tornado - you can't call most tornado methods from other threads (this has always been true, but the library got more strict about enforcing it in Tornado 5.0). In particular this includes write_message. So in callback, instead of calling write_message, you have to ask the IOLoop to call it for you.

    In the main block, do global main_io_loop; main_io_loop = IOLoop.current() to save the main threads' IOLoop so you can refer to it later. Then in callback replace the call to write_message with

    main_io_loop.add_callback(conexao_lista[i]["endereco"].write_message, menssagem_rabbit)