I've built a WebSocket client and server and am using them to communicate locally.
The build succeeds, but when I run it with root privileges (my `bind()` call needs root), the handshake fails with a "bad version" error.
In short, the error codes are reported in:
on_handshake()
on_write()
Beast version: 1.70.0
Code:
ClientService.cc
// Client Connect to Server
// Starts a new thread that creates a WS::Client::session bound to the fixed
// local endpoint 0.0.0.0:20000, starts it towards host:port and then runs a
// thread-local io_context.
// NOTE(review): both port numbers are handed to the session through
// reinterpret_cast<unsigned int *>, i.e. the integer value is reinterpreted as
// a pointer instead of being passed as a number.
void ClientService::Connect(const std::string &host, unsigned int port) {
const auto id = (unsigned int)(mpSLAM->GetMap()->mnId);
// The thread object is allocated with new; no join/delete is visible in this excerpt.
mThread = new std::thread([this, host, port, id] {
auto const text = "Hello, world!";
info("client {} connect to host {} port {}", id, host, port);
// The io_context is required for all I/O
boost::asio::io_context ioc;
// The work guard is held for the whole lambda, including the ioc.run() call below.
work_guard_type workGuard(ioc.get_executor());
const std::string specify_local_address = "0.0.0.0";
unsigned int specify_local_portnumber = 20000;
info("In connection: client bind to local endpoint host {} port {}", specify_local_address, specify_local_portnumber);
// Launch the asynchronous operation, which would call WebSocket.h
auto session = std::make_shared<WS::Client::session>(ioc, specify_local_address.c_str(), reinterpret_cast<unsigned int *>(specify_local_portnumber),
std::bind(&ClientService::OnRequest, this, std::placeholders::_1));
// Published to the member from inside the worker thread.
this->service = session;
session->run(host.c_str(), reinterpret_cast<unsigned int *>(port), text);
// Run the I/O service. The call will return when
// the socket is closed.
ioc.run();
});
}
void ClientService::SendRequest(const Request &req) {
    // Turn the request into text, wrap it in a shared_ptr and pass it to the session.
    auto payload = std::make_shared<std::string>(ORB_SLAM2::toString(req));
    this->service->send(std::move(payload));
}
WebSocket.h of client
namespace Client {
// Sends a WebSocket message and prints the response
class session : public std::enable_shared_from_this<session> {
// we do not need a resolver since itself initialize a connection
// tcp::resolver resolver_;
// websocket::stream <tcp::socket> ws_;
websocket::stream <beast::tcp_stream> ws_;
// beast::tcp_stream ws_;
beast::flat_buffer buffer_;
std::vector<std::shared_ptr<const std::string>> queue;
std::string host_;
std::uint8_t port_;
std::function<void(const std::string&)> on_message;
std::string localhost_;
std::uint8_t localport_;
//the constructor
public:
// Resolver and socket require an io_context
// Builds the client session: the websocket stream is created on a strand of
// ioc, the message callback is stored, and the underlying TCP socket is opened
// (IPv4) and bound to localhost_:localport_.
// NOTE(review): localport is an `unsigned int *`. Streaming it with << writes
// the pointer value, not a decimal port number, into the stringstream, and >>
// then extracts into localport_, which is declared as std::uint8_t and thus
// receives a single character. localport_ does not hold the caller's port.
explicit
session(net::io_context &ioc, char const *localhost, unsigned int *localport, std::function<void(const std::string&)> on_message)
: ws_(net::make_strand(ioc)), on_message(std::move(on_message))
{
localhost_ = localhost;
std::stringstream str_port_value;
str_port_value << localport;
str_port_value >> localport_;
beast::error_code err;
// Open the socket and bind it to the local endpoint.
// open() reports failure through err, which is not inspected afterwards.
beast::get_lowest_layer(ws_).socket().open(boost::asio::ip::tcp::v4(), err);
beast::get_lowest_layer(ws_).socket().bind(tcp::endpoint(boost::asio::ip::make_address_v4(localhost_), localport_));
}
// Start the asynchronous operation
// Start the asynchronous operation
// Stores the target host and port, arms a 30 second timeout on the TCP stream
// and starts an asynchronous connect to the endpoint built from host_ and
// port_; completion is delivered to on_connect. The text argument is unused.
// NOTE(review): port is an `unsigned int *`; it goes through the same
// stringstream round trip as in the constructor and is extracted into port_,
// which is declared as std::uint8_t, so port_ does not hold the caller's
// port number.
void
run(
char const *host,
unsigned int *port,
__attribute__((unused)) char const *text) {
// Save these for later
host_ = host;
std::stringstream str_port_value;
str_port_value << port;
str_port_value >> port_;
// dropped the resolver
// resolver_.async_resolve(
// host,
// port,
// beast::bind_front_handler(
// &session::on_resolve,
// shared_from_this()));
//construct a tcp::endpoint using ip::address_v4 and port number
// host_ must therefore be a literal IP address, since no name resolution happens here.
tcp::endpoint ep(boost::asio::ip::address::from_string(host_.c_str()), port_);
beast::get_lowest_layer(ws_).expires_after(std::chrono::seconds(30));
//here just connect to ep without resolver
beast::get_lowest_layer(ws_).socket().async_connect(
ep,
beast::bind_front_handler(
&session::on_connect,
shared_from_this()));
}
// Completion handler for the TCP connect started in run(): configures the
// websocket stream and starts the client handshake.
void on_connect(beast::error_code ec) {
    // Rebuild the endpoint from the stored host_/port_; only its port is used below.
    const tcp::endpoint remote(boost::asio::ip::address::from_string(host_.c_str()), port_);

    if (ec) {
        fail(ec, "connect");
        return;
    }

    // The websocket layer has its own timeout handling, so disable the one on
    // the underlying tcp_stream.
    beast::get_lowest_layer(ws_).expires_never();

    // Apply the timeouts Beast suggests for a client.
    const auto client_timeouts =
        websocket::stream_base::timeout::suggested(beast::role_type::client);
    ws_.set_option(client_timeouts);

    std::cout << "Making a handshake with server" << std::endl;

    // Decorate the upgrade request with a User-Agent header.
    ws_.set_option(websocket::stream_base::decorator(
        [](websocket::request_type &req) {
            req.set(http::field::user_agent,
                    std::string(BOOST_BEAST_VERSION_STRING) +
                        " websocket-client-async");
        }));

    // Append ":<port>" so host_ can serve as the Host header value,
    // see https://tools.ietf.org/html/rfc7230#section-5.4
    host_ += ':';
    host_ += std::to_string(remote.port());

    // Kick off the websocket handshake; on_handshake receives the result.
    auto self = shared_from_this();
    ws_.async_handshake(host_, "/",
                        beast::bind_front_handler(&session::on_handshake, std::move(self)));
}
// Completion handler for the websocket handshake.
void on_handshake(beast::error_code ec) {
    if (ec) {
        fail(ec, "handshake");
        return;
    }

    // Drop whatever the buffer currently holds.
    buffer_.consume(buffer_.size());

    // Schedule on_read on the stream's executor with the (successful) error
    // code and a byte count of 5.
    auto self = shared_from_this();
    auto kickoff = beast::bind_front_handler(&session::on_read, std::move(self), ec, 5);
    net::post(ws_.get_executor(), std::move(kickoff));

    std::cout << "Handshake successful." << std::endl;
}
// Completion handler for async_write: removes the message that was just sent
// and starts writing the next queued one, if any.
void on_write(beast::error_code ec, std::size_t bytes_transferred) {
    boost::ignore_unused(bytes_transferred);

    if (ec) {
        fail(ec, "write");
        return;
    }

    // The front element is the message whose write just completed.
    queue.erase(queue.begin());
    if (queue.empty())
        return;

    // More messages pending: send the new front and come back here.
    auto const &next = *queue.front();
    ws_.async_write(net::buffer(next),
                    beast::bind_front_handler(&session::on_write, shared_from_this()));
}
WebSocket.h of server
public:
// Take ownership of the socket
// Server-side session constructor: moves the accepted socket into the
// websocket stream and stores the shared state handle and message callback.
explicit
session(tcp::socket &&socket, std::shared_ptr<shared_state> state, std::function<void(std::string)> on_message)
: ws_(std::move(socket)), state(std::move(state)), on_message(std::move(on_message)) {
}
// Logs the destruction to stdout and calls leave() on the shared state with
// this session.
~session() {
std::cout << "~session()" << std::endl;
state->leave(this);
}
// Start the asynchronous operation
// Configures the websocket stream for the server role and starts accepting
// the client's handshake; on_accept receives the result.
void run() {
    // Apply the timeouts Beast suggests for a server.
    const auto server_timeouts =
        websocket::stream_base::timeout::suggested(beast::role_type::server);
    ws_.set_option(server_timeouts);

    // Decorate the handshake response with a Server header.
    ws_.set_option(websocket::stream_base::decorator(
        [](websocket::response_type &res) {
            res.set(http::field::server,
                    std::string(BOOST_BEAST_VERSION_STRING) +
                        " websocket-server-async");
        }));

    auto self = shared_from_this();
    ws_.async_accept(beast::bind_front_handler(&session::on_accept, std::move(self)));
}
// Completion handler for the websocket accept: registers the session with the
// shared state and starts reading.
void on_accept(beast::error_code ec) {
    if (ec) {
        fail(ec, "accept");
        return;
    }

    state->join(this);
    do_read();
}
running log:
[11:07:16][3518][I][RegisterRemote:70] Registering remote client
[11:07:16][3518][I][Register:172] client bind to local endpoint host 0.0.0.0 port 20001
[11:07:16][3518][I][Register:173] client register to host 0.0.0.0 port 10088
1 2330
[11:07:16][3518][I][RegisterRemote:79] Registered client with id: 1 and port: 2330
[11:07:16][3518][I][RegisterRemote:85] Connecting to the data channel
[11:07:16][3518][I][RegisterRemote:89] Connected to the data channel
[11:07:16][3533][I][operator():39] client 1 connect to host 0.0.0.0 port 2330
[11:07:16][3533][I][operator():54] In connection: client bind to local endpoint host 0.0.0.0 port 20000
Making a handshake with server
handshake: bad version
Gtk-Message: 11:07:16.297: Failed to load module "canberra-gtk-module"
write: Operation canceled
Before I removed the resolver and modified the code to bind a local endpoint, it worked fine.
I didn't modify the handshake part, though,
and I cannot see what is going on in the debugger because execution goes straight into the write operation.
Is there anything wrong with the code?
Or is it because I used
websocket::stream <beast::tcp_stream> ws_;
even though that is what the original code uses?
Any assistance and guidance would be appreciated. Thanks!
You have many bugs surrounding at least port numbers. You are reinterpreting unsigned numbers as... pointers, which you then convert by writing to a string stream and back, but... into `uint8_t`. No wonder you "require root" to do `bind`: the first 1024 ports are privileged.
Beyond that, you are doing strange things like:
- using a `work_guard`, even though you use a blocking `io_context::run()` later on to do all the work
- ignoring the `error_code`
- `bind`-ing on a client socket; this is very rare - it might be what you intend, although I don't particularly know why
- `consume()`-ing a buffer that is never even used
- setting `this->service` with a single `session` instance; why is there a thread in the first place, if all it does is run the io context?
- using a strand although the `io_context` is owned by a single thread
- never calling `on_write`, so nothing will ever start writing (`queue` is not used outside of `on_write`)
- calling `on_read` manually, with a fake size of `5`?
Adding a `listener` loosely based on the same Beast example, I made it self-contained to reproduce your message:
#include <boost/asio.hpp>
#include <boost/beast.hpp>
#include <boost/lexical_cast.hpp>
#include <functional>
#include <fmt/ranges.h>
#define info(...) fmt::print(__VA_ARGS__)
namespace WS {
namespace net = boost::asio;
namespace beast = boost::beast;
namespace websocket = beast::websocket;
namespace http = beast::http;
using boost::system::error_code;
using net::ip::tcp;
// Prints "<where>: <error message>" followed by a newline via info().
static inline void fail(error_code ec, std::string_view where) {
    auto const reason = ec.message();
    info("{}: {}\n", where, reason);
}
using Message = std::string;
using MessageHandler = std::function<void(Message const&)>;
namespace Client {
// WebSocket client session that binds its TCP socket to a caller-chosen local
// endpoint and then connects to the server by literal IP address (no resolver).
class session : public std::enable_shared_from_this<session> {
    std::vector<std::shared_ptr<Message const>> queue;
    websocket::stream<beast::tcp_stream> ws_;
    beast::flat_buffer buffer_;
    std::string host_, localhost_;
    std::uint16_t port_ = 0, localport_ = 0;
    MessageHandler on_message;

public:
    // Creates the stream on a strand of ioc with an already-open IPv4 socket,
    // enables address reuse and binds the socket to localhost:localport.
    // bind() is the throwing overload, so a failure surfaces as an exception.
    explicit session(net::io_context& ioc, std::string localhost, uint16_t localport,
                     MessageHandler on_message)
        : ws_(net::make_strand(ioc), tcp::v4())
        , localhost_(std::move(localhost))
        , localport_(localport)
        , on_message(std::move(on_message)) //
    {
        auto& s = get_lowest_layer(ws_).socket();
        s.set_option(net::socket_base::reuse_address(true));
        s.bind(tcp::endpoint(net::ip::make_address_v4(localhost_), localport_));
    }

