How to send ros2 messages from a websocket server to connected clients in tornado


I have a ROS 2 publisher script that sends custom messages from ROS 2 nodes. I need a subscriber (which is also my websocket server) that listens to the messages the publisher sends, converts each one to a dictionary, and sends it as JSON from the websocket server to a connected websocket client. I have already looked at the rosbridge repo, but I could not make it work: it doesn't have enough documentation and I am new to ROS. I need something like this:

import rclpy
import sys
from rclpy.node import Node
import tornado.ioloop
import tornado.httpserver
import tornado.web
import tornado.websocket  # needed for WebSocketHandler
import threading

from custom.msg import CustomMsg

from .convert import message_to_ordereddict


wss = []  # currently connected websocket clients


class wsHandler(tornado.websocket.WebSocketHandler):
    def open(self):
        print('Online')
        if self not in wss:
            wss.append(self)

    def on_close(self):
        print('Offline')
        if self in wss:
            wss.remove(self)


def wsSend(message):
    for ws in wss:
        ws.write_message(message)


class MinimalSubscriber(Node):

    def __init__(self):
        super().__init__('minimal_subscriber')
        self.subscription = self.create_subscription(CustomMsg, 'topic', self.CustomMsg_callback, 10)
        self.subscription  # prevent unused variable warning

    def CustomMsg_callback(self, msg):
        ws_message = message_to_ordereddict(msg)
        wsSend(ws_message)






if __name__ == "__main__":
    # Application expects a list of (pattern, handler) routes
    app = tornado.web.Application([(r'/', wsHandler)])
    http_server = tornado.httpserver.HTTPServer(app)
    http_server.listen(8888)
    main_loop = tornado.ioloop.IOLoop.current()
    # Start main loop
    main_loop.start()

So the callback function in the MinimalSubscriber class receives the ROS message, converts it to a dictionary, and sends it to the websocket clients. I am a bit confused about how to make these two threads (ROS and websocket) communicate with each other.
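For reference, the usual shape of such a thread-to-event-loop handoff can be sketched with the standard library alone. This is only an illustration under assumptions: `ros_thread`, `deliver`, and `run_bridge` are hypothetical names, the worker thread stands in for a thread running `rclpy.spin()`, and the `sent` list stands in for calls to `write_message`. In tornado, the counterpart of `loop.call_soon_threadsafe` would be `IOLoop.add_callback`, which is documented as safe to call from other threads.

```python
import asyncio
import json
import threading


def ros_thread(loop, deliver, messages):
    # Stand-in for the thread running rclpy.spin(); each item plays the
    # role of a dict produced by the subscriber callback.
    for msg in messages:
        # Do not touch sockets from this thread: schedule the send on the
        # event-loop thread instead.
        loop.call_soon_threadsafe(deliver, json.dumps(msg))
    loop.call_soon_threadsafe(deliver, None)  # sentinel: no more messages


async def run_bridge(messages):
    loop = asyncio.get_running_loop()
    sent = []  # stands in for client.write_message(payload)
    done = asyncio.Event()

    def deliver(payload):
        # Runs on the event-loop thread, so it may safely use the sockets.
        if payload is None:
            done.set()
        else:
            sent.append(payload)

    worker = threading.Thread(
        target=ros_thread, args=(loop, deliver, messages), daemon=True
    )
    worker.start()
    await done.wait()
    worker.join()
    return sent


result = asyncio.run(run_bridge([{"x": 1}, {"x": 2}]))
print(result)
```

The point of the pattern is that only the event-loop thread ever writes to the websocket connections; the ROS side merely schedules work.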


Solution

  • I think I got a bit confused myself going through the threading, so I changed my approach and made it work using a tornado periodic callback, with a function that calls rclpy's spin_once as the callback. I am posting my solution because it might help people who have the same issue.

    import queue

    import rclpy
    from rclpy.node import Node
    import tornado.httpserver
    import tornado.ioloop
    import tornado.web
    import tornado.websocket  # needed for WebSocketHandler

    from custom.msg import CustomMsg

    from .convert import message_to_ordereddict


    wss = []  # currently connected websocket clients


    class wsHandler(tornado.websocket.WebSocketHandler):
        @classmethod
        def route_urls(cls):
            return [(r'/', cls, {})]

        def open(self):
            print('Online')
            if self not in wss:
                wss.append(self)

        def on_close(self):
            print('Offline')
            if self in wss:
                wss.remove(self)
    
    
    def make_app():
        myWebHandler = wsHandler.route_urls()
        return tornado.web.Application(myWebHandler)
    
    
    # Filled by the ROS callback, drained by the tornado periodic callback
    msg_queue = queue.Queue()


    class MinimalSubscriber(Node):
    
        def __init__(self):
            super().__init__('minimal_subscriber')
            self.subscription = self.create_subscription(CustomMsg, 'topic', self.CustomMsg_callback, 10)
            self.subscription  # prevent unused variable warning
    
        def CustomMsg_callback(self, msg):
            msg_dict = message_to_ordereddict(msg)
            msg_queue.put(msg_dict)
    
    
    
    
    
    
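    if __name__ == "__main__":
        rclpy.init()

        minimal_subscriber = MinimalSubscriber()

        def send_ros_to_clients():
            # Run pending ROS callbacks without blocking the tornado loop
            rclpy.spin_once(minimal_subscriber, timeout_sec=0)
            # Forward everything queued so far; get_nowait() never blocks
            while True:
                try:
                    my_msg = msg_queue.get_nowait()
                except queue.Empty:
                    break
                # write_message() encodes a dict as JSON
                for client in list(wss):
                    client.write_message(my_msg)

        app = make_app()
        server = tornado.httpserver.HTTPServer(app)
        server.listen(8888)
        # Poll ROS every 1 ms
        tornado.ioloop.PeriodicCallback(send_ros_to_clients, 1).start()
        try:
            tornado.ioloop.IOLoop.current().start()
        finally:
            # Clean up ROS resources once the loop stops
            minimal_subscriber.destroy_node()
            rclpy.shutdown()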
    

    I also folded the wsSend function into the send_ros_to_clients function. Some might say that using a global queue is not best practice, but I could not come up with another solution. I would appreciate any suggestions or corrections on my solution.
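One possible way to avoid the module-level queue is to let a small object own both the queue and the client set. The sketch below uses only the standard library so it can run on its own; `Bridge`, `on_ros_message`, `drain`, and `FakeClient` are hypothetical names, and `FakeClient` merely stands in for a tornado `WebSocketHandler`.

```python
import json
import queue


class Bridge:
    """Owns the queue and the client set so neither is a module global."""

    def __init__(self):
        self.pending = queue.Queue()
        self.clients = set()

    def on_ros_message(self, msg_dict):
        # Would be called from the subscriber callback.
        self.pending.put(msg_dict)

    def drain(self):
        # Would be called from the periodic callback; never blocks.
        # Returns the number of messages forwarded.
        sent = 0
        while True:
            try:
                msg = self.pending.get_nowait()
            except queue.Empty:
                return sent
            payload = json.dumps(msg)
            for client in list(self.clients):
                client.write_message(payload)
            sent += 1


class FakeClient:
    # Hypothetical stand-in for a connected websocket handler.
    def __init__(self):
        self.received = []

    def write_message(self, payload):
        self.received.append(payload)


bridge = Bridge()
client = FakeClient()
bridge.clients.add(client)
bridge.on_ros_message({"speed": 1.5})
bridge.on_ros_message({"speed": 2.0})
count = bridge.drain()
print(count, client.received)
```

With this shape, the node could take `bridge.on_ros_message` as a constructor argument, and the handler could receive the bridge through the dict in the route tuple, which tornado passes to the handler's `initialize` method.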