This is the template I'm using: I got it from github here: https://github.com/boostorg/beast/blob/develop/example/websocket/client/async/websocket_client_async.cpp
class session : public std::enable_shared_from_this<session>
{
tcp::resolver resolver_;
websocket::stream<beast::tcp_stream> ws_;
beast::flat_buffer buffer_;
std::string host_;
std::string text_;
public:
// Resolver and socket require an io_context
explicit session(net::io_context& ioc)
: resolver_(net::make_strand(ioc))
, ws_(net::make_strand(ioc))
{}
// Start the asynchronous operation
void run( char const* host, char const* port, char const* text)
{
// Save these for later
host_ = host;
text_ = text;
// Look up the domain name
resolver_.async_resolve( host, port,
beast::bind_front_handler( &session::on_resolve, shared_from_this()));
}
void on_resolve( beast::error_code ec, tcp::resolver::results_type results)
{
if(ec)
return fail(ec, "resolve");
// Set the timeout for the operation
beast::get_lowest_layer(ws_).expires_after(std::chrono::seconds(30));
// Make the connection on the IP address we get from a lookup
beast::get_lowest_layer(ws_).async_connect(
results, beast::bind_front_handler(
&session::on_connect, shared_from_this()));
}
void on_connect(beast::error_code ec, tcp::resolver::results_type::endpoint_type ep)
{
if(ec)
return fail(ec, "connect");
beast::get_lowest_layer(ws_).expires_never();
// Set suggested timeout settings for the websocket
ws_.set_option( websocket::stream_base::timeout::suggested(
beast::role_type::client));
// Set a decorator to change the User-Agent of the handshake
ws_.set_option(websocket::stream_base::decorator(
[](websocket::request_type& req)
{
req.set(http::field::user_agent,
std::string(BOOST_BEAST_VERSION_STRING) +
" websocket-client-async");
}));
host_ += ':' + std::to_string(ep.port());
// Perform the websocket handshake
ws_.async_handshake(host_, "/",
beast::bind_front_handler(
&session::on_handshake,
shared_from_this()));
}
void on_handshake(beast::error_code ec)
{
if(ec)
return fail(ec, "handshake");
// Send the message
ws_.async_write(
net::buffer(text_),
beast::bind_front_handler(
&session::on_write,
shared_from_this()));
}
void on_write( beast::error_code ec, std::size_t bytes_transferred)
{
boost::ignore_unused(bytes_transferred);
if(ec)
return fail(ec, "write");
}
void do_send(std::string message) {
}
void on_sent(beast::error_code ec, std::size_t bytes_transferred) {
}
}
};
//-----------------------------------------------------------------
startClient()
{
net::io_context ioc;
// Launch the asynchronous operation
std::make_shared<session>(ioc)->run("host", "port", "text");
ioc.run(); // *** This was my problem!!! ***
}
onButtonClickedWriteDataToServer()
{
// I want to call do_send() here
}
I'm able to get and show the data when I get to the async_write in the on_handshake() function. If I call do_send("myText") in the on_handshake() function, it also works only once.
However when I call it multiple times, it doesn't work. And when I call do_send("myText") in writeDataToServer(), it doesn't work.
I just want to know why I can't do async_write like this and how to send data to the server from onButtonClickedWriteDataToServer() multiple times.
Update: Accepted answerer solved my issue. It was ioc.run() causing the block. I did not have to change my do_send() on_send() function (I just realized those were made specifically to work with our server so I keep those or I need to change the server code. Removed the irrelevant part.
the documentation says:
The algorithm, known as a composed asynchronous operation, is implemented in terms of calls to the next layer's async_write_some function. The program must ensure that no other calls to write, write_some, async_write, or async_write_some are performed until this operation completes.
The begin with, all members of session (except constructor and run
) should be private. What you really need is a public member function send
that you can call from external.
This means you have to post the implementation on the strand:
void send(std::string message) {
post(ws_.get_executor(), [self = shared_from_this(), message = std::move(message)]() mutable {
self->do_send(std::move(message));
});
}
Now, all of the implementation should be private again:
private: // everything here on the strand
std::deque<std::string> outgoing_;
void do_send(std::string message) {
outgoing_.push_back(std::move(message));
if (outgoing_.size() == 1)
do_write_loop();
}
void do_write_loop() {
if (outgoing_.empty())
return;
ws_.async_write(net::buffer(outgoing_.front()),
[this, self = shared_from_this()](beast::error_code ec, size_t) {
if (ec)
return fail(ec, "on_sent");
outgoing_.pop_front();
do_write_loop();
});
}
As you can see, I've omitted the need for on_sent
by using a lambda. This also ensure that we don't forget to remove all other uses of async write operations
To help with session life time we start a do_read_loop
as well (even though we don't really expect server messages? we can just ignore them). Note how main uses a weak-pointer to track the session lifetime.
#include <boost/asio.hpp>
#include <boost/beast.hpp>
#include <deque>
#include <iostream>
#include <thread>
namespace beast = boost::beast;
namespace http = beast::http;
namespace websocket = beast::websocket;
namespace net = boost::asio;
using tcp = net::ip::tcp;
//------------------------------------------------------------------------------
// Report a failure
#define TRACE() std::cout << __FUNCTION__ << ":" << __LINE__ << std::endl
void fail(beast::error_code ec, char const* what) { std::cerr << what << ": " << ec.message() << "\n"; }
// Sends a WebSocket message and prints the response
class session : public std::enable_shared_from_this<session> {
public:
// Resolver and socket require an io_context
explicit session(net::any_io_executor ex) : resolver_(ex), ws_(resolver_.get_executor()) {}
// Start the asynchronous operation
void run(char const* host, char const* port) {
// Save these for later
host_ = host;
// Look up the domain name
resolver_.async_resolve(host, port,
beast::bind_front_handler(&session::on_resolve, shared_from_this()));
}
void send(std::string message) {
std::cout << "Sending: " << quoted(message) << std::endl;
post(ws_.get_executor(), [self = shared_from_this(), message = std::move(message)]() mutable {
self->do_send(std::move(message));
});
}
private: // everything here on the strand
tcp::resolver resolver_;
websocket::stream<beast::tcp_stream> ws_;
std::string host_;
beast::flat_buffer incoming_;
std::deque<std::string> outgoing_;
void do_send(std::string message) {
TRACE();
outgoing_.push_back(std::move(message));
if (outgoing_.size() == 1)
do_write_loop();
}
void do_write_loop() {
if (outgoing_.empty())
return;
ws_.async_write(net::buffer(outgoing_.front()),
[this, self = shared_from_this()](beast::error_code ec, size_t) {
TRACE();
if (ec)
return fail(ec, "on_sent");
outgoing_.pop_front();
do_write_loop();
});
}
void do_read_loop() {
TRACE();
// only to detect disconnections, we don't actually expect server messages
ws_.async_read(incoming_, [this, self = shared_from_this()](beast::error_code ec, size_t) {
if (ec) {
ws_.next_layer().cancel(); // cancel all pending operations, including writes
return fail(ec, "read");
}
std::cout << "Ignoring incoming message: " << quoted(beast::buffers_to_string(incoming_.cdata()))
<< std::endl;
incoming_.clear();
do_read_loop();
});
}
void on_resolve(beast::error_code ec, tcp::resolver::results_type results) {
TRACE();
if (ec)
return fail(ec, "resolve");
// Set the timeout for the operation
beast::get_lowest_layer(ws_).expires_after(std::chrono::seconds(30));
// Make the connection on the IP address we get from a lookup
beast::get_lowest_layer(ws_).async_connect(
results, beast::bind_front_handler(&session::on_connect, shared_from_this()));
}
void on_connect(beast::error_code ec, tcp::resolver::results_type::endpoint_type ep) {
TRACE();
if (ec)
return fail(ec, "connect");
// Turn off the timeout on the tcp_stream, because
// the websocket stream has its own timeout system.
beast::get_lowest_layer(ws_).expires_never();
// Set suggested timeout settings for the websocket
ws_.set_option(websocket::stream_base::timeout::suggested(beast::role_type::client));
// Set a decorator to change the User-Agent of the handshake
ws_.set_option(websocket::stream_base::decorator([](websocket::request_type& req) {
req.set(http::field::user_agent,
std::string(BOOST_BEAST_VERSION_STRING) + " websocket-client-async");
}));
// Update the host_ string. This will provide the value of the
// Host HTTP header during the WebSocket handshake.
// See https://tools.ietf.org/html/rfc7230#section-5.4
host_ += ':' + std::to_string(ep.port());
// Perform the websocket handshake
ws_.async_handshake(host_, "/",
beast::bind_front_handler(&session::on_handshake, shared_from_this()));
}
void on_handshake(beast::error_code ec) {
TRACE();
if (ec)
return fail(ec, "handshake");
do_read_loop(); // only to detect disconnections
}
};
//------------------------------------------------------------------------------
using namespace std::chrono_literals;
int main(int argc, char** argv) {
if (argc != 3) {
std::cerr << "Usage: " << argv[0] << " <host> <port>\n"
<< "Example:\n"
<< " " << argv[0] << " echo.websocket.org 80\n";
return EXIT_FAILURE;
}
auto const host = argv[1];
auto const port = argv[2];
// The io_context is required for all I/O
net::thread_pool ioc(1);
// Launch the asynchronous operation
std::weak_ptr handle = [&] {
auto s = std::make_shared<session>(make_strand(ioc));
s->run(host, port);
return s;
}();
for (int i = 0;!handle.expired(); ++i) {
if (auto s = handle.lock())
s->send("onButtonClickedWriteDataToServer # " + std::to_string(i));
std::this_thread::sleep_for(500ms);
}
ioc.join();
}
With a local demo: