Search code examples
Tags: asynchronous, websocket, boost, client-server, boost-asio

How do I call async_write multiple times and send data to the server? (boost::asio)


This is the template I'm using. I got it from GitHub: https://github.com/boostorg/beast/blob/develop/example/websocket/client/async/websocket_client_async.cpp

// Asynchronous WebSocket client session, adapted from the Boost.Beast
// websocket_client_async example. Each stage (resolve -> connect ->
// handshake -> write) is started from the completion handler of the
// previous one, and every handler keeps the session alive by binding
// shared_from_this().
class session : public std::enable_shared_from_this<session>
{
    tcp::resolver resolver_;
    websocket::stream<beast::tcp_stream> ws_;
    beast::flat_buffer buffer_;
    std::string host_;  // host name; the port is appended in on_connect()
    std::string text_;  // message written once the handshake succeeds

public:
    // Resolver and socket require an io_context; each gets its own strand
    explicit session(net::io_context& ioc)
        : resolver_(net::make_strand(ioc))
        , ws_(net::make_strand(ioc))
    {}

    // Start the asynchronous operation chain.
    //   host, port - passed to the resolver
    //   text       - message sent after the handshake
    void run( char const* host, char const* port, char const* text)
    {
        // Save these for later
        host_ = host;
        text_ = text;

        // Look up the domain name
        resolver_.async_resolve( host, port,
            beast::bind_front_handler( &session::on_resolve, shared_from_this()));
    }

    // Completion handler for async_resolve: starts the TCP connect.
    void on_resolve( beast::error_code ec, tcp::resolver::results_type results)
    {
        if(ec)
            return fail(ec, "resolve");

        // Set the timeout for the operation
        beast::get_lowest_layer(ws_).expires_after(std::chrono::seconds(30));

        // Make the connection on the IP address we get from a lookup
        beast::get_lowest_layer(ws_).async_connect(
            results, beast::bind_front_handler(
                &session::on_connect, shared_from_this()));
    }

    // Completion handler for async_connect: configures the websocket
    // stream and starts the websocket handshake.
    void on_connect(beast::error_code ec, tcp::resolver::results_type::endpoint_type ep)
    {
        if(ec)
            return fail(ec, "connect");

        // Disable the tcp_stream timeout set in on_resolve()
        beast::get_lowest_layer(ws_).expires_never();

        // Set suggested timeout settings for the websocket
        ws_.set_option( websocket::stream_base::timeout::suggested(
                beast::role_type::client));

        // Set a decorator to change the User-Agent of the handshake
        ws_.set_option(websocket::stream_base::decorator(
            [](websocket::request_type& req)
            {
                req.set(http::field::user_agent,
                    std::string(BOOST_BEAST_VERSION_STRING) +
                        " websocket-client-async");
            }));

        // host_ becomes "<host>:<port>" and is passed to the handshake below
        host_ += ':' + std::to_string(ep.port());

        // Perform the websocket handshake
        ws_.async_handshake(host_, "/",
            beast::bind_front_handler(
                &session::on_handshake,
                shared_from_this()));
    }

    // Completion handler for async_handshake: sends text_ once.
    void on_handshake(beast::error_code ec)
    {
        if(ec)
            return fail(ec, "handshake");

        // Send the message
        ws_.async_write(
            net::buffer(text_),
            beast::bind_front_handler(
                &session::on_write,
                shared_from_this()));
    }

    // Completion handler for the write started in on_handshake().
    // Only reports errors; nothing further is started from here.
    void on_write( beast::error_code ec, std::size_t bytes_transferred)
    {
        boost::ignore_unused(bytes_transferred);

        if(ec)
            return fail(ec, "write");
    }

    // Intended entry point for sending further messages.
    // The body was removed from the question by its author.
    void do_send(std::string message) {

    }

    // Intended completion handler for do_send().
    // The body was removed from the question by its author.
    void on_sent(beast::error_code ec, std::size_t bytes_transferred) {

    }
};

//-----------------------------------------------------------------

// Creates the io_context, starts one session on it and runs the event loop.
void startClient()
{
    net::io_context ioc;

    // Launch the asynchronous operation
    std::make_shared<session>(ioc)->run("host", "port", "text");

    // io_context::run() blocks the calling thread until the io_context has
    // no more work, so nothing after this line (e.g. a button handler on the
    // same thread) gets to execute while the session is active.
    // *** This was my problem!!! ***
    ioc.run();
}

// Button-click handler from which the message should be sent to the server.
void onButtonClickedWriteDataToServer()
{
// I want to call do_send() here
}

I'm able to get and show the data when I get to the async_write in the on_handshake() function. If I call do_send("myText") in the on_handshake() function, it also works, but only once.

