Tags: c++, multithreading, boost-asio, asio

How to ensure that the messages will be enqueued in chronological order on multithreaded Asio io_service?


Following Michael Caisse's cppcon talk, I created a connection handler MyUserConnection which has a sendMessage method. sendMessage adds a message to the queue, similarly to send() in the cppcon talk. My sendMessage method is called at a high rate from multiple threads outside of the connection handler. The messages must be enqueued chronologically.

When I run my code with only one Asio io_service::run call (i.e. one io_service thread), it async_writes and empties my queue as expected (FIFO). The problem occurs when there are, for example, 4 io_service::run calls: then the queue is not filled, or the send calls are not made, in chronological order.
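
The io_service is run by several worker threads, roughly like this (illustrative sketch only; runWorkers is not my real code):

#include <asio.hpp>
#include <thread>
#include <vector>

// Illustrative only: four worker threads, i.e. four io_service::run calls
void runWorkers(asio::io_service& io_service) {
    std::vector<std::thread> workers;
    for (int i = 0; i < 4; ++i)
        workers.emplace_back([&io_service] { io_service.run(); });
    for (auto& t : workers)
        t.join();
}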

class MyUserConnection : public std::enable_shared_from_this<MyUserConnection> {
public:
  MyUserConnection(asio::io_service& io_service, SslSocket socket) :
      service_(io_service),
      socket_(std::move(socket)),
      strand_(io_service) {
  }

  void sendMessage(std::string msg) {
    auto self(shared_from_this());
    service_.post(strand_.wrap([self, msg]() {
      self->queueMessage(msg);
    }));
  }
  
private:
  void queueMessage(const std::string& msg) {
    bool writeInProgress = !sendPacketQueue_.empty();
    sendPacketQueue_.push_back(msg);
    if (!writeInProgress) {
      startPacketSend();
    }
  }

  void startPacketSend() {
    auto self(shared_from_this());
    asio::async_write(socket_,
                      asio::buffer(sendPacketQueue_.front().data(), sendPacketQueue_.front().length()),
                      strand_.wrap([self](const std::error_code& ec, std::size_t /*n*/) {
                        self->packetSendDone(ec);
                      }));
  }

  void packetSendDone(const std::error_code& ec) {
    if (!ec) {
      sendPacketQueue_.pop_front();
      if (!sendPacketQueue_.empty()) { startPacketSend(); }
    } else {
      // end(); // My end call 
    }
  }
  
  asio::io_service& service_;
  SslSocket socket_;
  asio::io_service::strand strand_;
  std::deque<std::string> sendPacketQueue_;
};

I'm quite sure that I misinterpreted the strand and io_service::post when running the connection handler on a multithreaded io_service. I'm also quite sure that the real problem is that the messages are not enqueued chronologically, rather than the async_write calls not happening in order. How can I ensure that the messages are enqueued in chronological order in the sendMessage call on a multithreaded io_service?


Solution

  • If you use a strand, the order is guaranteed to be the order in which you post the operations to the strand.

    Of course, if there is some kind of "correct ordering" between the threads that post, then you have to synchronize the posting between them; that's your application domain.
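
    As a minimal, self-contained illustration of that guarantee (just a sketch, separate from the session code below): handlers posted directly onto a strand run in the order in which they were posted, even when the context is run by several threads. Applied to the class in the question, that means posting the work through the strand itself (strand_.post(...), or a strand executor as in the code further below) instead of service_.post(strand_.wrap(...)); the wrap only serializes execution, it does not control the order in which the worker threads pick up the handlers that were posted to the io_service.

    #include <boost/asio.hpp>
    #include <iostream>
    #include <thread>
    #include <vector>
    
    int main() {
        boost::asio::io_context io;
        auto strand = boost::asio::make_strand(io);
    
        // Post 20 handlers onto the strand in order 0..19. The strand serializes
        // them and preserves the posting order, so the output is always
        // "0 1 2 ... 19", no matter how many threads run the context.
        for (int i = 0; i < 20; ++i)
            post(strand, [i] { std::cout << i << ' '; });
    
        // Run the context on four threads, the equivalent of four io_service::run calls
        std::vector<std::thread> workers;
        for (int t = 0; t < 4; ++t)
            workers.emplace_back([&io] { io.run(); });
        for (auto& w : workers)
            w.join();
        std::cout << std::endl;
    }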

    Here's a modernized, simplified take on your MyUserConnection class with a self-contained server test program:

    Live On Coliru

    #include <boost/asio.hpp>
    #include <boost/asio/ssl.hpp>
    #include <deque>
    #include <iostream>
    #include <mutex>
    
    namespace asio = boost::asio;
    namespace ssl  = asio::ssl;
    using asio::ip::tcp;
    using boost::system::error_code;
    using SslSocket = ssl::stream<tcp::socket>;
    
    class MyUserConnection : public std::enable_shared_from_this<MyUserConnection> {
      public:
        MyUserConnection(SslSocket&& socket) : socket_(std::move(socket)) {}
    
        void start() {
            std::cerr << "Handshake initiated" << std::endl;
            socket_.async_handshake(ssl::stream_base::handshake_type::server,
                                    [self = shared_from_this()](error_code ec) {
                                        std::cerr << "Handshake complete" << std::endl;
                                    });
        }
    
        void sendMessage(std::string msg) {
            post(socket_.get_executor(),
                 [self = shared_from_this(), msg = std::move(msg)]() {
                     self->queueMessage(msg);
                 });
        }
    
      private:
        void queueMessage(std::string msg) {
            outbox_.push_back(std::move(msg));
            if (outbox_.size() == 1)
                sendLoop();
        }
    
        void sendLoop() {
            std::cerr << "Sendloop " << outbox_.size() << std::endl;
            if (outbox_.empty())
                return;
    
            asio::async_write( //
                socket_, asio::buffer(outbox_.front()),
                [this, self = shared_from_this()](error_code ec, std::size_t) {
                    if (!ec) {
                        outbox_.pop_front();
                        sendLoop();
                    } else {
                        end();
                    }
                });
        }
    
        void end() {}
    
        SslSocket                socket_;
        std::deque<std::string>  outbox_;
    };
    
    int main() {
        asio::thread_pool ioc;
        ssl::context      ctx(ssl::context::sslv23_server);
        ctx.set_password_callback([](auto...) { return "test"; });
        ctx.use_certificate_file("server.pem", ssl::context::file_format::pem);
        ctx.use_private_key_file("server.pem", ssl::context::file_format::pem);
        ctx.use_tmp_dh_file("dh2048.pem");
    
        tcp::acceptor a(ioc, {{}, 8989u});
    
        for (;;) {
            auto s = a.accept(make_strand(ioc.get_executor()));
            std::cerr << "accepted " << s.remote_endpoint() << std::endl;
            auto sess = std::make_shared<MyUserConnection>(SslSocket(std::move(s), ctx));
            sess->start();
            for(int i = 0; i<30; ++i) {
                post(ioc, [sess, i] {
                    std::string msg = "message #" + std::to_string(i) + "\n";
                    {
                        static std::mutex mx;
                        // Lock so console output is guaranteed in the same order
                        // as the sendMessage call
                        std::lock_guard lk(mx);
                        std::cout << "Sending " << msg << std::flush;
                        sess->sendMessage(std::move(msg));
                    }
                });
            }
    
            break; // for online demo
        }
    
        ioc.join();
    }
    

    If you run it a few times, you will see that

    • the order in which the threads post is not deterministic (that is up to the kernel scheduling);
    • the order in which messages are sent (and received) is exactly the order in which they are posted (see the client sketch below for checking the receive side).

    [Screenshot: live demo runs on my machine]
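
    To check the receive side independently, a minimal blocking client sketch could look like this (again just a sketch: it assumes the demo server above is listening on 127.0.0.1:8989 and disables certificate verification because the demo certificate is self-signed):

    #include <boost/asio.hpp>
    #include <boost/asio/ssl.hpp>
    #include <iostream>
    
    namespace asio = boost::asio;
    namespace ssl  = asio::ssl;
    using asio::ip::tcp;
    
    int main() {
        asio::io_context ioc;
        ssl::context     ctx(ssl::context::sslv23_client);
        ctx.set_verify_mode(ssl::verify_none); // test only: self-signed certificate
    
        ssl::stream<tcp::socket> s(ioc, ctx);
        connect(s.lowest_layer(), tcp::resolver(ioc).resolve("127.0.0.1", "8989"));
        s.handshake(ssl::stream_base::client);
    
        // Print the newline-delimited messages in the order they arrive
        boost::system::error_code ec;
        asio::streambuf           buf;
        while (asio::read_until(s, buf, '\n', ec)) {
            std::istream is(&buf);
            std::string  line;
            std::getline(is, line);
            std::cout << "Received " << line << std::endl;
        }
    }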