Search code examples
c++websockethandlerpingwebsocket++

How to implement websocket++ ping handler?


In a websocket++ application, I'm trying to detect lost connections (connections that went away without sending a close frame) by sending pings.

I'm having trouble setting up the handler.

I initially tried to set it up the same way the handlers are set up in the broadcast_server example:

m_server.set_ping_handler(bind(&broadcast_server::on_m_server_ping,this,::_1,::_2));

That gives this error:

note: candidate is:

websocketpp/endpoint.hpp:240:10: note: void websocketpp::endpoint<connection, config>::set_ping_handler(websocketpp::ping_handler) [with connection = websocketpp::connection<websocketpp::config::asio_tls_client>; config = websocketpp::config::asio_tls_client; websocketpp::ping_handler = std::function<bool(std::weak_ptr<void>, std::basic_string<char>)>]

void set_ping_handler(ping_handler h) {

I thought that setting up a typedef, as was done for a similar problem, would solve it, but putting the typedef outside the class broadcast_server makes it impossible to access m_server.

How can this handler be properly implemented?

Includes & flags

Boost 1.54

#include <websocketpp/config/asio.hpp>
#include <websocketpp/server.hpp>
#include <websocketpp/common/thread.hpp>
typedef websocketpp::server<websocketpp::config::asio_tls> server;

flags

-std=c++0x -I ~/broadcast_server -D_WEBSOCKETPP_CPP11_STL_ 
 -D_WEBSOCKETPP_NO_CPP11_REGEX_ -lboost_regex -lboost_system 
 -lssl -lcrypto -pthread -lboost_thread

typedef

typedef websocketpp::lib::function<bool(connection_hdl,std::string)> ping_handler;

Solution

  • The solution is quite easy. First, look at the definition in websocketpp/connection.hpp:

    /// The type and function signature of a ping handler
    /**
     * The ping handler is called when the connection receives a WebSocket ping
     * control frame. The string argument contains the ping payload. The payload is
     * a binary string up to 126 bytes in length. The ping handler returns a bool,
     * true if a pong response should be sent, false if the pong response should be
     * suppressed.
     */
    typedef lib::function<bool(connection_hdl,std::string)> ping_handler;
    

    This shows that the handler function must have the following signature:

    // Ping handler matching the ping_handler signature shown above: it receives
    // the connection handle and the ping payload.
    bool on_ping(connection_hdl hdl, std::string s)
    {
      /* Do something */
      // Per the ping_handler documentation, true means the pong response is
      // sent; false would suppress it.
      return true;
    }
    

    With a member function of that signature in place, the handler can be registered:

    m_server.set_ping_handler(bind(&broadcast_server::on_ping,this,::_1,::_2));
    

    The complete modified example source looks like:

    #include <websocketpp/config/asio_no_tls.hpp>

    #include <websocketpp/server.hpp>

    /*#include <boost/thread.hpp>
    #include <boost/thread/mutex.hpp>
    #include <boost/thread/condition_variable.hpp>*/
    #include <websocketpp/common/thread.hpp>

    #include <cstdint>
    #include <exception>
    #include <iostream>
    #include <memory>
    #include <queue>
    #include <set>
    #include <string>
    #include <utility>
    
    // Server endpoint type used by the rest of this example.
    typedef websocketpp::server<websocketpp::config::asio> server;

    // Connection handle type plus the bind/placeholder helpers used to register
    // member functions as handlers.
    using websocketpp::connection_hdl;
    using websocketpp::lib::placeholders::_1;
    using websocketpp::lib::placeholders::_2;
    using websocketpp::lib::bind;

    // Threading primitives taken from websocketpp::lib (presumably aliases for
    // the std:: or Boost equivalents depending on build flags -- confirm).
    using websocketpp::lib::thread;
    using websocketpp::lib::mutex;
    using websocketpp::lib::unique_lock;
    using websocketpp::lib::condition_variable;
    
    /* on_open insert connection_hdl into channel
     * on_close remove connection_hdl from channel
     * on_message queue send to all channels
     */
    
    /// Kinds of work items that the handlers queue for the processing thread.
    enum action_type {
        SUBSCRIBE   = 0,  ///< queued by on_open: add the connection to the set
        UNSUBSCRIBE = 1,  ///< queued by on_close: remove the connection from the set
        MESSAGE     = 2   ///< queued by on_message: send the message to every connection
    };
    
    struct action {
        action(action_type t, connection_hdl h) : type(t), hdl(h) {}
        action(action_type t, connection_hdl h, server::message_ptr m)
          : type(t), hdl(h), msg(m) {}
    
        action_type type;
        websocketpp::connection_hdl hdl;
        server::message_ptr msg;
    };
    
    class broadcast_server {
    public:
        broadcast_server() {
            // Initialize Asio Transport
            m_server.init_asio();
    
            // Register handler callbacks
            m_server.set_open_handler(bind(&broadcast_server::on_open,this,::_1));
            m_server.set_close_handler(bind(&broadcast_server::on_close,this,::_1));
            m_server.set_message_handler(bind(&broadcast_server::on_message,this,::_1,::_2));
            m_server.set_ping_handler(bind(&broadcast_server::on_ping,this,::_1,::_2));
        }
    
        void run(uint16_t port) {
            // listen on specified port
            m_server.listen(port);
    
            // Start the server accept loop
            m_server.start_accept();
    
            // Start the ASIO io_service run loop
            try {
                m_server.run();
            } catch (const std::exception & e) {
                std::cout << e.what() << std::endl;
            } catch (websocketpp::lib::error_code e) {
                std::cout << e.message() << std::endl;
            } catch (...) {
                std::cout << "other exception" << std::endl;
            }
        }
    
        void on_open(connection_hdl hdl) {
            unique_lock<mutex> lock(m_action_lock);
            //std::cout << "on_open" << std::endl;
            m_actions.push(action(SUBSCRIBE,hdl));
            lock.unlock();
            m_action_cond.notify_one();
        }
    
        void on_close(connection_hdl hdl) {
            unique_lock<mutex> lock(m_action_lock);
            //std::cout << "on_close" << std::endl;
            m_actions.push(action(UNSUBSCRIBE,hdl));
            lock.unlock();
            m_action_cond.notify_one();
        }
    
        void on_message(connection_hdl hdl, server::message_ptr msg) {
            // queue message up for sending by processing thread
            unique_lock<mutex> lock(m_action_lock);
            //std::cout << "on_message" << std::endl;
            m_actions.push(action(MESSAGE,hdl,msg));
            lock.unlock();
            m_action_cond.notify_one();
        }
    
        bool on_ping(connection_hdl hdl, std::string s)
        {
          /* Do something */
          return true;
        }
    
        void process_messages() {
            while(1) {
                unique_lock<mutex> lock(m_action_lock);
    
                while(m_actions.empty()) {
                    m_action_cond.wait(lock);
                }
    
                action a = m_actions.front();
                m_actions.pop();
    
                lock.unlock();
    
                if (a.type == SUBSCRIBE) {
                    unique_lock<mutex> con_lock(m_connection_lock);
                    m_connections.insert(a.hdl);
                } else if (a.type == UNSUBSCRIBE) {
                    unique_lock<mutex> con_lock(m_connection_lock);
                    m_connections.erase(a.hdl);
                } else if (a.type == MESSAGE) {
                    unique_lock<mutex> con_lock(m_connection_lock);
    
                    con_list::iterator it;
                    for (it = m_connections.begin(); it != m_connections.end(); ++it) {
                        m_server.send(*it,a.msg);
                    }
                } else {
                    // undefined.
                }
            }
        }
    private:
        typedef std::set<connection_hdl,std::owner_less<connection_hdl>> con_list;
    
        server m_server;
        con_list m_connections;
        std::queue<action> m_actions;
    
        mutex m_action_lock;
        mutex m_connection_lock;
        condition_variable m_action_cond;
    };
    
    /**
     * Entry point: runs the broadcast server on port 9002.
     *
     * The action queue is processed on a second thread while the server loop
     * runs on the main thread.
     *
     * @return 0 on normal completion, 1 if an exception was caught and reported.
     */
    int main() {
        try {
            broadcast_server server_instance;

            // Start a thread to run the processing loop
            thread t(bind(&broadcast_server::process_messages,&server_instance));

            // Run the asio loop with the main thread
            server_instance.run(9002);

            t.join();
        } catch (const std::exception & e) {
            // This only helps for failures before the thread is started (e.g.
            // constructing the server): an exception unwinding past a joinable
            // thread ends in std::terminate, not in this handler.
            std::cout << e.what() << std::endl;
            return 1;
        }
        return 0;
    }