    // Starts the asynchronous connect to host:port; host must be a literal IP
    // address. text is accepted for interface compatibility and ignored.
    void run(std::string host, uint16_t port, std::string text) {
        boost::ignore_unused(text);

        // Save these for later
        host_ = std::move(host);
        port_ = port;

        beast::get_lowest_layer(ws_).expires_after(std::chrono::seconds(30));

        // Connect straight to the endpoint, without a resolver.
        tcp::endpoint ep(net::ip::address::from_string(host_), port_);
        beast::get_lowest_layer(ws_).socket().async_connect(
            ep, beast::bind_front_handler(&session::on_connect, shared_from_this()));
    }

    // Completion handler for the TCP connect: configures the websocket stream
    // and starts the client handshake.
    void on_connect(beast::error_code ec) {
        if (ec)
            return fail(ec, "connect");

        // Turn off the timeout on the tcp_stream, because
        // the websocket stream has its own timeout system.
        beast::get_lowest_layer(ws_).expires_never();

        // Set suggested timeout settings for the websocket
        ws_.set_option(websocket::stream_base::timeout::suggested(beast::role_type::client));

        info("Making a handshake with server\n");

        // Set a decorator to change the User-Agent of the handshake
        ws_.set_option(websocket::stream_base::decorator([](websocket::request_type& req) {
            req.set(http::field::user_agent,
                    std::string(BOOST_BEAST_VERSION_STRING) + " websocket-client-async");
        }));

        // The Host header names the server being contacted, so it carries the
        // port we connected to (port_), not the local port this socket is
        // bound to. See https://tools.ietf.org/html/rfc7230#section-5.4
        host_ += ':' + std::to_string(port_);

        // Perform the websocket handshake
        ws_.async_handshake(host_, "/",
                            beast::bind_front_handler(&session::on_handshake, shared_from_this()));
    }

