Search code examples
c++ concurrency boost-beast boost-beast-websocket

Websocket boost/beast example with client/server


Could somebody provide a simple example (or references) showing how to set up a client and a server using WebSocket from the Boost.Beast library? I need an example of how to handle an incoming message on the server and either respond to it or not (in such a way that the client won't crash), how to send data to specific "subscribed" connections, and how to handle that data on the client side. I found this example, but it doesn't work as intended. Client side:

#include <condition_variable>
#include <iostream>
#include <memory>
#include <mutex>
#include <queue>
#include <string>
#include <thread>
#include <utility>

#include <boost/asio.hpp>
#include <boost/beast.hpp>
#include <boost/lockfree/queue.hpp>
#include <nlohmann/json.hpp>

namespace asio = boost::asio;
namespace beast = boost::beast;
namespace websocket = beast::websocket;
using tcp = asio::ip::tcp;
using json = nlohmann::json;

// Thread-safe queue class
// Minimal mutex-protected FIFO queue for handing values from one thread to
// another.
//
// Every member function locks the internal mutex, so a single instance may be
// used from several threads at once. Elements are moved (not copied) out of the
// queue when popped, which also makes the queue usable with move-only types.
template <typename T>
class ThreadSafeQueue {
public:
    // Appends a copy of `value` and wakes one thread blocked in wait_and_pop().
    void push(const T& value) {
        std::lock_guard<std::mutex> lock(mtx_);
        queue_.push(value);
        cond_var_.notify_one();
    }

    // Appends `value` by moving from it and wakes one thread blocked in
    // wait_and_pop().
    void push(T&& value) {
        std::lock_guard<std::mutex> lock(mtx_);
        queue_.push(std::move(value));
        cond_var_.notify_one();
    }

    // Non-blocking pop. Moves the oldest element into `value` and returns
    // true; returns false and leaves `value` untouched when the queue is empty.
    bool try_pop(T& value) {
        std::lock_guard<std::mutex> lock(mtx_);
        if (queue_.empty()) {
            return false;
        }
        value = std::move(queue_.front());
        queue_.pop();
        return true;
    }

    // Blocking pop. Waits until an element is available, then moves the
    // oldest element into `value`.
    void wait_and_pop(T& value) {
        std::unique_lock<std::mutex> lock(mtx_);
        // The predicate form re-checks the condition after every wake-up, so
        // spurious wake-ups are harmless.
        cond_var_.wait(lock, [this] { return !queue_.empty(); });
        value = std::move(queue_.front());
        queue_.pop();
    }

private:
    std::queue<T> queue_;
    mutable std::mutex mtx_;
    std::condition_variable cond_var_;
};

class Singleton {
public:
    ThreadSafeQueue<json>* data_queue;
    websocket::stream<tcp::socket>* ws;

    Singleton(
        ThreadSafeQueue<json>* data_queue_,
        websocket::stream<tcp::socket>* ws_
    ) :
        data_queue(data_queue_),
        ws(ws_)
    {}
};

// Blocking receive loop.
//
// Repeatedly reads one complete WebSocket message from data.ws, parses it as
// JSON, pushes the parsed value onto data.data_queue and echoes the raw text
// to stdout. A message that is not valid JSON is reported and skipped. The
// loop has no exit condition of its own: it ends when the stream read (or
// anything else other than JSON parsing) throws, in which case the error is
// printed and the function returns.
//
// NOTE(review): sendData() writes to the same stream object; confirm that
// reading and writing one Beast stream from two threads without further
// synchronization is acceptable for your Boost version.
void readData(Singleton& data) {
    try {
        beast::flat_buffer buffer;
        while (true) {
            data.ws->read(buffer);
            auto received_message = beast::buffers_to_string(buffer.data());
            // The text has been copied out, so the buffer can be emptied
            // before parsing; this keeps it clean even if parsing fails.
            buffer.consume(buffer.size());

            try {
                json received_json = json::parse(received_message);
                data.data_queue->push(received_json);
                std::cout << "Received from server: " << received_message << "\n";
            }
            catch (const json::exception& e) {
                // One malformed frame should not stop the receiver.
                std::cout << "Ignoring malformed message: " << e.what() << "\n";
            }
        }
    }
    // Catch by const reference: catching std::exception by value slices the
    // thrown object, so what() would lose the derived class's message.
    catch (const std::exception& e) {
        std::cout << "Error occured in reader: " << e.what() << "\n";
    }
}

// Interactive send loop.
//
// Prompts on stdout, reads one line from stdin and sends it to the server as a
// single-key JSON object: the key is "echo" when the line starts with 'h' and
// "null" otherwise; the value is the line itself. The loop ends when the user
// types "stop", when stdin reaches end-of-file or fails, or when the write
// throws (the error is then printed).
void sendData(Singleton& data) {
    try {
        std::string input;
        while (true) {
            std::cout << "To send: ";

            // Without this check a closed stdin would leave `input` empty
            // forever and the loop would flood the server with empty messages.
            if (!std::getline(std::cin, input)) break;

            if (input == "stop") break;

            // input[0] is well defined for an empty string (it refers to the
            // terminating '\0'), so an empty line is sent under the "null" key.
            json message = {
                {input[0] == 'h' ? "echo" : "null", input}
            };

            // The temporary string returned by dump() lives until the end of
            // this full expression, i.e. until the blocking write has returned.
            data.ws->write(asio::buffer(message.dump()));
        }
    }
    // Catch by const reference so that what() reports the derived message
    // instead of the sliced base-class one.
    catch (const std::exception& e) {
        std::cout << "Error occured in sender: " << e.what() << "\n";
    }
}

// Client entry point: connects to the WebSocket server at 127.0.0.1:9002,
// then runs the receive loop and the interactive send loop on two threads.
//
// Returns 1 if the connection or the WebSocket handshake fails, 0 otherwise.
int main() {
    std::string const host = "127.0.0.1";
    std::string const port = "9002";

    asio::io_context ioc;
    websocket::stream<tcp::socket> ws(ioc);

    // resolve(), connect() and handshake() report failure by throwing; without
    // a handler an unreachable server would end the program via std::terminate.
    try {
        tcp::resolver resolver(ioc);
        auto const results = resolver.resolve(host, port);
        asio::connect(ws.next_layer(), results);
        ws.handshake(host, "/");
    }
    catch (const std::exception& e) {
        std::cerr << "Error: could not connect to " << host << ":" << port
                  << ": " << e.what() << "\n";
        return 1;
    }

    ThreadSafeQueue<json> data_queue;
    Singleton single(&data_queue, &ws);

    std::thread reader(readData, std::ref(single));
    std::thread sender(sendData, std::ref(single));

    // readData() only returns once reading from the stream fails, so this
    // join lasts until the connection goes away, even after the sender has
    // already finished.
    reader.join();
    sender.join();

    return 0;
}

Server side:

#include <boost/asio.hpp>
#include <boost/beast.hpp>
#include <nlohmann/json.hpp>
#include <iostream>
#include <memory>
#include <string>

namespace asio = boost::asio;
namespace beast = boost::beast;
namespace websocket = beast::websocket;
using tcp = asio::ip::tcp;
using json = nlohmann::json;

class WebSocketSession : public std::enable_shared_from_this<WebSocketSession> {
public:
    WebSocketSession(tcp::socket socket)
        : ws_(std::move(socket)) {}

    void run() {
        ws_.async_accept(
            beast::bind_front_handler(
                &WebSocketSession::on_accept,
                shared_from_this()
            )
        );
    }

private:
    void on_accept(beast::error_code ec) {
        if (ec) {
            std::cerr << "Accept error: " << ec.message() << std::endl;
            return;
        }
        do_read();
    }

    void do_read() {
        ws_.async_read(
            buffer_,
            beast::bind_front_handler(
                &WebSocketSession::on_read,
                shared_from_this()
            )
        );
    }

    void on_read(beast::error_code ec, std::size_t bytes_transferred) {
        boost::ignore_unused(bytes_transferred);

        if (ec) {
            if (ec == websocket::error::closed) {
                return;
            }
            std::cerr << "Read error: " << ec.message() << std::endl;
            return;
        }

        try {
            auto received_message = beast::buffers_to_string(buffer_.data());
            json received_json = json::parse(received_message);

            std::string response_message;

            if (received_json.contains("echo")) {
                json response_json = {
                    {"type", "response"},
                    {"original", received_json}
                };
                response_message = response_json.dump();
            }
            else {
                buffer_.consume(buffer_.size());
                do_read();
                return;
            }

            response_ptr_ = std::make_shared<std::string>(std::move(response_message));
            ws_.text(ws_.got_text());
            ws_.async_write(
                asio::buffer(*response_ptr_),
                beast::bind_front_handler(
                    &WebSocketSession::on_write,
                    shared_from_this()));
        }
        catch (const std::exception& e) {
            std::cerr << "Processing error: " << e.what() << std::endl;
        }
    }

    void on_write(beast::error_code ec, std::size_t bytes_transferred) {
        boost::ignore_unused(bytes_transferred);

        if (ec) {
            std::cerr << "Write error: " << ec.message() << std::endl;
            return;
        }

        buffer_.consume(buffer_.size());
        response_ptr_.reset();
        do_read();
    }

    websocket::stream<tcp::socket> ws_;
    beast::flat_buffer buffer_;
    std::shared_ptr<std::string> response_ptr_;
};

// Accept loop: listens on the given endpoint and starts one WebSocketSession
// per incoming TCP connection.
class WebSocketServer {
public:
    // Opens and binds the acceptor on `endpoint`, then queues the first accept.
    WebSocketServer(asio::io_context& ioc, tcp::endpoint endpoint)
        : acceptor_(ioc, endpoint)
    {
        do_accept();
    }

private:
    // Queues a single asynchronous accept; on_accept() re-arms it.
    void do_accept() {
        acceptor_.async_accept(beast::bind_front_handler(&WebSocketServer::on_accept, this));
    }

    // Accept completion: hand the socket to a new session or report the
    // error, and in both cases keep accepting.
    void on_accept(beast::error_code ec, tcp::socket socket) {
        if (!ec) {
            // The session keeps itself alive through its pending operations.
            auto session = std::make_shared<WebSocketSession>(std::move(socket));
            session->run();
        }
        else {
            std::cerr << "Accept error: " << ec.message() << std::endl;
        }
        do_accept();
    }

    tcp::acceptor acceptor_;
};

int main() {
    std::cout << "Web server is running:\n";
    try {
        asio::io_context ioc;
        tcp::endpoint endpoint(tcp::v4(), 9002);
        WebSocketServer server(ioc, endpoint);
        ioc.run();
    }
    catch (const std::exception& e) {
        std::cerr << "Error: " << e.what() << std::endl;
        return EXIT_FAILURE;
    }
    return EXIT_SUCCESS;
}

Solution

  • It looks to me like the client doesn't crash, but instead only sends one message correctly: https://i.imgur.com/0zrvFBP.jpeg

    Running the code under ASan/UBSan reveals no glaring issues (good).

    For the client to be full-duplex you will need threads OR async. I suggest async because it fits the library choice.

    Be careful about consuming from dynamic buffers: don't just consume the entire buffer unless it was all part of your current message.

    You need to queue outgoing writes, because you cannot have overlapping writes and apparently your interface is multi-threaded.

    On the server side you kept it simple by making sure that you don't read the next request until the response has been sent. I'd note that response_ptr_ is redundant: the string can simply be a member of the session object, which shared_from_this() already keeps alive for the duration of the write. So, I'd just make it std::string response_;.

    Server notes:

    • it's weird to use ws_.got_text() since you're absolutely guaranteeing json text
    • consume your handled message in 1 spot, not in two
    • I'd consider not creating strings, since Nlohmann can parse from iterators
    • make sure you flush stdio to see "live activity"

    Combining I'd suggest:

    • File client.cpp

       #include <boost/asio.hpp>
       #include <boost/beast.hpp>
       #include <iostream>
       #include <nlohmann/json.hpp>
       #include <queue>
      
       namespace asio      = boost::asio;
       namespace beast     = boost::beast;
       namespace websocket = beast::websocket;
       using tcp           = asio::ip::tcp;
       using json          = nlohmann::json;
      
       // Thread-safe queue class
       // Minimal mutex-protected FIFO queue for handing values from one thread
       // to another. Every member function locks the internal mutex. Elements
       // are moved (not copied) out of the queue when popped, which also makes
       // the queue usable with move-only types.
       template <typename T> class ThreadSafeQueue {
         public:
           // Appends a copy of `value` and wakes one thread blocked in wait_and_pop().
           void push(T const& value) {
               std::lock_guard<std::mutex> lock(mtx_);
               queue_.push(value);
               cond_var_.notify_one();
           }

           // Appends `value` by moving from it and wakes one thread blocked in
           // wait_and_pop().
           void push(T&& value) {
               std::lock_guard<std::mutex> lock(mtx_);
               queue_.push(std::move(value));
               cond_var_.notify_one();
           }

           // Non-blocking pop. Moves the oldest element into `value` and returns
           // true; returns false and leaves `value` untouched when empty.
           bool try_pop(T& value) {
               std::lock_guard<std::mutex> lock(mtx_);
               if (queue_.empty()) {
                   return false;
               }
               value = std::move(queue_.front());
               queue_.pop();
               return true;
           }

           // Blocking pop. Waits until an element is available, then moves the
           // oldest element into `value`.
           void wait_and_pop(T& value) {
               std::unique_lock<std::mutex> lock(mtx_);
               // The predicate form re-checks after every wake-up, so spurious
               // wake-ups are harmless.
               cond_var_.wait(lock, [this] { return !queue_.empty(); });
               value = std::move(queue_.front());
               queue_.pop();
           }

         private:
           std::queue<T>           queue_;
           mutable std::mutex      mtx_;
           std::condition_variable cond_var_;
       };
       // Queue of parsed JSON messages; WSClient pushes received messages into
       // it and main() pops them.
       using MsgQueue = ThreadSafeQueue<json>;
      
       struct  WSClient {
           WSClient(std::string const& host, std::string const& port, MsgQueue& inbox) : inbox_(inbox) {
               // connect, handshake
               connect(ws.next_layer(), tcp::resolver (ioc).resolve(host, port));
               ws.handshake(host, "/");
      
               // start async read chain
               do_read_loop();
           };
      
           void stop() {
               beast::get_lowest_layer(ws).cancel();
               ioc.join();
           }
      
           void send(json&& message) {
               asio::post(ws.get_executor(), [this, m = std::move(message)]() mutable { //
                   outbox_.push_back(std::move(m).dump());
                   if (outbox_.size() == 1)
                       do_write_loop(); // start the pump
               });
           }
      
         private:
           asio::thread_pool  ioc{1}; // single thread should suffice
           beast::flat_buffer incoming_;
           MsgQueue&          inbox_;
      
           std::deque<std::string>        outbox_; // serialized form for buffer stability
           websocket::stream<tcp::socket> ws{ioc};
      
           void do_read_loop() {
               ws.async_read(incoming_, [this](beast::error_code ec, size_t n) {
                   std::cout << "Received " << n << " bytes (" << ec.message() << ")" << std::endl;
                   if (ec)
                       return;
      
                   auto received_message = beast::buffers_to_string(incoming_.data()).substr(0, n);
                   json received_json    = json::parse(received_message);
      
                   inbox_.push(received_json);
                   std::cout << "Received from server: " << received_message << std::endl;
      
                   incoming_.consume(n);
                   do_read_loop();
               });
           }
      
           void do_write_loop() {
               if (outbox_.empty())
                   return;
      
               ws.async_write(asio::buffer(outbox_.front()), [this](beast::error_code ec, size_t n) {
                   std::cout << "Sent " << n << " bytes (" << ec.message() << ")" << std::endl;
                   if (!ec) {
                       outbox_.pop_front();
                       do_write_loop();
                   }
               });
           }   
       };
      
       // Client entry point: connects to 127.0.0.1:9002, then reads lines from
       // stdin and sends each non-empty one as a JSON message until "stop" or
       // end of input. After every send, the messages received so far are
       // drained from the queue and printed.
       //
       // Returns 0 on a normal shutdown and 1 if an exception was caught.
       int main() try {
           MsgQueue received;
           WSClient client("127.0.0.1", "9002", received);

           std::cout << "To send: ";
           for (std::string input; std::getline(std::cin, input); std::cout << "To send: ") {
               if (input == "stop")
                   break;
               if (input.empty())
                   continue;

               // Elements of a braced list are evaluated left to right, so the
               // key is chosen before `input` is moved from.
               client.send({{input.starts_with('h') ? "echo" : "null", std::move(input)}});

               for (json msg; received.try_pop(msg);)
                   std::cout << " - Processing queued: " << msg << std::endl;
           }

           client.stop();
       } catch (std::exception const& e) {
           std::cerr << "Error: " << e.what() << std::endl;
           // Falling off the end of main's handler would report success (0).
           return 1;
       }
      
    • File server.cpp

       #include <boost/asio.hpp>
       #include <boost/beast.hpp>
       #include <iostream>
       #include <nlohmann/json.hpp>
      
       namespace asio      = boost::asio;
       namespace beast     = boost::beast;
       namespace websocket = beast::websocket;
       using tcp           = asio::ip::tcp;
       using json          = nlohmann::json;
      
       class WebSocketSession : public std::enable_shared_from_this<WebSocketSession> {
         public:
           WebSocketSession(tcp::socket socket) : ws_(std::move(socket)) {}
      
           void run() {
               ws_.async_accept(beast::bind_front_handler(&WebSocketSession::on_accept, shared_from_this()));
           }
      
           ~WebSocketSession() { std::cerr << "Session " << peer_ << " closed" << std::endl; }
      
         private:
           void on_accept(beast::error_code ec) {
               std::cerr << "Accept: " << ec.message() << " for " << peer_ << std::endl;
               if (!ec)
                   do_read_loop();
           }
      
           void do_read_loop() {
               ws_.async_read(buffer_, beast::bind_front_handler(&WebSocketSession::on_read, shared_from_this()));
           }
      
           void on_read(beast::error_code ec, size_t n) {
               if (ec) {
                   if (ec == websocket::error::closed) {
                       return;
                   }
                   std::cerr << "Read error: " << ec.message() << std::endl;
                   return;
               }
      
               try {
                   response_.clear();
      
                   auto it  = buffers_begin(buffer_.data());
                   json msg = json::parse(it, it + n);
                   buffer_.consume(n);
      
                   if (msg.contains("echo"))
                       response_ = json{{"type", "response"}, {"original", std::move(msg)}}.dump();
      
                   if (response_.empty()) {
                       do_read_loop();
                   } else {
                       ws_.text(true);
                       ws_.async_write(asio::buffer(response_),
                                       beast::bind_front_handler(&WebSocketSession::on_write, shared_from_this()));
                   }
               } catch (std::exception const& e) {
                   std::cerr << "Processing error: " << e.what() << std::endl;
               }
           }
      
           void on_write(beast::error_code ec, size_t n) {
               std::cerr << "Write: " << n << " bytes (" << ec.message() << ")" << std::endl;
               if (!ec)
                   do_read_loop();
           }
      
           websocket::stream<tcp::socket> ws_;
           beast::flat_buffer             buffer_;
           std::string                    response_;
           tcp::endpoint                  peer_ = ws_.next_layer().remote_endpoint();
       };
      
       // Accept loop: listens on the given endpoint and starts one
       // WebSocketSession per incoming TCP connection.
       class WebSocketServer {
         public:
           // Opens and binds the acceptor on `endpoint`, then queues the first accept.
           WebSocketServer(asio::io_context& ioc, tcp::endpoint endpoint) : acceptor_(ioc, endpoint) {
               do_accept();
           }

         private:
           // Queues a single asynchronous accept; on_accept() re-arms it.
           void do_accept() {
               acceptor_.async_accept([this](beast::error_code ec, tcp::socket socket) {
                   on_accept(ec, std::move(socket));
               });
           }

           // Accept completion: hand the socket to a new session or report
           // the error, and in both cases keep accepting.
           void on_accept(beast::error_code ec, tcp::socket socket) {
               if (!ec) {
                   // The session keeps itself alive through its pending operations.
                   auto session = std::make_shared<WebSocketSession>(std::move(socket));
                   session->run();
               } else {
                   std::cerr << "Accept error: " << ec.message() << std::endl;
               }
               do_accept();
           }

           tcp::acceptor acceptor_;
       };
      
       int main() {
           std::cout << "Web server is running:\n";
           try {
               asio::io_context ioc;
               tcp::endpoint    endpoint(tcp::v4(), 9002);
               WebSocketServer  server(ioc, endpoint);
               ioc.run();
           } catch (std::exception const& e) {
               std::cerr << "Error: " << e.what() << std::endl;
               return 1;
           }
       }
      

    With customary demos: