My code is composed of three classes: the “session” class (async websocket server), the “tcp_session” class (async tcp server) and the “listener” class (receives incoming tcp connections generically).
the listener class accepts all tcp connections. I've declared two instances of this class, on different ports (e.g. 8000 and 7777). the first connection (from the websocket client on port 8000) initializes an async websocket server by creating an instance of the session class. Communication between the websocket client and the websocket server then functions normally.
When a tcp message is subsequently received, the listener class instantiates the tcp_session class, passing it by reference the instance of the session class previously created in its constructor.
The tcp_session class can then transmit the tcp messages it receives by calling the write_data_to_websocket function of its session class instance.
The session class receives the transferred data, but the program crashes when it tries to write it (ws_.async_write).
I tried copying the data from the tcp_session class buffer into the session class buffer, and also calling ws_.async_write directly to write the data there. Below is my code sample:
#include <boost/beast/core.hpp>
#include <boost/beast/websocket.hpp>
#include <boost/asio/dispatch.hpp>
#include <boost/asio/strand.hpp>
#include <algorithm>
#include <atomic>
#include <condition_variable>
#include <cstdlib>
#include <deque>
#include <functional>
#include <iostream>
#include <memory>
#include <mutex>
#include <string>
#include <thread>
#include <utility>
#include <vector>
namespace beast = boost::beast; // from <boost/beast.hpp>
namespace http = beast::http; // from <boost/beast/http.hpp>
namespace websocket = beast::websocket; // from <boost/beast/websocket.hpp>
namespace net = boost::asio; // from <boost/asio.hpp>
using tcp = boost::asio::ip::tcp; // from <boost/asio/ip/tcp.hpp>
//------------------------------------------------------------------------------
// Shared state used by the sessions and listeners below.
std::mutex mtx;
std::condition_variable cv;
// Set once the WebSocket handshake has completed. It is read and written from
// handlers that may run on different io_context threads, so it is atomic.
std::atomic<bool> is_websocket_initialized{false};
/// Report a failed operation on standard error.
///
/// Writes `<what>: <error message>` followed by a newline.
///
/// @param ec   Error code whose message is printed.
/// @param what Short label naming the operation that failed.
void fail(beast::error_code ec, char const* what)
{
    auto const description = ec.message();
    std::cerr << what << ": " << description << "\n";
}
class session : public std::enable_shared_from_this<session>
{
websocket::stream<beast::tcp_stream> ws_;
beast::flat_buffer buffer_;
public:
explicit
session(tcp::socket&& socket)
: ws_(std::move(socket))
{
}
websocket::stream<beast::tcp_stream>& get_websocket_stream() {
return ws_;
}
beast::flat_buffer& get_websocket_buffer() {
return buffer_;
}
void
run()
{
net::dispatch(ws_.get_executor(),
beast::bind_front_handler(
&session::on_run,
shared_from_this()));
}
void
on_run()
{
ws_.set_option(
websocket::stream_base::timeout::suggested(
beast::role_type::server));
ws_.set_option(websocket::stream_base::decorator(
[](websocket::response_type& res)
{
res.set(http::field::server,
std::string(BOOST_BEAST_VERSION_STRING) +
" websocket-server-async");
}));
ws_.async_accept(
beast::bind_front_handler(
&session::on_accept,
shared_from_this()));
}
void
on_accept(beast::error_code ec)
{
if (ec)
return fail(ec, "accept"); {
}
std::cout << "WebSocket connection initialized" << std::endl;
if (!is_websocket_initialized) {
is_websocket_initialized = true;
do_read();
}
else {
do_read();
}
}
void
do_read()
{
ws_.async_read(
buffer_,
beast::bind_front_handler(
&session::on_read,
shared_from_this()));
}
void
on_read(
beast::error_code ec,
std::size_t bytes_transferred)
{
boost::ignore_unused(bytes_transferred);
if (ec == websocket::error::closed)
return;
if (ec)
return fail(ec, "read");
ws_.text(ws_.got_text());
ws_.async_write(
buffer_.data(),
beast::bind_front_handler(
&session::on_write,
shared_from_this()));
}
void write_data_to_websocket(beast::flat_buffer* buff) {
std::cout << "received : " << beast::buffers_to_string(buff->data())<< "\n";
ws_.text(ws_.got_text());
ws_.async_write(
buff->data(),
beast::bind_front_handler(
&session::on_write,
shared_from_this()));
}
void
on_write(
beast::error_code ec,
std::size_t bytes_transferred)
{
boost::ignore_unused(bytes_transferred);
if (ec)
return fail(ec, "write");
buffer_.consume(buffer_.size());
is_websocket_initialized = true;
do_read();
}
};
/// One plain TCP connection whose received bytes are handed to a WebSocket
/// session via session::write_data_to_websocket().
class tcp_session : public std::enable_shared_from_this<tcp_session>
{
    tcp::socket socket_;
    beast::flat_buffer buffer_;
    std::shared_ptr<session> s;  // destination session; may be null

public:
    /// @param socket Accepted TCP socket; ownership is taken.
    /// @param sess   WebSocket session that receives the forwarded bytes.
    explicit tcp_session(tcp::socket&& socket, std::shared_ptr<session> sess)
        : socket_(std::move(socket))
        , s(std::move(sess))
    {
    }

    /// Start reading from the TCP socket.
    void run()
    {
        do_read();
    }

    /// Read up to 1024 bytes into the writable area of buffer_.
    void do_read()
    {
        auto self = shared_from_this();
        socket_.async_read_some(
            buffer_.prepare(1024),
            [self](beast::error_code ec, std::size_t bytes_transferred)
            {
                self->on_read(ec, bytes_transferred);
            });
    }

    /// Read completion: forward the received bytes and read again.
    void on_read(beast::error_code ec, std::size_t bytes_transferred)
    {
        if (ec)
        {
            fail(ec, "tcp read");
            return;
        }
        // Commit exactly once: a second commit would expose bytes from the
        // prepared area that were never received.
        buffer_.commit(bytes_transferred);
        if (!s)
        {
            std::cerr << "tcp read: no websocket session to forward to\n";
            return;
        }
        s->write_data_to_websocket(&buffer_);
        buffer_.consume(buffer_.size());
        do_read();
    }
};
class listener : public std::enable_shared_from_this<listener>
{
net::io_context& ioc_;
tcp::acceptor acceptor_;
bool is_websocket;
std::shared_ptr<session> sess;
public:
listener(
net::io_context& ioc,
tcp::endpoint endpoint,bool ws_flag)
: ioc_(ioc)
, acceptor_(ioc)
{
is_websocket = ws_flag;
beast::error_code ec;
acceptor_.open(endpoint.protocol(), ec);
if (ec)
{
fail(ec, "open");
return;
}
acceptor_.set_option(net::socket_base::reuse_address(true), ec);
if (ec)
{
fail(ec, "set_option");
return;
}
acceptor_.bind(endpoint, ec);
if (ec)
{
fail(ec, "bind");
return;
}
acceptor_.listen(
net::socket_base::max_listen_connections, ec);
if (ec)
{
fail(ec, "listen");
return;
}
}
void
run()
{
do_accept();
}
private:
void
do_accept()
{
acceptor_.async_accept(
net::make_strand(ioc_),
beast::bind_front_handler(
&listener::on_accept,
shared_from_this()));
}
void
on_accept(beast::error_code ec, tcp::socket socket)
{
if (ec)
{
fail(ec, "accept");
}
else if(is_websocket && !is_websocket_initialized)
{
// Create the session and run it
std::cout << "creating websocket" << "\n";
sess = std::make_shared<session>(socket);
sess->run();
}
else if(is_websocket_initialized) {
std::cout << "receiving as tcp_socket is_websocket_initialized" << "\n";
auto tcpSess = std::make_shared<tcp_session>(std::move(socket), sess);
tcpSess->run();
}
do_accept();
}
};
//------------------------------------------------------------------------------
int main(int argc, char* argv[])
{
if (argc != 4)
{
std::cerr <<
"Usage: websocket-server-async <address> <port> <threads>\n" <<
"Example:\n" <<
" websocket-server-async 0.0.0.0 8080 1\n";
return EXIT_FAILURE;
}
auto const address = net::ip::make_address(argv[1]);
auto const port = static_cast<unsigned short>(std::atoi(argv[2]));
auto const threads = std::max<int>(1, std::atoi(argv[3]));
net::io_context ioc{ threads };
auto const test_port = 7777;
std::make_shared<listener>(ioc, tcp::endpoint{ address, port },true)->run();
std::make_shared<listener>(ioc, tcp::endpoint{ address,test_port }, false)->run();
std::vector<std::thread> v;
v.reserve(threads - 1);
for (auto i = threads - 1; i > 0; --i)
v.emplace_back(
[&ioc]
{
ioc.run();
});
ioc.run();
return EXIT_SUCCESS;
}
The get_websocket_buffer
and write_data_to_websocket
violate Law-Of-Demeter.
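You need to separate responsibilities. Make sure that ws_session
knows how to safely write to the websocket. Here, safely means: the stream is only touched from its own executor (strand), at most one async_write is in flight at a time (further messages are queued), and every message buffer stays alive until its write has completed.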
Regardless, you have buffer_.commit(bytes_transferred); duplicated.
Instead, use a composed read with a DynamicBuffer, so that commit becomes the responsibility of async_read.
Also, while you tried to call get_websocket_buffer, you forgot to actually use session_buffer.
Instead, you sent buffer_, which isn't safe at all.
Of course, you will not inflict the accidental buffer type on the caller. Instead, accept messages in a good public interface, e.g.:
public:
    /// Queue `message` for delivery to the WebSocket peer.
    ///
    /// The message is moved into a handler that is dispatched on the stream's
    /// executor, where do_send() appends it to the outgoing queue.
    void send(std::string message) {
        auto self = shared_from_this();
        dispatch(ws_.get_executor(),
                 [self = std::move(self), payload = std::move(message)]() mutable {
                     self->do_send(std::move(payload));
                 });
    }
Where the implementation details are suitably hidden in private:
private:
    /// Append `m` to the outgoing queue and start the write loop if it was
    /// idle (i.e. `m` is the only queued message).
    void do_send(std::string m) { // private methods assumed on strand
        // Use the same queue that write_loop() drains.
        outgoing_.push_back(std::move(m));
        if (outgoing_.size() == 1)
            write_loop();
    }

    /// Write the front message; on success pop it and continue with the next.
    /// The message stays in the queue while its write is in flight.
    void write_loop() {
        if (outgoing_.empty())
            return;
        // ws_.text(ws_.got_text()); // this makes no sense; you should know what you're sending
        ws_.async_write( //
            net::buffer(outgoing_.front()), [this, self = shared_from_this()](error_code ec, size_t) {
                if (ec)
                    return fail(ec, "write");
                outgoing_.pop_front();
                write_loop();
            });
    }
The statefulness of the listener is a code smell, especially with the racy global is_websocket_initialized
. Instead, code your INTENTIONS:
net::thread_pool ioc(threads);
auto ws = accept_websocket_session(ioc.get_executor(), {address, port});
listener tcp_listener(ioc.get_executor(), tcp::endpoint{address, test_port}, [ws](tcp::socket s) {
std::make_shared<plain_session>(std::move(s), ws.lock())->run();
return true;
});
ioc.join();
Now your listener can be as simple as
/// Accepts TCP connections on one endpoint and passes each socket to a
/// user-supplied handler. Accepting starts in the constructor.
struct listener {
    /// Called with every accepted socket; return true to keep accepting.
    using AcceptHandler = std::function<bool(tcp::socket)>;

    /// @param ex        Executor the acceptor runs on.
    /// @param ep        Local endpoint to listen on.
    /// @param on_accept Handler invoked for each accepted socket.
    listener(net::any_io_executor ex, tcp::endpoint ep, AcceptHandler on_accept)
        : acceptor_(ex, ep), on_accept_(std::move(on_accept)) {
        acceptor_.set_option(tcp::acceptor::reuse_address(true));
        accept_loop();
    }

  private:
    /// Accept one connection on a fresh strand and process the result.
    void accept_loop() {
        auto connection_strand = make_strand(acceptor_.get_executor());
        acceptor_.async_accept(connection_strand, [this](error_code ec, tcp::socket socket) {
            handle_accept(ec, std::move(socket));
        });
    }

    /// Report errors, otherwise hand the socket over; accepting continues
    /// only when a handler is set and it returns true.
    void handle_accept(error_code ec, tcp::socket socket) {
        if (ec)
            return fail(ec, "accept");
        if (!on_accept_)
            return;
        if (!on_accept_(std::move(socket)))
            return;
        accept_loop(); // continue accepting
    }

    tcp::acceptor acceptor_;
    AcceptHandler on_accept_;
};
And accept_websocket_session
could be even simpler:
/// Synchronously accept one connection on `ep` and start a ws_session on it.
///
/// @param ex Executor used for the temporary acceptor.
/// @param ep Local endpoint to listen on.
/// @return A non-owning handle to the started session.
std::weak_ptr<ws_session> accept_websocket_session(net::any_io_executor ex, tcp::endpoint ep) {
    tcp::acceptor acceptor(ex, ep);
    acceptor.set_option(tcp::acceptor::reuse_address(true));

    tcp::socket peer = acceptor.accept(); // blocks until a client connects
    std::shared_ptr<ws_session> accepted = std::make_shared<ws_session>(std::move(peer));
    accepted->run();
    return accepted;
}
Here's the integration of all that, with a live demo:
Live On Coliru
#include <boost/asio.hpp>
#include <boost/beast.hpp>
#include <deque>
#include <iostream>
namespace beast = boost::beast; // from <boost/beast.hpp>
namespace http = beast::http; // from <boost/beast/http.hpp>
namespace websocket = beast::websocket; // from <boost/beast/websocket.hpp>
namespace net = boost::asio; // from <boost/asio.hpp>
using beast::error_code;
using net::ip::tcp;
//------------------------------------------------------------------------------
/// Log a failed operation to stderr as "<what>: <error message>" and flush.
void fail(error_code ec, char const* what) {
    auto const description = ec.message();
    std::cerr << what << ": " << description << std::endl;
}
/// One WebSocket connection that echoes incoming messages (prefixed with
/// "echo: ") and delivers messages passed to send().
///
/// Outgoing messages are queued in `outgoing_` and written one at a time; the
/// front element stays in the queue until its write has completed.
class ws_session : public std::enable_shared_from_this<ws_session> {
    websocket::stream<beast::tcp_stream> ws_;
    beast::flat_buffer                   incoming_;
    std::deque<std::string>              outgoing_;

    /// Bind a member function to a shared_ptr of this session, so the session
    /// stays alive until the handler has run.
    template <typename Member> auto bound(Member member) {
        return beast::bind_front_handler(member, shared_from_this());
    }

  public:
    explicit ws_session(tcp::socket&& socket) : ws_(std::move(socket)) {}
    ~ws_session() { std::cout << "WebSocket connection closed" << std::endl; }

    /// Start the session on the stream's executor.
    void run() { dispatch(ws_.get_executor(), bound(&ws_session::do_run)); }

    /// Queue `message` for delivery; the hand-over to do_send() is dispatched
    /// on the stream's executor.
    void send(std::string message) {
        dispatch(ws_.get_executor(), [self = shared_from_this(), m = std::move(message)]() mutable {
            self->do_send(std::move(m));
        });
    }

  private: // private methods assumed on strand
    /// Configure the stream and start the WebSocket handshake.
    void do_run() {
        using sb = websocket::stream_base;
        ws_.set_option(sb::timeout::suggested(beast::role_type::server));
        ws_.set_option(sb::decorator([](websocket::response_type& res) {
            res.set(http::field::server, std::string(BOOST_BEAST_VERSION_STRING) + " whatever-this-is");
        }));
        ws_.async_accept(bound(&ws_session::on_accept));
    }

    /// Handshake completion: start reading on success.
    void on_accept(error_code ec) {
        if (ec)
            return fail(ec, "accept");
        read_loop();
    }

    /// Append a message and start the write loop if it was idle.
    void do_send(std::string m) {
        outgoing_.push_back(std::move(m));
        if (outgoing_.size() == 1)
            write_loop();
    }

    /// Write the front message; on success pop it and continue.
    void write_loop() {
        if (outgoing_.empty())
            return;
        // ws_.text(ws_.got_text()); // this makes no sense; you should know what you're sending
        ws_.async_write( //
            net::buffer(outgoing_.front()), [this, self = shared_from_this()](error_code ec, size_t) {
                if (ec)
                    return fail(ec, "write");
                outgoing_.pop_front();
                write_loop();
            });
    }

    /// Read one message, echo it, and read again until an error occurs.
    void read_loop() {
        ws_.async_read(incoming_, [this, self = shared_from_this()](error_code ec, size_t n) {
            // Check the error first: after a failed read the buffer may hold
            // an incomplete message, which must not be echoed.
            if (ec)
                return fail(ec, "read");
            if (n)
                do_send("echo: " + beast::buffers_to_string(incoming_.cdata()));
            incoming_.clear();
            read_loop();
        });
    }
};
class plain_session : public std::enable_shared_from_this<plain_session> {
tcp::socket socket_;
std::string buffer_;
std::weak_ptr<ws_session> s;
public:
explicit plain_session(tcp::socket&& socket, std::shared_ptr<ws_session> sess)
: socket_(std::move(socket)) {
s = sess;
}
void run() { read_loop(); }
private:
void read_loop() {
buffer_.resize(1024);
socket_.async_read_some( //
net::buffer(buffer_), //
[this, self = shared_from_this()](error_code ec, size_t n) {
if (ec)
return fail(ec, "tcp read");
if (auto ws = s.lock()) { // if the websocket is still alive
std::cout << "Forwarding " << n << " bytes to WebSocket" << std::endl;
ws->send(buffer_.substr(0, n));
read_loop();
} else {
std::cout << "WebSocket is gone, closing TCP connection" << std::endl;
}
});
}
};
/// Accepts TCP connections on one endpoint and passes each socket to a
/// user-supplied handler. Accepting starts in the constructor.
struct listener {
    /// Called with every accepted socket; return true to keep accepting.
    using AcceptHandler = std::function<bool(tcp::socket)>;

    /// @param ex        Executor the acceptor runs on.
    /// @param ep        Local endpoint to listen on.
    /// @param on_accept Handler invoked for each accepted socket.
    listener(net::any_io_executor ex, tcp::endpoint ep, AcceptHandler on_accept)
        : acceptor_(ex, ep), on_accept_(std::move(on_accept)) {
        acceptor_.set_option(tcp::acceptor::reuse_address(true));
        run();
    }

    /// Start (or restart) accepting connections.
    void run() { accept_loop(); }

    /// Cancel the pending accept from the acceptor's executor.
    void stop() {
        auto cancel_pending = [this] { acceptor_.cancel(); };
        dispatch(acceptor_.get_executor(), cancel_pending);
    }

  private:
    /// Accept one connection on a fresh strand and process the result.
    void accept_loop() {
        auto connection_strand = make_strand(acceptor_.get_executor());
        acceptor_.async_accept(connection_strand, [this](error_code ec, tcp::socket socket) {
            handle_accept(ec, std::move(socket));
        });
    }

    /// Report errors, otherwise hand the socket over; accepting continues
    /// only when a handler is set and it returns true.
    void handle_accept(error_code ec, tcp::socket socket) {
        if (ec)
            return fail(ec, "accept");
        if (!on_accept_)
            return;
        if (!on_accept_(std::move(socket)))
            return;
        accept_loop(); // continue accepting
    }

    tcp::acceptor acceptor_;
    AcceptHandler on_accept_;
};
/// Synchronously accept one connection on `ep` and start a ws_session on it.
///
/// @param ex Executor used for the temporary acceptor.
/// @param ep Local endpoint to listen on.
/// @return A non-owning handle to the started session.
std::weak_ptr<ws_session> accept_websocket_session(net::any_io_executor ex, tcp::endpoint ep) {
    tcp::acceptor acceptor(ex, ep);
    acceptor.set_option(tcp::acceptor::reuse_address(true));

    tcp::socket peer = acceptor.accept(); // blocks until a client connects
    std::shared_ptr<ws_session> accepted = std::make_shared<ws_session>(std::move(peer));
    accepted->run();
    return accepted;
}
//------------------------------------------------------------------------------
int main(int argc, char* argv[]) {
if (argc != 4) {
std::cerr << R"(Usage: whatever-this-is <address> <port> <threads>
Example:
whatever-this-is 0.0.0.0 8080 1)"
<< std::endl;
return 1;
}
auto const address = net::ip::make_address(argv[1]);
uint16_t const port = static_cast<unsigned short>(std::atoi(argv[2]));
size_t const threads = std::max<int>(1, std::atoi(argv[3]));
uint16_t const test_port = 7777;
net::thread_pool ioc(threads);
auto ws = accept_websocket_session(ioc.get_executor(), {address, port});
std::cout << "WebSocket connection initialized" << std::endl;
listener tcp_listener(ioc.get_executor(), {address, test_port}, [ws](tcp::socket s) {
std::cout << "Accepted TCP connection " << s.remote_endpoint() << std::endl;
std::make_shared<plain_session>(std::move(s), ws.lock()) //
->run();
return true;
});
ioc.join();
}
So, 57% of the code but more features, and easier to reason about.