    // Completion handler for the websocket handshake; only reports the outcome.
    void on_handshake(beast::error_code ec) {
        if (ec)
            return fail(ec, "handshake");
        // buffer_.consume(buffer_.size());
        // net::post(ws_.get_executor(),
        // beast::bind_front_handler(&session::on_read, shared_from_this(), ec, 5));
        info("Handshake successful.\n");
    }

    // Completion handler for async_write: drops the message just sent and
    // writes the next queued one, if any.
    void on_write(beast::error_code ec, std::size_t bytes_transferred) {
        boost::ignore_unused(bytes_transferred);

        if (ec)
            return fail(ec, "write");

        queue.erase(queue.begin());

        // send the message if any
        if (!queue.empty()) {
            ws_.async_write(net::buffer(*queue.front()),
                            beast::bind_front_handler(&session::on_write, shared_from_this()));
        }
    }
};
// Minimal stand-in for the asker's ClientService so the example is
// self-contained: it carries a stub SLAM/map object, the worker thread
// pointer and the Connect entry point.
struct ClientService {
// Stub exposing GetMap()->mnId, which is read when Connect logs the client id.
struct SLAM {
auto GetMap() { return &_map; }
struct {
unsigned mnId = 42;
} _map;
};
std::unique_ptr<SLAM> mpSLAM{new SLAM};
// Raw pointer to the worker thread; initialised to nullptr here.
std::thread* mThread = nullptr;
// Declared here, defined out of line after the struct.
void Connect(std::string const& host, uint16_t port);
private:
// NOTE(review): the out-of-line Connect creates its own local io_context also
// named ioc, so this member is not the one that gets run there.
net::io_context ioc;
using work_guard_type = net::executor_work_guard<net::io_context::executor_type>;
std::shared_ptr<Client::session> service;
// Message callback handed to the session; prints the received text.
void OnRequest(Message const& msg) { info("OnRequest('{}')\n", msg); }
};
// Client Connect to Server
void ClientService::Connect(std::string const& host, uint16_t port) {
auto const id = static_cast<unsigned>( // friends don't let friends use C-style casts
mpSLAM->GetMap()->mnId);
mThread =
new std::thread([this, host, port, id] {
auto const text = "Hello, world!";
info("client {} connect to host {} port {}\n", id, host, port);
// The io_context is required for all I/O
net::io_context ioc;
work_guard_type workGuard(ioc.get_executor());
std::string specify_local_address = "0.0.0.0";
uint16_t specify_local_portnumber = 20'000;
info("In connection: client bind to local endpoint host {} port {}\n",
specify_local_address, specify_local_portnumber);
// Launch the asynchronous operation, which would call WebSocket.h
auto session = std::make_shared<WS::Client::session>(
ioc, specify_local_address, specify_local_portnumber,
std::bind(&ClientService::OnRequest, this, std::placeholders::_1));
this->service = session;
session->run(host, port, text);
// Run the I/O service
ioc.run();
});
}
} // namespace Client
namespace Server {
class session;
// Placeholder for the server's shared state: the join/leave hooks called by
// session have empty bodies here.
struct shared_state {
void join(session const*) {}
void leave(session const*) {}
};
// Server-side websocket session wrapping an already-accepted TCP socket.
class session : public std::enable_shared_from_this<session> {
    websocket::stream<tcp::socket> ws_;
    std::shared_ptr<shared_state> state;
    MessageHandler on_message;

public:
    // Takes ownership of the socket, the shared state handle and the callback.
    session(tcp::socket&& socket, std::shared_ptr<shared_state> state,
            MessageHandler on_message)
        : ws_(std::move(socket)), state(std::move(state)), on_message(std::move(on_message)) {}

