Search code examples
c++tcpboost-asio

How to batch send unsent messages in asio


There is an example in Asio that queues outgoing messages in a deque. When too many unsent messages accumulate in this deque (say, 1000), I would like to send them in one batch using a ConstBufferSequence. How should the following code be changed? Thank you!

   // Appends msg to the outgoing queue. If the queue was empty beforehand, no
   // write is pending, so this call starts one for the message at the front;
   // otherwise handle_write picks the message up when it reaches the front.
   void deliver(const chat_message& msg)
   {
     const bool queue_was_empty = write_msgs_.empty();
     write_msgs_.push_back(msg);
     if (!queue_was_empty)
       return;

     auto& head = write_msgs_.front();
     boost::asio::async_write(socket_,
         boost::asio::buffer(head.data(), head.length()),
         boost::bind(&chat_session::handle_write, shared_from_this(),
           boost::asio::placeholders::error));
   }
 
   // Completion handler for the write started by deliver() or by a previous
   // handle_write(). On failure the session leaves the room; on success the
   // message just sent is dropped from the queue and, if another one is
   // waiting, the next write is started.
   void handle_write(const boost::system::error_code& error)
   {
     if (error)
     {
       room_.leave(shared_from_this());
       return;
     }

     write_msgs_.pop_front();
     if (write_msgs_.empty())
       return;

     auto& head = write_msgs_.front();
     boost::asio::async_write(socket_,
         boost::asio::buffer(head.data(), head.length()),
         boost::bind(&chat_session::handle_write, shared_from_this(),
           boost::asio::placeholders::error));
   }

