c++ multithreading boost boost-asio boost-beast

reading and processing packets in parallel using boost asio


I have a thread that listens on a socket, reads incoming packets, processes them, and sends them to another socket.

I'd like to speed up this flow by adding a dedicated thread for packet processing, without changing the order of the packets.

I've defined a thread that runs the io_context for all socket activity, and a second one for processing:

std::thread th([&] {
  log_info("reading packets from sockets thread");

  boost::asio::steady_timer timer(read_ioc_);
  timer.expires_after(std::chrono::hours(1));
  timer.async_wait(
      []([[maybe_unused]] const boost::system::error_code& error) {
        log("got timeout");
      });

  read_ioc_.run();
  log_info("thread stopped");
});

std::thread th2([&] {
  log_info("processing packets");

  boost::asio::steady_timer timer(process_ioc_);
  timer.expires_after(std::chrono::hours(1));
  timer.async_wait(
      []([[maybe_unused]] const boost::system::error_code& error) {
        log("got timeout");
      });

  process_ioc_.run();
  log_info("processing stopped");
});

th.detach();
th2.detach();

In the class that handles both packet reading and packet processing, I take the two io_context objects and create a socket and a strand from them. The socket is used to read received packets continuously, or to wait for a new packet to arrive. The strand on ioc_process is used to process the packets in order (say, encrypting each packet and adding an additional header, as some tunnel protocols do).

myClass::myClass(boost::asio::io_context &read_ioc, boost::asio::io_context &process_ioc
 ) : socket_(read_ioc), strand_(process_ioc) ... 

myClass::startReadPacketLoop() {
...
   readPacketLoop();
...
}

This is the loop where I enqueue the packet I just read and start the next read.

My question is whether I can maintain packet ordering while having multiple threads handle the packets simultaneously. Can I implement this using a shared queue, or by replacing the strand with ioc_process? How would I preserve ordering in that case (say I write the packets to another socket and want them to stay in order)?

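void myClass::readPacketLoop(std::shared_ptr<Packet> pkt,
                   const boost::system::error_code& error,
                   size_t bytes_transferred) {

  // hand the packet that was just read to the processing strand
  boost::asio::post(strand_, [=] {
    process_packet(pkt, error, bytes_transferred);
  });

  // allocate a fresh packet and start the next read into it
  std::shared_ptr<Packet> next = freeList.alloc();

  socket_.async_receive(boost::asio::buffer(next->buffer(), next->buffer_size()),
                        std::bind(&myClass::readPacketLoop, this, next,
                                  std::placeholders::_1, std::placeholders::_2));
}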



Solution

  • I'll walk through the issues I see, in the order I encountered them while reading your question.

    Timers

    It looks as if your threads exist for the sole purpose of timing one hour. The whole point of Asio is to allow asynchronous operations (such as timers) without the need for extra threads. Also, since you will have other asynchronous work pending, the timers' completions will have no effect at all: run() keeps going for as long as other work is outstanding.

    Assuming you really meant to stop processing after a timeout, simplify:

    struct myClass {
        myClass(asio::io_context& read_ioc, asio::thread_pool& process_ioc)
            : socket_(read_ioc)
            , strand_(process_ioc.get_executor()) {
        }
    
        void startReadPacketLoop() {
            log_info("reading packets from sockets thread");
            //...
            if (!ec)
                startReadPacketLoop();
            //...
        }
      private:
        tcp::socket                                    socket_;
        asio::strand<asio::thread_pool::executor_type> strand_;
    };
    
    int main() {
        asio::thread_pool process_ioc_(10);
        asio::io_context  read_ioc_;
    
        myClass mc(read_ioc_, process_ioc_);
        mc.startReadPacketLoop();
    
        read_ioc_.run_for(1h);
    }
    

    Strands?

    Firstly, as you probably understand the code

      boost::asio::post(strand_, [=] {
        process_packet(pkt, error, bytes_transferred);
      });
    

    guarantees that all process_packet calls execute one at a time: the strand executor makes sure that no two of its tasks run concurrently. That is exactly what you do not want.

    Q. My question is whether I can maintain the packet ordering but have multiple threads to handle the packets simultaneously.

    Yes. Just keep in mind the order in which processing results become available may not be input order.

    Q. can I implement it using shared queue

    Yes. This is the typical application of a FIFO queue. Note that an execution context is a kind of task queue, so you're right to think that maybe you don't need anything non-Asio.

    Q. or replace the strand with ioc_process? how will I preserve ordering in this case (let's say I write the packets to another socket and I want them to be in order)

    If you logically want to write each packet to the output after processing, you can do exactly that (in synchronous pseudo code, for readability):

     while (true) {
         packet request = receive_packet();
         future<packet> response = process(request); // on the thread pool
         post(io_strand, [f=std::move(response)] {
              send_packet(f.get()); // may block until response ready
         });
     }
    

    Note that here you unconditionally post the output operations in sequential order to a strand. Of course, you may have to wait for the next output packet to become ready.

    This is known as head-of-line (HOL) blocking.

    A big point of concern: whenever (potentially) blocking operations (like future::wait/get) are performed on an execution context, they may lead to deadlocks.
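    The synchronous pseudo code above can be turned into a small runnable sketch using only the standard library. This is not Asio code: std::async stands in for the thread pool, a std::deque of futures stands in for the strand, and process and process_in_order are hypothetical names chosen for illustration. The artificial delays make later inputs tend to finish first, so the ordering guarantee comes purely from consuming the futures in FIFO order.

```cpp
#include <chrono>
#include <deque>
#include <future>
#include <string>
#include <thread>
#include <vector>

// Hypothetical stand-in for real packet processing.
inline std::string process(std::string request, std::chrono::milliseconds delay) {
    std::this_thread::sleep_for(delay);
    return request + " done";
}

// Starts all processing concurrently, then collects results strictly in input order.
inline std::vector<std::string> process_in_order(std::vector<std::string> const& inputs) {
    std::deque<std::future<std::string>> pending; // FIFO of results-to-be

    int delay_ms = 10 * static_cast<int>(inputs.size());
    for (auto const& in : inputs) {
        pending.push_back(std::async(std::launch::async, process, in,
                                     std::chrono::milliseconds(delay_ms)));
        delay_ms -= 10; // later inputs sleep less, so they tend to finish first
    }

    std::vector<std::string> output;
    while (!pending.empty()) {
        // get() blocks on the oldest packet even if newer ones are ready:
        // this is the head-of-line blocking described above
        output.push_back(pending.front().get());
        pending.pop_front();
    }
    return output;
}
```

    Calling process_in_order({"a", "b", "c"}) yields the results for a, b and c in that order, regardless of which task finished first.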

    Side Note: Executors

    You may want to switch your design to passing executors rather than references to specific types of execution contexts or strands thereof. It decouples your code better. I will show this approach in the demo.

    Naive Demo

    Let's create a very simple implementation that

    • reads "packets" as lines of text
    • processes them by bit-wise xor-ing all octets (excluding the newlines) and putting the output as an integer number on its own line
    • simulates varying processing times, depending on the calculated checksum. This makes sure that processing of packets does not complete in the order they arrive
    • for demo purposes
      • we reduce 1 hour runtime to 10s
      • output will be to the same socket.

    Live On Coliru

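    #include <boost/asio.hpp>
    #include <atomic>
    #include <chrono>
    #include <functional>
    #include <future>
    #include <iomanip>
    #include <iostream>
    #include <mutex>
    #include <string>
    #include <string_view>
    #include <thread>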
    namespace asio = boost::asio;
    using asio::ip::tcp;
    using boost::system::error_code;
    using namespace std::literals;
    using namespace std::placeholders;
    
    namespace /*anon*/ {
        static std::mutex mx;
        auto              now   = std::chrono::steady_clock::now;
        static auto const start = now();
    
        static inline void log_info(auto const&... args) {
            static std::atomic_uint     tid_gen{};
            thread_local unsigned const tid = tid_gen++;
    
            std::lock_guard lk(mx); // avoid interleaved console output
            std::cout << std::setw(10) << (now() - start) / 1ms << "ms tid=" << tid << " ";
            (std::cout << ... << args) << std::endl;
        }
    
        static void check(error_code const& ec, [[maybe_unused]] std::string_view where) {
            log_info(where, ": ", ec.message());
            if (ec.failed() && ec != asio::error::eof)
                throw boost::system::system_error(ec);
        }
    } // namespace
    
    struct myClass {
        myClass(asio::any_io_executor ex) : socket_(ex) {}
    
        void startReadPacketLoop(uint16_t port) {
            socket_.connect({{}, port});
    
            log_info("reading packets from sockets thread");
            post(socket_.get_executor(), [this] { do_receive(); });
        }
    
      private:
        std::string incoming_;
    
        void do_receive() { // continue receiving packets
            async_read_until(socket_, asio::dynamic_buffer(incoming_), "\n",
                             std::bind(&myClass::on_received, this, _1, _2));
        }
    
        void on_received(error_code ec, size_t len) {
            check(ec, "on_received");
    
            if (ec == asio::error::eof)
                return;
    
            // post processing task on free pool
            std::packaged_task<std::string()> task{std::bind(process_packet, incoming_.substr(0, len))};
            std::future<std::string> fut = post(pool_, std::move(task));
    
            // but post output on the strand
            post(output_strand_, [this, fut = std::move(fut)]() mutable {
                // note fut.get() may block
                auto response = std::make_shared<std::string>(fut.get() + "\n");
    
                async_write(socket_, asio::buffer(*response), [response](error_code ec, size_t /*n*/) {
                    check(ec, "async_write");
                    // log_info("written (", ec.message(), ") ", n, " of ", response->length(), " transferred");
                });
            });
    
            // consume packet from incoming
            incoming_.erase(0, len);
    
            // continue receiving packets
            do_receive();
        };
    
        static std::string process_packet(std::string request) {
            request.resize(1 + request.find_last_not_of("\r\n"));
    
            unsigned cksum = 0;
            for (uint8_t ch : request)
                cksum ^= ch;
    
            std::this_thread::sleep_for(1ms * cksum);
    
            log_info("slept for ", cksum, "ms after processing ", quoted(request));
            return request + ": " + std::to_string(cksum);
        }
    
        tcp::socket                                    socket_;
        asio::thread_pool                              pool_{10};
        asio::strand<asio::thread_pool::executor_type> output_strand_{pool_.get_executor()};
    };
    
    int main() try {
        log_info("start");
        asio::io_context ioc;
    
        myClass mc(ioc.get_executor());
        mc.startReadPacketLoop(8989);
    
        ioc.run_for(10s);
        log_info("done");
    } catch (boost::system::system_error const& se) {
        log_info("error: ", se.code().message());
    }
    

    With a simple server

    printf "qux\nbar\nfoo\n" | nc -t -l -p 8989 -w1&
    

    Prints

             0ms tid=0 start
             0ms tid=0 reading packets from sockets thread
             0ms tid=0 on_received: Success
             0ms tid=0 on_received: Success
             0ms tid=0 on_received: Success
             0ms tid=0 on_received: End of file
           103ms tid=0 slept for 102ms after processing "foo"
           114ms tid=1 slept for 113ms after processing "bar"
           124ms tid=2 slept for 124ms after processing "qux"
    qux: 124
    bar: 113
           125ms tid=0 async_write: Success
    foo: 102
           125ms tid=0 async_write: Success
           125ms tid=0 async_write: Success
         10002ms tid=0 done
    

    Note that the output order matches the input order, even though processing completes in reverse order due to the delays.
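    The delays in the log follow directly from the XOR checksum of each line. As a quick standalone check (xor_checksum is a hypothetical helper that mirrors the loop in process_packet, applied to the line with its newline already stripped):

```cpp
#include <string_view>

// XOR of all octets in the payload, mirroring the loop in process_packet.
constexpr unsigned xor_checksum(std::string_view payload) {
    unsigned cksum = 0;
    for (unsigned char ch : payload)
        cksum ^= ch;
    return cksum;
}

// 'f' ^ 'o' ^ 'o' = 0x66 ^ 0x6f ^ 0x6f = 0x66 = 102
static_assert(xor_checksum("foo") == 102);
// 'b' ^ 'a' ^ 'r' = 0x62 ^ 0x61 ^ 0x72 = 0x71 = 113
static_assert(xor_checksum("bar") == 113);
// 'q' ^ 'u' ^ 'x' = 0x71 ^ 0x75 ^ 0x78 = 0x7c = 124
static_assert(xor_checksum("qux") == 124);
```

    So "foo" sleeps the shortest (102ms) and "qux" the longest (124ms), which is why processing finishes in the opposite order from arrival.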

    Trouble In Paradise

    However, there are two problems.

    1. Starvation

    If input arrives too fast, output stops¹.

    As stated before, future::get() blocks. However, that's not the real issue, since the strand ensures only one of the pool's threads will ever block on a future at a time. What's happening is starvation.

    Of course we can "fix" it by making the pool_ arbitrarily large. E.g. with 1'000 threads, it does complete:

    asio::thread_pool pool_{1'000};
    


    Even so, it is clear that the output_strand_ is sometimes starved for CPU time. To really alleviate the situation, instead we can stop competing for pool_ threads at all, replacing the output_strand_ with a single thread:

    tcp::socket       socket_;
    asio::thread_pool pool_{10}, output_{1};
    

    Indeed, now we don't see starvation anymore.
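    Stripped of Asio, the idea of a dedicated output thread can be sketched with the standard library alone. OrderedOutput below is a hypothetical helper, not part of the demo: one thread pops futures from a FIFO and blocks on each in turn, so the blocking wait never occupies a worker thread. Error handling (for example a future that carries an exception) is left out for brevity.

```cpp
#include <condition_variable>
#include <deque>
#include <functional>
#include <future>
#include <mutex>
#include <string>
#include <thread>
#include <utility>

// Hypothetical helper: a single dedicated thread that emits results in submission order.
class OrderedOutput {
  public:
    explicit OrderedOutput(std::function<void(std::string)> sink)
        : sink_(std::move(sink)), worker_([this] { run(); }) {}

    ~OrderedOutput() {
        {
            std::lock_guard<std::mutex> lk(mx_);
            done_ = true;
        }
        cv_.notify_one();
        worker_.join(); // the worker drains everything already queued before exiting
    }

    void submit(std::future<std::string> fut) {
        {
            std::lock_guard<std::mutex> lk(mx_);
            queue_.push_back(std::move(fut));
        }
        cv_.notify_one();
    }

  private:
    void run() {
        for (;;) {
            std::future<std::string> next;
            {
                std::unique_lock<std::mutex> lk(mx_);
                cv_.wait(lk, [this] { return done_ || !queue_.empty(); });
                if (queue_.empty())
                    return; // shutdown requested and nothing left to write
                next = std::move(queue_.front());
                queue_.pop_front();
            }
            sink_(next.get()); // may block, but only this dedicated thread
        }
    }

    std::function<void(std::string)>     sink_;
    std::mutex                           mx_;
    std::condition_variable              cv_;
    std::deque<std::future<std::string>> queue_;
    bool                                 done_ = false;
    std::thread                          worker_; // declared last so it starts after the other members
};
```

    Futures are submitted in input order; the sink then sees the results in that same order, no matter which one becomes ready first.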

    2. Thread Safety

    Since output is done on a different thread than the other accesses to socket_, we have a data race. To fix it, we need to add an indirection:

    // but post output on the dedicated output thread
    post(output_, [this, fut = std::move(fut)]() mutable {
        // note fut.get() may block
        auto response = std::make_shared<std::string>(fut.get() + "\n");
    
        post(socket_.get_executor(), [=] {
            async_write(                          //
                socket_, asio::buffer(*response), //
                [response](error_code ec, [[maybe_unused]] size_t n) {
                    check(ec, "async_write");
                    // log_info("written (", ec.message(), ") ", n, " of ", response->length(), " transferred");
                });
        });
    });
    

    Note how the blocking wait is (importantly) not on the IO thread, but the async_write is.

    With this in place, you may have to add a work guard: if the input side reaches end-of-file before processing has finished, there would be no outstanding work keeping the ioc alive.

    post(output_, [this, fut = std::move(fut), work = make_work_guard(socket_.get_executor())]() mutable {
    

    Fixed Demo

    Live On Coliru

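    #include <boost/asio.hpp>
    #include <atomic>
    #include <chrono>
    #include <functional>
    #include <future>
    #include <iomanip>
    #include <iostream>
    #include <mutex>
    #include <string>
    #include <string_view>
    #include <thread>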
    namespace asio = boost::asio;
    using asio::ip::tcp;
    using boost::system::error_code;
    using namespace std::literals;
    using namespace std::placeholders;
    
    namespace /*anon*/ {
        static std::mutex mx;
        auto              now   = std::chrono::steady_clock::now;
        static auto const start = now();
    
        static inline void log_info(auto const&... args) {
            static std::atomic_uint     tid_gen{};
            thread_local unsigned const tid = tid_gen++;
    
            std::lock_guard lk(mx); // avoid interleaved console output
            std::cout << std::setw(10) << (now() - start) / 1ms << "ms tid=" << tid << " ";
            (std::cout << ... << args) << std::endl;
        }
    
        static void check(error_code const& ec, [[maybe_unused]] std::string_view where) {
            // log_info(where, ": ", ec.message());
            if (ec.failed() && ec != asio::error::eof)
                throw boost::system::system_error(ec);
        }
    } // namespace
    
    struct myClass {
        myClass(asio::any_io_executor ex) : socket_(ex) {}
    
        void startReadPacketLoop(uint16_t port) {
            socket_.connect({{}, port});
    
            log_info("reading packets from sockets thread");
            post(socket_.get_executor(), [this] { do_receive(); });
        }
    
      private:
        std::string incoming_;
    
        void do_receive() { // continue receiving packets
            async_read_until(socket_, asio::dynamic_buffer(incoming_), "\n",
                             std::bind(&myClass::on_received, this, _1, _2));
        }
    
        void on_received(error_code ec, size_t len) {
            check(ec, "on_received");
    
            if (ec == asio::error::eof)
                return;
    
            // post processing task on free pool
            std::packaged_task<std::string()> task{std::bind(process_packet, incoming_.substr(0, len))};
            std::future<std::string> fut = post(pool_, std::move(task));
    
            // but post output on the dedicated output thread
            post(output_, [this, fut = std::move(fut), work = make_work_guard(socket_.get_executor())]() mutable {
                // note fut.get() may block
                auto response = std::make_shared<std::string>(fut.get() + "\n");
    
                post(socket_.get_executor(), [this, response] {
                    async_write(                          //
                        socket_, asio::buffer(*response), //
                        [response](error_code ec, [[maybe_unused]] size_t n) {
                            check(ec, "async_write");
                            // log_info("written (", ec.message(), ") ", n, " of ", response->length(), " transferred");
                        });
                });
            });
    
            // consume packet from incoming
            incoming_.erase(0, len);
    
            // continue receiving packets
            do_receive();
        };
    
        static std::string process_packet(std::string request) {
            request.resize(1 + request.find_last_not_of("\r\n"));
    
            unsigned cksum = 0;
            for (uint8_t ch : request)
                cksum ^= ch;
    
            std::this_thread::sleep_for(1ms * cksum);
    
            log_info("slept for ", cksum, "ms after processing ", quoted(request));
            return request + ": " + std::to_string(cksum);
        }
    
        tcp::socket       socket_;
        asio::thread_pool pool_{10}, output_{1};
    };
    
    int main() try {
        log_info("start");
        asio::io_context ioc;
    
        myClass mc(ioc.get_executor());
        mc.startReadPacketLoop(8989);
    
        ioc.run_for(1h);
        log_info("done");
    } catch (boost::system::system_error const& se) {
        log_info("error: ", se.code().message());
    }
    

    Summary/Conclusion

    This approach isn't optimal. It allocates unnecessarily, uses blocking operations, and so on.

    You could optimize by using a ring-buffer/disruptor-style container for the output queue. You'll need a monotonically increasing id for processed packets. As a bonus, if you make sure the container has suitable reference stability, you can avoid dynamically allocating the shared responses.
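    To make the sequence-id idea concrete, here is a minimal sketch of a reorder buffer. ReorderBuffer is a hypothetical name. For brevity it uses a std::map rather than the fixed-size ring suggested above, so it still allocates, and it is not thread-safe: in the demo it would have to live on a single strand or thread. Each packet gets an id when it is read; results may be pushed in any order and are released only once every earlier id has been released.

```cpp
#include <cstdint>
#include <map>
#include <string>
#include <utility>
#include <vector>

// Hypothetical reorder buffer: accepts results in any order, releases them by sequence id.
class ReorderBuffer {
  public:
    // Stores one result and returns every result that is now writable, in id order.
    std::vector<std::string> push(std::uint64_t seq, std::string result) {
        pending_.emplace(seq, std::move(result));

        std::vector<std::string> ready;
        auto it = pending_.begin();
        while (it != pending_.end() && it->first == next_) {
            ready.push_back(std::move(it->second));
            it = pending_.erase(it);
            ++next_;
        }
        return ready;
    }

  private:
    std::uint64_t                        next_ = 0; // next id that may be written
    std::map<std::uint64_t, std::string> pending_;  // completed but not yet writable
};
```

    With this shape nothing blocks: a result that arrives early simply waits in the buffer, and the push that fills the gap returns the whole contiguous run.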


    ¹ input.txt generated with sort -R /etc/dictionaries-common/words | grep -v \' > input.txt