    // Logs the destruction and calls leave() on the shared state.
    ~session() {
        info("!session()\n");
        state->leave(this);
    }

    // Applies server-role options and starts accepting the websocket handshake.
    void run() {
        auto const server_timeouts =
            websocket::stream_base::timeout::suggested(beast::role_type::server);
        ws_.set_option(server_timeouts);

        // Add a Server header to the handshake response.
        ws_.set_option(websocket::stream_base::decorator([](websocket::response_type& res) {
            res.set(http::field::server,
                    std::string(BOOST_BEAST_VERSION_STRING) + " websocket-server-async");
        }));

        auto self = shared_from_this();
        ws_.async_accept(beast::bind_front_handler(&session::on_accept, std::move(self)));
    }

    // Completion handler for the websocket accept; reading is disabled here.
    void on_accept(beast::error_code ec) {
        if (ec) {
            fail(ec, "accept");
            return;
        }

        state->join(this);
        // do_read();
    }
};
// after the example from
// https://www.boost.org/doc/libs/master/libs/beast/example/websocket/server/async/websocket_server_async.cpp
class listener : public std::enable_shared_from_this<listener> {
net::io_context& ioc_;
tcp::acceptor acceptor_;
public:
listener(net::io_context& ioc, tcp::endpoint endpoint) try : ioc_(ioc), acceptor_(ioc) {
acceptor_.open(endpoint.protocol());
acceptor_.set_option(net::socket_base::reuse_address(true));
acceptor_.bind(endpoint);
acceptor_.listen(net::socket_base::max_listen_connections);
} catch (boost::system::system_error const& se) {
fail(se.code(), boost::lexical_cast<std::string>(se.code().location()));
}
// Start accepting incoming connections
void run() { do_accept(); }
private:
std::shared_ptr<shared_state> state_ = std::make_shared<shared_state>();
void do_accept() {
acceptor_.async_accept(
make_strand(ioc_),
beast::bind_front_handler(&listener::on_accept, shared_from_this()));
}
void on_accept(beast::error_code ec, tcp::socket socket) {
if (ec) {
fail(ec, "accept");
} else {
auto handler = [ep = socket.remote_endpoint()](Message const& msg) {
info("From {}: {}\n", boost::lexical_cast<std::string>(ep), msg);
};
// Create the session and run it
auto conn = std::make_shared<session>(std::move(socket), state_, handler);
conn->run();
}
// Accept another connection
do_accept();
}
};
} // namespace Server
} // namespace WS
// Self-contained driver: starts the listener on port 20'001, starts the
// client, then runs the listener's io_context on the main thread.
// NOTE: the client is told to connect to port 20'000, which is the same port
// ClientService::Connect binds the client socket to locally.
int main() {
using namespace WS;
net::io_context ioc;
auto ls = std::make_shared<Server::listener>(ioc, tcp::endpoint{{}, 20'001});
ls->run();
Client::ClientService cs;
cs.Connect("127.0.0.1", 20'000u);
ioc.run();
}
Local output:
The obvious mistake here is that I'm connecting to `20'000`, which is the local-bind port of the client itself! Fixing it to `20'001` (or whatever port you ran the server on):
Addressing some more of the issues mentioned above, and adding actual functionality:
#include <boost/asio.hpp>
#include <boost/beast.hpp>
#include <boost/lexical_cast.hpp>
#include <deque>
#include <fmt/ranges.h>
#define info(...) \
do { \
fmt::print(__VA_ARGS__); \
std::fflush(stdout); \
} while (0)
using namespace std::chrono_literals;
namespace WS {
namespace net = boost::asio;
namespace beast = boost::beast;
namespace websocket = beast::websocket;
namespace http = beast::http;
using boost::system::error_code;
using beast::bind_front_handler;
using net::ip::tcp;
// Prints "<where>: <error message>" followed by a newline via info().
static inline void fail(error_code ec, std::string_view where) {
    auto const reason = ec.message();
    info("{}: {}\n", where, reason);
}
using Message = std::string;
using MessageHandler = std::function<void(Message const&)>;
namespace Client {
// Sends a WebSocket message and prints the response
class session : public std::enable_shared_from_this<session> {
websocket::stream<beast::tcp_stream> ws_;
bool handshake_completed_ = false;
std::deque<Message> queue_;
beast::flat_buffer buffer_;
std::string ip_address_, local_addr_;
std::uint16_t port_, local_port_;
MessageHandler on_message;
public:
// Resolver and socket require an io_context
explicit session(net::any_io_executor ex, std::string localhost, uint16_t localport,
MessageHandler on_message)
: ws_{ex, tcp::v4()} // assumed single-threaded execution context
, local_addr_{std::move(localhost)}
, local_port_{localport}
, on_message{std::move(on_message)} //
{
auto& s = get_lowest_layer(ws_).socket();
s.set_option(net::socket_base::reuse_address(true));
s.bind({net::ip::make_address_v4(local_addr_), local_port_});
}
// Start the asynchronous operation
void run(std::string ip_address, uint16_t port) {
// Save these for later
ip_address_ = ip_address;
port_ = port;
beast::get_lowest_layer(ws_).expires_after(30s);
// host assumed to be resolved address
get_lowest_layer(ws_).socket().async_connect(
{net::ip::address::from_string(ip_address_), port_},
bind_front_handler(&session::on_connect, shared_from_this()));
}
void stop() {
post(ws_.get_executor(), [self = shared_from_this()] {
info("Closing down websocket\n");
get_lowest_layer(self->ws_).cancel();
// self->ws_.close("stop");
});
}
void enqueue(std::string msg) {
post(ws_.get_executor(), [m = std::move(msg), self = shared_from_this()] {
self->do_enqueue(std::move(m));
});
}
private:
void on_connect(beast::error_code ec) {
if (ec)
return fail(ec, __PRETTY_FUNCTION__);
auto& ll = get_lowest_layer(ws_);
// auto local_port = ll.socket().local_endpoint().port();
// Turn off the timeout on the tcp_stream, because the
// websocket stream has its own timeout system.
ll.expires_never();
// Set suggested timeout settings for the websocket
ws_.set_option(websocket::stream_base::timeout::suggested(beast::role_type::client));
// Set a decorator to change the User-Agent of the handshake
ws_.set_option(websocket::stream_base::decorator([](websocket::request_type& req) {
req.set(http::field::user_agent,
std::string(BOOST_BEAST_VERSION_STRING) + " websocket-client-async");
}));
// update the host string. This will provide the value of the
// host HTTP header during the websocket handshake the guide
// references: https://tools.ietf.org/html/rfc7230#section-5.4
ip_address_ += ':' + std::to_string(port_); // TODO REVIEW why mutate?
// output on screen said making a handshake with server
info("Making a handshake with {}\n", ip_address_);
// Perform the websocket handshake
ws_.async_handshake(ip_address_, "/",
bind_front_handler(&session::on_handshake, shared_from_this()));
}
void on_handshake(beast::error_code ec) {
if (ec)
return fail(ec, __PRETTY_FUNCTION__);
info("Handshake successful.\n");
ws_.async_read(buffer_, bind_front_handler(&session::on_read, shared_from_this()));
handshake_completed_ = true;
do_write(); // if already queued
}
void on_read(error_code ec, size_t n) {
if (ec)
return fail(ec, __PRETTY_FUNCTION__);
if (on_message)
on_message(beast::buffers_to_string(buffer_.cdata()).substr(0, n));
buffer_.consume(n);
ws_.async_read(buffer_, bind_front_handler(&session::on_read, shared_from_this()));
}
void do_enqueue(Message msg) { // assumed on strand!
queue_.push_back(std::move(msg));
if (queue_.size() == 1)
do_write();
}
void do_write() {
if (!handshake_completed_ || queue_.empty())
return;
info("{}: Initiating write ({} pending) '{}'\n", __PRETTY_FUNCTION__, queue_.size(),
queue_.front());
ws_.async_write(net::buffer(queue_.front()), // FIFO
bind_front_handler(&session::on_write, shared_from_this()));
}
void on_write(beast::error_code ec, size_t) {
if (ec)
return fail(ec, __PRETTY_FUNCTION__);
queue_.pop_front();
do_write(); // continue until queue empty
}
};
// Owns a one-thread pool and a single client session running on it.
struct ClientService {
    // Asks the session (if any) to stop, then waits for the pool to finish.
    ~ClientService() {
        if (connection_) {
            connection_->stop();
        }
        // ioc.stop(); // optionally?
        ioc.join();
    }

    // Creates a session bound to 0.0.0.0:20000 and starts connecting to host:port.
    void Connect(std::string const& host, uint16_t port) {
        info("client {} connect to host {} port {}\n", mpSLAM->GetMap()->mnId, host, port);

        // Incoming messages are forwarded to OnRequest.
        auto forward = [this](Message const& msg) { OnRequest(msg); };
        connection_ = std::make_shared<WS::Client::session>(ioc.get_executor(), "0.0.0.0",
                                                            20'000, std::move(forward));
        connection_->run(host, port);
    }

    // Queues msg on the current session; Connect must have been called first.
    void Send(Message msg) {
        assert(connection_);
        connection_->enqueue(std::move(msg));
    }

private:
    net::thread_pool ioc{1};
    std::shared_ptr<Client::session> connection_;

    // Message callback handed to the session; prints the received text.
    void OnRequest(Message const& msg) {
        info("OnRequest('{}')\n", msg);
        assert(connection_);
    }

    // Stub exposing GetMap()->mnId, which Connect logs as the client id.
    struct SLAM {
        struct Map {
            unsigned mnId = 42;
        } _map;
        Map const* GetMap() const { return &_map; }
    };
    std::unique_ptr<SLAM> mpSLAM{new SLAM};
};
} // namespace Client
namespace Server {
class session;
// Placeholder for the server's shared state: the join/leave hooks called by
// session have empty bodies here.
struct shared_state {
void join(session const*) {}
void leave(session const*) {}
};
// Server-side websocket session: accepts the handshake on an already-accepted
// socket and forwards every received message to on_message.
class session : public std::enable_shared_from_this<session> {
    websocket::stream<tcp::socket> ws_;
    std::shared_ptr<shared_state> state;
    MessageHandler on_message;
    beast::flat_buffer buffer_;

public:
    // Takes ownership of the socket, the shared state handle and the callback.
    session(tcp::socket&& socket, std::shared_ptr<shared_state> state,
            MessageHandler on_message)
        : ws_(std::move(socket)), state(std::move(state)), on_message(std::move(on_message)) {}

    // Logs the destruction and calls leave() on the shared state.
    ~session() {
        info("~session()\n");
        state->leave(this);
    }

    // Applies server-role options and starts accepting the websocket handshake.
    void run() {
        auto const server_timeouts =
            websocket::stream_base::timeout::suggested(beast::role_type::server);
        ws_.set_option(server_timeouts);

        // Add a Server header to the handshake response.
        ws_.set_option(websocket::stream_base::decorator([](websocket::response_type& res) {
            res.set(http::field::server,
                    std::string(BOOST_BEAST_VERSION_STRING) + " websocket-server-async");
        }));

        ws_.async_accept(bind_front_handler(&session::on_accept, shared_from_this()));
    }

private:
    // Completion handler for the websocket accept: logs the outcome, joins
    // the shared state and starts the read loop.
    void on_accept(beast::error_code ec) {
        info("{}: {}\n", __PRETTY_FUNCTION__, ec.message());
        if (ec) {
            fail(ec, __PRETTY_FUNCTION__);
            return;
        }

        state->join(this);
        do_read();
    }

    // Queues one asynchronous read into buffer_.
    void do_read() {
        ws_.async_read(buffer_, bind_front_handler(&session::on_read, shared_from_this()));
    }

    // Completion handler for async_read: logs the outcome, forwards the
    // message to on_message, releases the consumed bytes and reads again.
    void on_read(error_code ec, size_t n) {
        info("{}: {}, {}\n", __PRETTY_FUNCTION__, ec.message(), n);
        if (ec) {
            fail(ec, __PRETTY_FUNCTION__);
            return;
        }

        if (on_message) {
            auto const text = beast::buffers_to_string(buffer_.cdata());
            on_message(text.substr(0, n));
        }
        buffer_.consume(n);

        do_read();
    }
};
// after the example from
// https://www.boost.org/doc/libs/master/libs/beast/example/websocket/server/async/websocket_server_async.cpp
class listener : public std::enable_shared_from_this<listener> {
net::io_context& ioc_;
tcp::acceptor acceptor_;
public:
listener(net::io_context& ioc, tcp::endpoint endpoint) try : ioc_(ioc), acceptor_(ioc) {
acceptor_.open(endpoint.protocol());
acceptor_.set_option(net::socket_base::reuse_address(true));
acceptor_.bind(endpoint);
acceptor_.listen(net::socket_base::max_listen_connections);
} catch (boost::system::system_error const& se) {
fail(se.code(), boost::lexical_cast<std::string>(se.code().location()));
}
// Start accepting incoming connections
void run() { do_accept(); }
private:
std::shared_ptr<shared_state> state_ = std::make_shared<shared_state>();
void do_accept() {
acceptor_.async_accept(
make_strand(ioc_),
bind_front_handler(&listener::on_accept, shared_from_this()));
}
void on_accept(beast::error_code ec, tcp::socket socket) {
info("{}: {} from {}\n", __PRETTY_FUNCTION__, ec.message(),
boost::lexical_cast<std::string>(socket.remote_endpoint()));
if (ec) {
fail(ec, __PRETTY_FUNCTION__);
} else {
auto handler = [ep = socket.remote_endpoint()](Message const& msg) {
info("From {}: '{}'\n", boost::lexical_cast<std::string>(ep), msg);
};
// Create the session and run it
auto conn = std::make_shared<session>(std::move(socket), state_, handler);
conn->run();
}
// Accept another connection
do_accept();
}
};
} // namespace Server
} // namespace WS
// Self-contained driver: starts the listener on port 20'001, connects the
// client to that same port, queues two messages and runs the listener's
// io_context on the main thread for five seconds.
int main() {
using namespace WS;
net::io_context ioc;
auto ls = std::make_shared<Server::listener>(ioc, tcp::endpoint{{}, 20'001});
ls->run();
Client::ClientService cs;
cs.Connect("127.0.0.1", 20'001u);
// Both messages are queued right after Connect() returns.
cs.Send("Hello world");
cs.Send("Bye world");
// Bounded run so that main returns and cs is destroyed.
ioc.run_for(5s);
}
Local test output:
client 42 connect to host 127.0.0.1 port 20001
Making a handshake with 127.0.0.1:20001
void WS::Server::listener::on_accept(boost::beast::error_code, boost::asio::ip::tcp::socket): Success from 127.0.0.1:20000
Handshake successful.
void WS::Server::session::on_accept(boost::beast::error_code): Success
void WS::Client::session::do_write(): Initiating write (2 pending) 'Hello world'
void WS::Server::session::on_read(boost::system::error_code, size_t): Success, 11
From 127.0.0.1:20000: 'Hello world'
void WS::Client::session::do_write(): Initiating write (1 pending) 'Bye world'
void WS::Server::session::on_read(boost::system::error_code, size_t): Success, 9
From 127.0.0.1:20000: 'Bye world'
Closing down websocket
void WS::Client::session::on_read(boost::system::error_code, size_t): Operation canceled
~session()