However, when I call it multiple times, it doesn't work. And when I call do_send("myText") in onButtonClickedWriteDataToServer(), it doesn't work either.

I just want to know why I can't do async_write like this and how to send data to the server from onButtonClickedWriteDataToServer() multiple times.

Update: The accepted answer solved my issue. It was ioc.run() causing the block. I did not have to change my do_send() and on_sent() functions (I just realized those were made specifically to work with our server, so I either keep them or need to change the server code), so I removed that irrelevant part from the question.


Solution

  • the documentation says:

    The algorithm, known as a composed asynchronous operation, is implemented in terms of calls to the next layer's async_write_some function. The program must ensure that no other calls to write, write_some, async_write, or async_write_some are performed until this operation completes.

    To begin with, all members of session (except the constructor and run) should be private. What you really need is a public member function send that you can call from outside.

    This means you have to post the implementation on the strand:

    // Public entry point: wraps the message in a task and posts it to the
    // stream's executor, where do_send() is then invoked. The task holds a
    // shared_ptr to the session so the object outlives the posted call.
    void send(std::string message) {
        auto task = [self = shared_from_this(), msg = std::move(message)]() mutable {
            self->do_send(std::move(msg));
        };
        post(ws_.get_executor(), std::move(task));
    }
    

    Now, all of the implementation should be private again:

      private: // everything here on the strand
        std::deque<std::string> outgoing_;
    
        // Appends the message to the outgoing queue. The write loop is
        // started only if the queue was empty before this call, i.e. the
        // new message is the sole entry.
        void do_send(std::string message) {
            const bool was_empty = outgoing_.empty();
            outgoing_.push_back(std::move(message));
            if (was_empty)
                do_write_loop();
        }
    
        // Writes the message at the front of the queue. When the write
        // completes successfully the message is popped and the loop continues
        // with the next one; it stops when the queue is empty or a write fails.
        void do_write_loop() {
            if (!outgoing_.empty()) {
                auto on_written = [this, self = shared_from_this()](beast::error_code ec, size_t) {
                    if (ec) {
                        fail(ec, "on_sent");
                        return;
                    }

                    outgoing_.pop_front();
                    do_write_loop();
                };
                ws_.async_write(net::buffer(outgoing_.front()), std::move(on_written));
            }
        }
    

    As you can see, I've removed the need for on_sent by using a lambda. This also ensures that we don't forget to remove all other uses of async write operations.

    Live On Coliru

    To help with session lifetime we start a do_read_loop as well (even though we don't really expect server messages; we can just ignore them). Note how main uses a weak pointer to track the session lifetime.

    #include <boost/asio.hpp>
    #include <boost/beast.hpp>
    #include <deque>
    #include <iomanip>
    #include <iostream>
    #include <thread>
    
    namespace beast     = boost::beast;
    namespace http      = beast::http;
    namespace websocket = beast::websocket;
    namespace net       = boost::asio;
    using tcp           = net::ip::tcp;
    
    //------------------------------------------------------------------------------
    
    // Trace helper: prints "<function name>:<line number>" to stdout
    #define TRACE() std::cout << __FUNCTION__ << ":" << __LINE__ << std::endl

    // Report a failure: prints "<what>: <error message>" to stderr
    void fail(beast::error_code ec, char const* what) {
        auto const description = ec.message();
        std::cerr << what << ": " << description << "\n";
    }
    
    // WebSocket client session that queues outgoing messages and writes them
    // one at a time. Messages passed to send() before the handshake has
    // completed are held in the queue and written once the stream is open.
    // Incoming messages are read only to detect disconnection and are ignored.
    class session : public std::enable_shared_from_this<session> {
      public:
        // Resolver and websocket stream both use the executor passed in
        // (main passes a strand), so all handlers share that executor.
        explicit session(net::any_io_executor ex) : resolver_(ex), ws_(resolver_.get_executor()) {}

        // Start the asynchronous resolve -> connect -> handshake chain
        void run(char const* host, char const* port) {
            // Save these for later
            host_ = host;

            // Look up the domain name
            resolver_.async_resolve(host, port,
                                    beast::bind_front_handler(&session::on_resolve, shared_from_this()));
        }

        // Queue a message for sending. The message is logged on the calling
        // thread; the queue itself is only touched from the stream's
        // executor, which is why the work is posted rather than done inline.
        void send(std::string message) {
            std::cout << "Sending: " << std::quoted(message) << std::endl;
            post(ws_.get_executor(), [self = shared_from_this(), message = std::move(message)]() mutable {
                self->do_send(std::move(message));
            });
        }

      private: // everything here on the strand
        tcp::resolver                        resolver_;
        websocket::stream<beast::tcp_stream> ws_;
        std::string                          host_;
        beast::flat_buffer                   incoming_;
        std::deque<std::string>              outgoing_;
        bool                                 handshake_done_ = false; // set in on_handshake()

        // Append to the queue. A write is started only when the handshake
        // has completed and no write is already in flight; a non-empty queue
        // before the push means do_write_loop() is already running or will be
        // started by on_handshake().
        void do_send(std::string message) {
            TRACE();
            outgoing_.push_back(std::move(message));
            if (handshake_done_ && outgoing_.size() == 1)
                do_write_loop();
        }

        // Write the front of the queue, then pop it and continue with the
        // next entry. Stops when the queue is empty or a write fails; after a
        // failure the queue is left as is and no further write is started.
        void do_write_loop() {
            if (outgoing_.empty())
                return;
            ws_.async_write(net::buffer(outgoing_.front()),
                            [this, self = shared_from_this()](beast::error_code ec, size_t) {
                                TRACE();
                                if (ec)
                                    return fail(ec, "on_sent");

                                outgoing_.pop_front();
                                do_write_loop();
                            });
        }

        // Keep one read pending at all times so that a disconnect is noticed
        void do_read_loop() {
            TRACE();
            // only to detect disconnections, we don't actually expect server messages
            ws_.async_read(incoming_, [this, self = shared_from_this()](beast::error_code ec, size_t) {
                if (ec) {
                    ws_.next_layer().cancel(); // cancel all pending operations, including writes
                    return fail(ec, "read");
                }

                std::cout << "Ignoring incoming message: "
                          << std::quoted(beast::buffers_to_string(incoming_.cdata())) << std::endl;
                incoming_.clear();
                do_read_loop();
            });
        }

        // Completion handler for async_resolve: starts the TCP connect
        void on_resolve(beast::error_code ec, tcp::resolver::results_type results) {
            TRACE();
            if (ec)
                return fail(ec, "resolve");

            // Set the timeout for the operation
            beast::get_lowest_layer(ws_).expires_after(std::chrono::seconds(30));

            // Make the connection on the IP address we get from a lookup
            beast::get_lowest_layer(ws_).async_connect(
                results, beast::bind_front_handler(&session::on_connect, shared_from_this()));
        }

        // Completion handler for async_connect: configures the stream and
        // starts the websocket handshake
        void on_connect(beast::error_code ec, tcp::resolver::results_type::endpoint_type ep) {
            TRACE();
            if (ec)
                return fail(ec, "connect");

            // Turn off the timeout on the tcp_stream, because
            // the websocket stream has its own timeout system.
            beast::get_lowest_layer(ws_).expires_never();

            // Set suggested timeout settings for the websocket
            ws_.set_option(websocket::stream_base::timeout::suggested(beast::role_type::client));

            // Set a decorator to change the User-Agent of the handshake
            ws_.set_option(websocket::stream_base::decorator([](websocket::request_type& req) {
                req.set(http::field::user_agent,
                        std::string(BOOST_BEAST_VERSION_STRING) + " websocket-client-async");
            }));

            // Update the host_ string. This will provide the value of the
            // Host HTTP header during the WebSocket handshake.
            // See https://tools.ietf.org/html/rfc7230#section-5.4
            host_ += ':' + std::to_string(ep.port());

            // Perform the websocket handshake
            ws_.async_handshake(host_, "/",
                                beast::bind_front_handler(&session::on_handshake, shared_from_this()));
        }

        // Completion handler for async_handshake: the stream is now open, so
        // start reading and flush whatever was queued while connecting
        void on_handshake(beast::error_code ec) {
            TRACE();
            if (ec)
                return fail(ec, "handshake");
            handshake_done_ = true;
            do_read_loop();  // only to detect disconnections
            do_write_loop(); // no-op when nothing was queued before the handshake
        }
    };
    
    //------------------------------------------------------------------------------
    using namespace std::chrono_literals;
    
    int main(int argc, char** argv) {
        if (argc != 3) {
            std::cerr << "Usage: " << argv[0] << " <host> <port>\n"
                      << "Example:\n"
                      << "    " << argv[0] << " echo.websocket.org 80\n";
            return EXIT_FAILURE;
        }
        auto const host = argv[1];
        auto const port = argv[2];
    
        // The io_context is required for all I/O
        net::thread_pool ioc(1);
    
        // Launch the asynchronous operation
        std::weak_ptr handle = [&] {
            auto s = std::make_shared<session>(make_strand(ioc));
            s->run(host, port);
            return s;
        }();
    
        for (int i = 0;!handle.expired(); ++i) {
            if (auto s = handle.lock())
                s->send("onButtonClickedWriteDataToServer # " + std::to_string(i));
            std::this_thread::sleep_for(500ms);
        }
    
        ioc.join();
    }
    

    With a local demo: