
How to use a single boost::asio::io_context for multiple sockets and serial ports


I'm using Boost.Asio to create multiple UDP sockets and serial ports, all sharing a single boost::asio::io_context that is passed to each of them in the constructor. All devices are configured for asynchronous reads and writes. I then simply call io_context.run(); and let it run forever. It works very well most of the time.

However, at some point, for example when a sudden traffic load reaches one socket, my process suddenly jumps to 100% on the CPU and stays at 100% forever, with the global communication dying. When this happens, using perf, I see that the process is stuck 99.99% of the time at the same place, with a stack looking like:

main
asio::detail::scheduler::run
asio::detail::epoll_reactor::descriptor_state::do_complete
asio::descriptor_read_op<asio::mutable_buffers_1, std::_Bind<void
my_serial_port::on_async_data_received <- this is my receiving function for serial ports
...

So it seems to be stuck processing a single serial port in a loop and nothing else, as if the same event were being processed endlessly, while plenty of other data is still arriving in the system.

  • Is there something I'm doing wrong by sharing the io_context?
  • Is there a way to debug such event-handling issues with Boost.Asio?

I have seen a similar hang, but where the stack only shows a function called by a timer event instead of the serial port (i.e. a timer sending a statistics packet at 1 Hz, but taking 100% of the CPU and blocking everything else).

Context: On an embedded system using ROS and Linux, I'm running a process (ROS node) that acts as a communication router. It has 7 inputs/outputs: 2 serial ports (3 Mb/s), 2 network UDP sockets and 3 local UDP sockets (UNIX domain). It also listens to some ROS topics coming from other processes. Packets can be received on all ports and a custom protocol is used to decode the packets, read their destination and send them out further on the given port. Some packets are also generated in the process and sent out on some ports, based on data subscribed through ROS. To keep things simple, to avoid concurrency and because I only have one core available, I try to run this process on a single main thread. To merge ROS and Boost::Asio together in a single thread, I'm using librosasio to forward events from ROS to the asio::io_context.

Thanks!


Solution

  • The issue was caused by the way I handled the output buffer of the serial port. Because Asio is very low-level, the application has to manage the output buffer entirely by itself.

    • I had a single output buffer, and bytes were appended to its end each time something had to be sent.
    • After appending, the whole buffer was sent using the asynchronous write function.
    • When the async operation completed, the bytes that had actually been sent were removed from the start of the buffer (the count could differ from what was requested).
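The remark about the sent count differing from the requested count is the usual "partial write" behaviour. A minimal sketch of it, using only the standard library: `FakePort` and `write_all` are hypothetical names standing in for the serial port and for the loop a caller needs around a write that may accept only part of the data. Nothing here is asio's own API.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

// Hypothetical stand-in for a serial port: accepts at most `chunk` bytes per
// call, mimicking a write that returns after sending only part of the data.
struct FakePort
{
    std::size_t chunk;
    std::vector<std::uint8_t> wire;  // everything "sent" so far

    std::size_t write_some(const std::uint8_t* data, std::size_t size)
    {
        const std::size_t n = std::min(size, chunk);
        wire.insert(wire.end(), data, data + n);
        return n;
    }
};

// Keeps calling write_some() until every byte has been accepted.
// Returns the number of write_some() calls that were needed.
inline std::size_t write_all(FakePort& port, const std::vector<std::uint8_t>& data)
{
    assert(port.chunk > 0);  // a zero-sized chunk would never make progress
    std::size_t sent = 0;
    std::size_t calls = 0;
    while (sent < data.size())
    {
        // Resume from the first byte that has not been accepted yet.
        sent += port.write_some(data.data() + sent, data.size() - sent);
        ++calls;
    }
    return calls;
}
```

With a port that accepts 4 bytes per call, a 10-byte message needs three calls (4 + 4 + 2 bytes), and the bytes arrive in order without duplication.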

    This caused two issues:

    1. If the send function was called again before the previous async operation had finished, the same whole buffer was sent again (plus the newly added bytes). This led to the load increasing abruptly until it reached 100%.
    2. Using a std::vector as the buffer was a bad idea, because appending bytes to it can reallocate its storage. Asio does not keep the std::vector itself, only a pointer to its data, and that pointer can be invalidated while the write is still pending.
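The second issue can be reproduced without asio at all. The sketch below (standard library only, `append_moves_storage` is a hypothetical name) records the address of a full vector's storage, appends one byte, and checks whether the storage moved. The recorded address plays the role of the pointer that `asio::buffer(vector)` would hand to a pending asynchronous operation.

```cpp
#include <cassert>
#include <cstdint>
#include <vector>

// Returns true if appending one byte to a full vector moved its storage.
inline bool append_moves_storage()
{
    std::vector<std::uint8_t> buffer(16, 0xAA);
    buffer.resize(buffer.capacity());  // fill it so the next append must grow
    assert(buffer.size() == buffer.capacity());

    // The kind of address a pending async operation would be holding on to.
    const auto before = reinterpret_cast<std::uintptr_t>(buffer.data());

    buffer.push_back(0x55);  // exceeds the capacity, so the vector reallocates

    const auto after = reinterpret_cast<std::uintptr_t>(buffer.data());
    return before != after;
}
```

Any operation still holding the old address would then read freed memory, which is why the buffer given to an async write must not be resized until the completion handler has run.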

    I changed the implementation to:

    1. Have one buffer where I put the bytes to send, and another one for the pending async write.
    2. Only actually send data if there is no other async write operation in progress.
    3. When sending data, copy the output buffer to a "pending output buffer" so that the async operation has its own buffer that is not altered, and clear the output buffer immediately after that.
    4. Use the asio::async_write() function, which completes only once all bytes have been sent (or an error occurs), instead of relying on the lower-level write (e.g. async_write_some()), which can complete with only part of the data actually sent.
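The four steps above can be sketched independently of asio. In this minimal sketch, `OutputQueue` is a hypothetical helper and `start_write` is an assumed callback standing in for the call to `asio::async_write()`; the real completion handler would call `onWriteDone()`. It swaps the two buffers instead of copying, which is a variation on step 3, not what the code further down does.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <functional>
#include <utility>
#include <vector>

// Hypothetical helper (not part of asio): queues outgoing bytes and keeps at
// most one write in flight.
class OutputQueue
{
  public:
    using StartWrite = std::function<void(const std::uint8_t*, std::size_t)>;

    explicit OutputQueue(StartWrite start_write) : start_write_(std::move(start_write)) {}

    // Step 1: new bytes always go to the staging buffer.
    void send(const std::uint8_t* data, std::size_t size)
    {
        staging_.insert(staging_.end(), data, data + size);
        flush();
    }

    // Call this from the write-completion handler.
    void onWriteDone()
    {
        assert(write_pending_);
        write_pending_ = false;
        in_flight_.clear();
        flush();  // send whatever was staged while the write was pending
    }

    bool writePending() const { return write_pending_; }

  private:
    void flush()
    {
        // Step 2: never start a second write while one is pending.
        if (write_pending_ || staging_.empty())
        {
            return;
        }
        // Step 3: give the write its own buffer and empty the staging buffer.
        // in_flight_ is empty here and is not modified until onWriteDone().
        in_flight_.swap(staging_);
        write_pending_ = true;
        // Step 4: stand-in for asio::async_write(), which sends all bytes.
        start_write_(in_flight_.data(), in_flight_.size());
    }

    StartWrite start_write_;
    std::vector<std::uint8_t> staging_;
    std::vector<std::uint8_t> in_flight_;
    bool write_pending_ = false;
};
```

Calling `send()` three times before the first completion starts exactly one write; the bytes from the second and third calls go out together once `onWriteDone()` runs. Unlike the fixed-size buffers in the code below, this sketch lets the staging buffer grow without bound, so a real implementation would probably add an overflow check.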

    It now works, although it is not optimal. All in all, it took one week of work and debugging to get a simple serial port working with asio. The same took me 20 minutes using QSerialPort.

    The current code looks like this. Header:

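    #include <cstddef>
    #include <cstdint>
    #include <functional>
    #include <string>
    #include <vector>

    #include <asio.hpp>

    // Assumed callback signature (not shown in the original post); it matches how
    // data_received_callback_ is invoked in the source file.
    using DataReceivedCallback = std::function<void(const uint8_t*, std::size_t)>;

    class SerialPort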
    {
      public:
        static constexpr std::size_t INPUT_BUFFER_SIZE = 65536;
        static constexpr std::size_t OUTPUT_BUFFER_SIZE = 65536;
        
        SerialPort(asio::io_context& io_context, const std::string& port_name,
                   uint32_t baudrate, std::size_t input_buffer_size = INPUT_BUFFER_SIZE,
                   std::size_t output_buffer_size = OUTPUT_BUFFER_SIZE);
        SerialPort(const SerialPort&) = delete;
        SerialPort(SerialPort&&) = delete;
        SerialPort& operator=(const SerialPort&) = delete;
        SerialPort& operator=(SerialPort&&) = delete;
    
        bool sendData(const uint8_t* buffer, size_t size);
    
        void setDataReceivedCallback(
                const DataReceivedCallback& data_received_callback);
    
      protected:
        void waitForNextData();
        void onAsyncDataReceived(const asio::error_code& err, std::size_t received_size);
        void sendAsyncData();
        void onAsyncDataSent(const asio::error_code& err, std::size_t sent_size);
    
        void onDataReceived(std::size_t received_size);
      private:
        asio::serial_port port_;
        std::vector<uint8_t> input_buffer_;
        std::vector<uint8_t> output_buffer_;
        std::size_t output_buffer_position_ = 0;
        std::vector<uint8_t> output_buffer_pending_;
        DataReceivedCallback data_received_callback_;
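        bool async_write_pending_ = false;
        // Incremented in sendData() when the output buffer is full.
        std::size_t output_overflows_count_ = 0;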
    };
    

    Source:

    #include "serial_port.hpp"
    #include <cstring>      // std::memcpy
    #include <linux/serial.h>
    #include <sys/ioctl.h>  // ioctl, TIOCGSERIAL, TIOCSSERIAL
    
    SerialPort::SerialPort(asio::io_context& io_context, const std::string& port_name,
                           uint32_t baudrate, std::size_t input_buffer_size,
                           std::size_t output_buffer_size)
        : port_(io_context),
          input_buffer_(input_buffer_size),
          output_buffer_(output_buffer_size),
          output_buffer_pending_(output_buffer_size)
    {
        asio::error_code err;
        port_.open(port_name, err);
        if (!err)
        {
            port_.set_option(asio::serial_port_base::baud_rate(baudrate));
            port_.set_option(asio::serial_port_base::character_size(8));
            port_.set_option(asio::serial_port_base::stop_bits(
                    asio::serial_port_base::stop_bits::one));
            port_.set_option(
                    asio::serial_port_base::parity(asio::serial_port_base::parity::none));
            port_.set_option(asio::serial_port_base::flow_control(
                    asio::serial_port_base::flow_control::none));
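            // Set the low-latency flag on the serial port (Linux-specific).
            // Only write the settings back if reading them succeeded, since
            // some drivers do not support TIOCGSERIAL.
            int fd = port_.native_handle();
            struct serial_struct ser_info;
            if (ioctl(fd, TIOCGSERIAL, &ser_info) == 0)
            {
                ser_info.flags |= ASYNC_LOW_LATENCY;
                ioctl(fd, TIOCSSERIAL, &ser_info);
            }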
            waitForNextData();
        }
    }
    bool SerialPort::sendData(const uint8_t* buffer, size_t size)
    {
        // check for overflows
        if (output_buffer_position_ + size <= output_buffer_.size())
        {
            std::memcpy(&output_buffer_[output_buffer_position_], buffer, size);
            output_buffer_position_ += size;
            sendAsyncData();
            return true;
        }
        else
        {
            ++output_overflows_count_;
            return false;
        }
    }
    void SerialPort::setDataReceivedCallback(
            const DataReceivedCallback& data_received_callback)
    {
        data_received_callback_ = data_received_callback;
    }
    void SerialPort::waitForNextData()
    {
        port_.async_read_some(asio::buffer(input_buffer_),
                              [this](const asio::error_code& err, std::size_t received_size) {
                                  this->onAsyncDataReceived(err, received_size);
                              });
    }
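    void SerialPort::onAsyncDataReceived(const asio::error_code& err,
                                         std::size_t received_size)
    {
        if (err)
        {
            // Do not re-arm the read on error (e.g. port closed or device removed):
            // the next read would typically fail again at once and spin the loop.
            return;
        }
        if (received_size > 0)
        {
            onDataReceived(received_size);
        }
        waitForNextData();
    }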
    void SerialPort::sendAsyncData()
    {
        if (!async_write_pending_)
        {
            // copy to pending buffer
            std::memcpy(output_buffer_pending_.data(), output_buffer_.data(),
                        output_buffer_position_);
            asio::async_write(
                    port_,
                    asio::buffer(output_buffer_pending_.data(), output_buffer_position_),
                    [this](const asio::error_code& err, std::size_t sent_size) {
                        this->onAsyncDataSent(err, sent_size);
                    });
            output_buffer_position_ = 0;
            async_write_pending_ = true;
        }
    }
    void SerialPort::onAsyncDataSent(const asio::error_code& err, std::size_t)
    {
        async_write_pending_ = false;
        if (output_buffer_position_ > 0)
        {
            sendAsyncData();
        }
    }
    void SerialPort::onDataReceived(std::size_t received_size)
    {
        if (data_received_callback_)
        {
            data_received_callback_(input_buffer_.data(), received_size);
        }
    }