boost asio strand doesn't work as expected?


I have a program based on boost::asio and beast with coroutine support.

The program connects to a remote server over a Beast WebSocket and sends requests through it. The requests are triggered by the program's internal logic.

So the requests are frequent and heavy.

I use co_spawn(strand_, sendRequest(data), asio::detached). sendRequest is a coroutine that sends data to the remote server like this:

ws_->async_write(strand_, asio::buffer(data), asio::use_awaitable)

The strand_ in both places is the same one, and ws_ is initialized with that strand as well:

class Adapter {
Adapter(asio::strand<boost::asio::io_context::executor_type>& strand)
: strand_(strand)
, ssl_context_(asio::ssl::context::tlsv12_client)
, ws_(new websocket::stream<beast::ssl_stream<beast::tcp_stream>>(strand, ssl_context_)) {}

void handleRequest(std::string& data) {
    // do some checks and have a new_data
    co_spawn(strand_, sendRequest(new_data), boost::asio::detached);
}

asio::awaitable<void> sendRequest(const std::string& data) {
  // do some conversion to have a new_data
  co_await ws_->async_write(strand_, asio::buffer(new_data), asio::use_awaitable);
  co_return;
}

protected:
    asio::strand<boost::asio::io_context::executor_type>&strand_;
    asio::ssl::context  ssl_context_;
    std::unique_ptr<websocket::stream<beast::ssl_stream<beast::tcp_stream>>> ws_;
};

handleRequest can be triggered very rapidly, though.

The program crashes when the requests come too frequently. The crash happens in soft_mutex.hpp in Beast:

try_lock(T const*)
{
    // If this assert goes off it means you are attempting to
    // simultaneously initiate more than one of same asynchronous
    // operation, which is not allowed. For example, you must wait
    // for an async_read to complete before performing another
    // async_read.
    //
    BOOST_ASSERT(id_ != T::id);
    if(id_ != 0)
        return false;
    id_ = T::id;
    return true;
}

If I replace co_spawn(strand_, sendRequest(data), asio::detached) with an empty coroutine like this:

co_spawn(strand_, []() -> asio::awaitable<void> {co_return;}, asio::detached)

it works fine. So the problem definitely occurs at ws_->async_write in sendRequest.

I tried wrapping co_spawn in asio::post, but it still does not work.

I am doing this on Ubuntu 22.04 with Boost 1.85.

(BTW, if I use ws_->write, it works.)


Solution

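  • You are initiating multiple overlapping writes. Even though the initiations happen on the strand, nothing ensures that the previous write operation has completed before the next one starts. A strand only serializes the execution of handlers; while one coroutine is suspended in co_await on its write, another coroutine can run on the same strand and start a second write.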

    It's not possible to show how you could fix your code since none of it is shown. In general, the solution will involve a queue and a write loop that sends messages from the queue until the queue is empty.

    Many of my Asio examples contain a std::deque<...>, which I typically name outbox_ or similar.

    UPDATE

    The code in the question is incomplete. Here's a self-contained sketch of what a proper solution could look like. Note that you have to keep the Adapter instance (a in main below) alive until all (detached) operations using it have completed.

    #include <boost/asio.hpp>
    #include <boost/beast.hpp>
    #include <boost/beast/ssl.hpp>
    #include <deque>
    #include <string>
    namespace beast     = boost::beast;
    namespace asio      = boost::asio;
    namespace websocket = boost::beast::websocket;
    namespace ssl       = boost::asio::ssl;
    
    struct Adapter {
        using Message = std::string;
    
        explicit Adapter(asio::any_io_executor ex) : ws_{ex, ssl_context_} {}
    
        void handleRequest(std::string msg) {
            // do some checks and have a message
            co_spawn(ws_.get_executor(), sendRequest(std::move(msg)), boost::asio::detached);
        }
    
      protected:
        asio::awaitable<void> sendRequest(std::string message) {
            queue_.push_back(std::move(message));
            if (queue_.size() == 1)
                co_await writeLoop(); // only one write loop can be active
        }
    
        asio::awaitable<void> writeLoop() {
            while (!queue_.empty()) {
                co_await ws_.async_write(asio::buffer(queue_.front()), asio::deferred);
                queue_.pop_front();
            }
        }
    
      private:
        ssl::context                                            ssl_context_{ssl::context::tlsv12_client};
        websocket::stream<beast::ssl_stream<beast::tcp_stream>> ws_;
        std::deque<std::string>                                 queue_;
    };
    
    int main() {
        asio::thread_pool ioc;
    
        Adapter a{make_strand(ioc)};
    
        ioc.join();
    }