
Separate Boost async read and async write in two threads


I was looking at the Boost documentation on implementing an asynchronous TCP client/server (echo server, chat, and so on), but I didn't find anything about handling the async read and the async write on two different threads.

In my scenario, I have a client that receives async messages from the server and passes them to a method that processes them and produces the data to be sent back to the server. The "send message" phase does not depend on the preceding receive, though: it happens at a fixed period. It could happen, for example, that the client sends old data to the server because the server has not sent updated data in the meantime.

I need a suggestion about this problem. My idea was to split the async read and async write operations across two different threads, so that the sending phase can run at a fixed time, and maybe add a third thread that processes the data before it is sent to the server. Of course, I would then need to synchronize the exchange of information between the thread that receives the data and the method that processes it to produce what is sent to the server.


Solution

  • Threading is not required for this at all:

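    #include <boost/asio.hpp>
    #include <array>
    #include <deque>
    #include <iomanip>
    #include <iostream>
    #include <memory>
    #include <string>
    
    namespace asio = boost::asio;
    using asio::ip::tcp;
    
    struct Session : std::enable_shared_from_this<Session> {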
        Session(tcp::socket socket) : socket(std::move(socket)) {}
    
        void start() {
            asio::post(socket.get_executor(), [this, self = shared_from_this()] { do_read_loop(); });
        }
    
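        void write(std::string msg) {
            // capture self so the session outlives the posted handler
            asio::post(socket.get_executor(), [this, self = shared_from_this(), msg = std::move(msg)]() mutable {
                write_msgs.push_back(std::move(msg));
                // start the write loop only if no write is already in flight
                if (write_msgs.size() == 1) {
                    do_write_loop();
                }
            });
        }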
    
      private:
        // all private members are only used on the socket's executor (a strand when multithreaded)
        void do_read_loop() {
            auto self(shared_from_this());
            socket.async_read_some( //
                asio::buffer(data), [this, self](boost::system::error_code ec, std::size_t length) {
                    std::cout << "Received (" << ec.message() << ") " << quoted(std::string(data.data(), length))
                              << std::endl;
                    if (!ec) {
                        do_read_loop();
                    }
                });
        }
    
        void do_write_loop() {
            if (write_msgs.empty())
                return;
    
            asio::async_write(socket, asio::buffer(write_msgs.front()),
                [this, self=shared_from_this()](boost::system::error_code ec, std::size_t /*length*/) {
                    if (!ec) {
                        write_msgs.pop_front();
                        do_write_loop();
                    }
                });
        }
    
        tcp::socket socket;
        std::array<char, 1000> data;
        std::deque<std::string> write_msgs;
    };
    

    Now, whether you run this on a single thread or on, e.g., an asio::thread_pool with a strand as the socket's executor, everything will work correctly, because the handlers and every access to write_msgs run on that one executor.