Solution

  • You can transform the deque to any container modeling the const buffer sequence concept:

    std::vector<asio::const_buffer> buffers;
    std::transform(
        begin(write_msgs_), end(write_msgs_), back_inserter(buffers),
        [](Message const& s) { return asio::buffer(s); });
    
    async_write( //
        socket_, buffers,
        [this, self = shared_from_this()] //
        (error_code ec, std::size_t bytes_written) {
    
              // ...
              write_msgs_.clear();
        });
    

    Using `std::transform` is a force of habit here; you might prefer a plain loop:

    std::vector<asio::const_buffer> buffers;
    for (auto& s: write_msgs_)
        buffers.push_back(asio::buffer(s));
    

    Live Demo

    Modified from this recent example, "How to safely write to a socket from multiple threads?":

    Live On Coliru

    #include <boost/asio.hpp>
    #include <deque>
    #include <iostream>
    
    namespace asio = boost::asio;
    using boost::system::error_code;
    using asio::ip::tcp;
    using Message = std::string;
    
    /// A single client connection.
    ///
    /// Reads NUL-terminated requests from the socket and answers each one with a
    /// burst of responses. Outgoing messages are queued in `write_msgs_`; at most
    /// one `async_write` is in flight at a time, and once the backlog exceeds
    /// `kBatchThreshold` the whole backlog is sent as one gathered write.
    ///
    /// `write_msgs_` is not locked: the code assumes all handlers run on the
    /// socket's executor and that this executor serialises them (an implicit
    /// strand).
    class chat_session : public std::enable_shared_from_this<chat_session> {
      public:
        chat_session(tcp::socket socket) : socket_(std::move(socket)) {}

        /// Start reading requests from the peer.
        void start() { do_read(); }

        /// Queue several messages for sending, in order. The queueing is posted to
        /// the socket's executor rather than performed inline.
        void deliver_many(std::vector<Message> msgs) {
            post(socket_.get_executor(),
                 [this, msgs = std::move(msgs), self = shared_from_this()] //
                 () mutable {
                     for (auto& msg : msgs) {
                         do_write(std::move(msg));
                     }
                 });
        }

        /// Queue a single message for sending; posted to the socket's executor
        /// like deliver_many().
        void deliver(Message msg) {
            post(socket_.get_executor(),
                 [this, msg = std::move(msg), self = shared_from_this()] //
                 () mutable { do_write(std::move(msg)); });
        }

      private:
        /// Backlog size above which the whole queue is sent in one gathered write.
        static constexpr std::size_t kBatchThreshold = 100;

        /// Number of responses generated for each request in this demo.
        static constexpr int kResponsesPerRequest = 200;

        /// Read one NUL-terminated request, process it, and re-arm the read.
        /// Stops reading on any error; errors other than EOF are logged.
        void do_read() {
            async_read_until(
                socket_, asio::dynamic_buffer(incoming_), '\0',
                [this, self = shared_from_this()] //
                (error_code ec, std::size_t length) {
                    if (!ec) {
                        // `length` includes the delimiter; strip it from the payload.
                        process_message(incoming_.substr(0, length - 1));
                        incoming_.erase(0, length);

                        do_read();
                    } else if (ec != asio::error::eof) {
                        std::cerr << "Read error: " << ec.message() << std::endl;
                    }
                });
        }

        /// Append a message to the queue and start the write loop if it was idle.
        void do_write(Message message)
        {
            write_msgs_.push_back(std::move(message)); // assumed on (implicit) strand
            // A size of exactly 1 means the queue was empty, i.e. no write is in flight.
            if (write_msgs_.size() == 1) {
                write_loop();
            }
        }

        /// Start the next write: the entire backlog when it exceeds
        /// kBatchThreshold, otherwise just the message at the front.
        void write_loop() {
            std::cerr << "write_loop with write_msgs_.size() = " << write_msgs_.size() << std::endl;
            if (write_msgs_.empty())
                return;

            if (write_msgs_.size() > kBatchThreshold) {
                // Remember how many messages this write covers: more may be
                // queued while it is in flight and those must not be discarded.
                std::size_t const batch = write_msgs_.size();

                std::vector<asio::const_buffer> buffers;
                buffers.reserve(batch);
                for (Message const& queued : write_msgs_)
                    buffers.push_back(asio::buffer(queued));

                async_write( //
                    socket_, buffers,
                    [this, self = shared_from_this(), batch] //
                    (error_code ec, std::size_t /*length*/) { on_written(ec, batch); });
            } else {
                async_write( //
                    socket_, asio::buffer(write_msgs_.front()),
                    [this, self = shared_from_this()] //
                    (error_code ec, std::size_t /*length*/) { on_written(ec, 1); });
            }
        }

        /// Completion handler shared by both write paths. On success, drops the
        /// `sent_count` messages that were just written from the front of the
        /// queue and continues the loop; errors other than EOF are logged.
        void on_written(error_code ec, std::size_t sent_count) {
            if (!ec) {
                using diff_t = std::deque<Message>::difference_type;
                write_msgs_.erase(write_msgs_.begin(),
                                  write_msgs_.begin() + static_cast<diff_t>(sent_count));
                write_loop();
            } else if (ec != asio::error::eof) {
                std::cerr << "Write error: " << ec.message() << std::endl;
            }
        }

        /// Demo request handler: answers every request with
        /// kResponsesPerRequest numbered response lines.
        void process_message(Message const& message) {
            std::vector<Message> responses;
            responses.reserve(kResponsesPerRequest);
            for (int i = 0; i < kResponsesPerRequest; ++i) {
                responses.push_back("Response #" + std::to_string(i) + " for " +
                                    message + "\n");
            }

            // deliver_many() already posts to the socket's executor, so no extra
            // post is needed here even if we were on a different thread
            // (which is not the case in this example).
            deliver_many(std::move(responses));
        }

        tcp::socket         socket_;     // the client connection
        Message             incoming_;   // bytes received but not yet consumed
        std::deque<Message> write_msgs_; // outgoing messages, front = oldest
    };
    
    class server {
      public:
        server(asio::any_io_executor ex, unsigned short port)
            : acceptor_(ex, tcp::endpoint(tcp::v4(), port))
        {
            do_accept();
        }
    
      private:
        void do_accept()
        {
            acceptor_.async_accept(
                make_strand(acceptor_.get_executor()),
                [this](error_code ec, tcp::socket&& s) {
                    if (!ec) {
                        std::cout << "Accepted " << s.remote_endpoint() << std::endl;
                        std::make_shared<chat_session>(std::move(s))->start();
                    }
    
                    do_accept();
                });
        }
    
        tcp::acceptor acceptor_;
    };
    
    int main() {
        asio::thread_pool ctx;
        server s(ctx.get_executor(), 8989);
        ctx.join();
     }
    

    When sending a single message from a client:

    g++ -std=c++20 -O2 -Wall -pedantic -pthread main.cpp 
    ./a.out&
    sleep .5; printf 'HelloWorld\0' | nc 127.0.0.1 8989 -w1
    

    shows e.g.:

    Accepted 127.0.0.1:39538
    write_loop with write_msgs_.size() = 1
    Response #0 for HelloWorld
    write_loop with write_msgs_.size() = 199
    Response #1 for HelloWorld
    Response #2 for HelloWorld
    Response #3 for HelloWorld
    Response #4 for HelloWorld
    Response #5 for HelloWorld
    Response #6 for HelloWorld
    Response #7 for HelloWorld
    Response #8 for HelloWorld
    Response #9 for HelloWorld
    Response #10 for HelloWorld
    Response #11 for HelloWorld
    Response #12 for HelloWorld
    Response #13 for HelloWorld
    Response #14 for HelloWorld
    Response #15 for HelloWorld
    Response #16 for HelloWorld
    Response #17 for HelloWorld
    Response #18 for HelloWorld
    Response #19 for HelloWorld
    Response #20 for HelloWorld
    Response #21 for HelloWorld
    Response #22 for HelloWorld
    Response #23 for HelloWorld
    Response #24 for HelloWorld
    Response #25 for HelloWorld
    Response #26 for HelloWorld
    Response #27 for HelloWorld
    Response #28 for HelloWorld
    Response #29 for HelloWorld
    Response #30 for HelloWorld
    Response #31 for HelloWorld
    Response #32 for HelloWorld
    Response #33 for HelloWorld
    Response #34 for HelloWorld
    Response #35 for HelloWorld
    Response #36 for HelloWorld
    Response #37 for HelloWorld
    Response #38 for HelloWorld
    Response #39 for HelloWorld
    Response #40 for HelloWorld
    Response #41 for HelloWorld
    Response #42 for HelloWorld
    Response #43 for HelloWorld
    Response #44 for HelloWorld
    Response #45 for HelloWorld
    Response #46 for HelloWorld
    Response #47 for HelloWorld
    Response #48 for HelloWorld
    Response #49 for HelloWorld
    Response #50 for HelloWorld
    Response #51 for HelloWorld
    Response #52 for HelloWorld
    Response #53 for HelloWorld
    Response #54 for HelloWorld
    Response #55 for HelloWorld
    Response #56 for HelloWorld
    Response #57 for HelloWorld
    Response #58 for HelloWorld
    Response #59 for HelloWorld
    Response #60 for HelloWorld
    Response #61 for HelloWorld
    Response #62 for HelloWorld
    Response #63 for HelloWorld
    Response #64 for HelloWorld
    Response #65 for HelloWorld
    Response #66 for HelloWorld
    Response #67 for HelloWorld
    Response #68 for HelloWorld
    Response #69 for HelloWorld
    Response #70 for HelloWorld
    Response #71 for HelloWorld
    Response #72 for HelloWorld
    Response #73 for HelloWorld
    Response #74 for HelloWorld
    Response #75 for HelloWorld
    Response #76 for HelloWorld
    Response #77 for HelloWorld
    Response #78 for HelloWorld
    Response #79 for HelloWorld
    Response #80 for HelloWorld
    Response #81 for HelloWorld
    Response #82 for HelloWorld
    Response #83 for HelloWorld
    Response #84 for HelloWorld
    Response #85 for HelloWorld
    Response #86 for HelloWorld
    Response #87 for HelloWorld
    Response #88 for HelloWorld
    Response #89 for HelloWorld
    Response #90 for HelloWorld
    Response #91 for HelloWorld
    Response #92 for HelloWorld
    Response #93 for HelloWorld
    Response #94 for HelloWorld
    Response #95 for HelloWorld
    Response #96 for HelloWorld
    Response #97 for HelloWorld
    Response #98 for HelloWorld
    Response #99 for HelloWorld
    Response #100 for HelloWorld
    Response #101 for HelloWorld
    Response #102 for HelloWorld
    Response #103 for HelloWorld
    Response #104 for HelloWorld
    Response #105 for HelloWorld
    Response #106 for HelloWorld
    Response #107 for HelloWorld
    Response #108 for HelloWorld
    Response #109 for HelloWorld
    Response #110 for HelloWorld
    Response #111 for HelloWorld
    Response #112 for HelloWorld
    Response #113 for HelloWorld
    Response #114 for HelloWorld
    Response #115 for HelloWorld
    Response #116 for HelloWorld
    Response #117 for HelloWorld
    Response #118 for HelloWorld
    Response #119 for HelloWorld
    Response #120 for HelloWorld
    Response #121 for HelloWorld
    Response #122 for HelloWorld
    Response #123 for HelloWorld
    Response #124 for HelloWorld
    Response #125 for HelloWorld
    Response #126 for HelloWorld
    Response #127 for HelloWorld
    Response #128 for HelloWorld
    Response #129 for HelloWorld
    Response #130 for HelloWorld
    Response #131 for HelloWorld
    Response #132 for HelloWorld
    Response #133 for HelloWorld
    Response #134 for HelloWorld
    Response #135 for HelloWorld
    Response #136 for HelloWorld
    Response #137 for HelloWorld
    Response #138 for HelloWorld
    Response #139 for HelloWorld
    Response #140 for HelloWorld
    Response #141 for HelloWorld
    Response #142 for HelloWorld
    Response #143 for HelloWorld
    Response #144 for HelloWorld
    Response #145 for HelloWorld
    Response #146 for HelloWorld
    Response #147 for HelloWorld
    Response #148 for HelloWorld
    Response #149 for HelloWorld
    Response #150 for HelloWorld
    Response #151 for HelloWorld
    Response #152 for HelloWorld
    Response #153 for HelloWorld
    Response #154 for HelloWorld
    Response #155 for HelloWorld
    Response #156 for HelloWorld
    Response #157 for HelloWorld
    Response #158 for HelloWorld
    Response #159 for HelloWorld
    Response #160 for HelloWorld
    Response #161 for HelloWorld
    Response #162 for HelloWorld
    Response #163 for HelloWorld
    Response #164 for HelloWorld
    Response #165 for HelloWorld
    Response #166 for HelloWorld
    Response #167 for HelloWorld
    Response #168 for HelloWorld
    Response #169 for HelloWorld
    Response #170 for HelloWorld
    Response #171 for HelloWorld
    Response #172 for HelloWorld
    Response #173 for HelloWorld
    Response #174 for HelloWorld
    Response #175 for HelloWorld
    Response #176 for HelloWorld
    Response #177 for HelloWorld
    Response #178 for HelloWorld
    Response #179 for HelloWorld
    Response #180 for HelloWorld
    Response #181 for HelloWorld
    Response #182 for HelloWorld
    Response #183 for HelloWorld
    Response #184 for HelloWorld
    Response #185 for HelloWorld
    Response #186 for HelloWorld
    Response #187 for HelloWorld
    Response #188 for HelloWorld
    Response #189 for HelloWorld
    Response #190 for HelloWorld
    Response #191 for HelloWorld
    Response #192 for HelloWorld
    Response #193 for HelloWorld
    Response #194 for HelloWorld
    Response #195 for HelloWorld
    Response #196 for HelloWorld
    Response #197 for HelloWorld
    Response #198 for HelloWorld
    Response #199 for HelloWorld
    write_loop with write_msgs_.size() = 0