
Why does boost::asio::async_write work the first time but go wrong the second time?


I can successfully send data over TCP with the first call to async_write, but something goes wrong when async_write is called again. Here is the code:

#include <algorithm>
#include <array>
#include <boost/asio.hpp>
#include <boost/range.hpp>
#include <chrono>
#include <iostream>
#include <random>
#include <vector>

const std::size_t buf_size = 500*1024;
const int test_cycles = 1.024e5;

namespace asio = boost::asio;

int main()
{
  std::vector<char> send_data(buf_size);

  std::vector<char> recv_buf(buf_size);

  asio::io_service ios;

  asio::ip::tcp::socket socket1(ios);
  asio::ip::tcp::socket socket2(ios);
  asio::ip::tcp::acceptor acceptor(ios, {asio::ip::tcp::v4(), 55557});
  socket1.connect({asio::ip::address_v4::loopback(), 55557});
  acceptor.accept(socket2);

  for (std::size_t i = 0; i < 1; ++i)
  {
      auto start = std::chrono::steady_clock::now();
      for(int j=0; j < test_cycles; ++j)
      {
            size_t written_bytes = 0;
            auto to_send_data = send_data;
            asio::async_write(socket1,
                asio::dynamic_buffer(send_data),
                [&](auto ec, auto n)
                {
                    if(!ec)
                    {
                        std::cout << "successfully sent " << n << std::endl;
                    }
                    else
                    {
                        std::cout << ec.message() << std::endl;
                    }

                    if(0==n)
                    {
                        std::cout << "send error" << std::endl;
                    }

                    written_bytes = n;
                });

            asio::async_read(socket2, asio::buffer(recv_buf),
                [&](auto ec, auto n)
                {
                    if(!ec)
                    {
                        //std::cout << "received " << n << std::endl;
                    }
                    else
                    {
                        std::cout << ec.message() << std::endl;
                    }

                    if(0==n)
                    {
                        std::cout << "received error" << std::endl;
                    }

                    if(written_bytes != n)
                    {
                        std::cout << "received is not same with the sent" << std::endl;
                    }
                });
                
                ios.run();
                ios.reset();
        }
        
        auto end = std::chrono::steady_clock::now();
        std::chrono::duration<float> elapsed = end - start;
        std::cout << elapsed.count() << " seconds\n";
        std::cout << (buf_size * test_cycles / elapsed.count() / 1024 / 1024/ 1024) << " GB/s\n";
  }
}

Here is the output:

successfully sent 512000
successfully sent 0
send error

Some hint

I found a workaround that makes the program behave correctly. Here is the relevant code snippet:

        auto to_send_data = send_data;
        asio::async_write(socket1,
            asio::dynamic_buffer(to_send_data),

Why does the original code snippet go wrong?

UPDATE:

  1. I tried to set a breakpoint in the implementation of std::vector::resize() in the VS Code IDE (I ran this test on Ubuntu), but the breakpoint does not take effect (it is shown in grey). I am sure the binary was built in debug mode. I also tried to set the breakpoint with GDB, but GDB outputs "Function "std::vector::resize" not defined."

  2. I set breakpoints in the implementation of the aforementioned operator() and found that the default branch is never reached; in other words, start is always 1.


Solution

  • Unfortunately, the Boost documentation on dynamic_buffer is extremely laconic:

    A dynamic buffer encapsulates memory storage that may be automatically resized as required.

    This means that dynamic_buffer modifies the underlying vector during I/O operations.

    The Boost ASIO tutorial is more explicit:

    Dynamic buffer is a concept. Dynamic buffer is a buffer that you can write data into or read from it. If the buffer isn't big enough to fit your data then it will resize (grow) dynamically. So when you write into the dynamic buffer you don't have to worry if there is enough space left in the buffer. On the other hand, when you read data from a dynamic buffer, you are responsible to throw away (consume) bytes read and no longer needed so the buffer won't grow permanently.

    A write operation on a dynamic buffer in Boost calls the consume method of dynamic_buffer, which erases bytes_transferred bytes from it:

    class write_dynbuf_v2_op
       ....
      void operator()(const boost::system::error_code& ec,
          std::size_t bytes_transferred, int start = 0)
      {
        switch (start)
        {
          case 1:
            // write operation
            async_write(stream_, buffers_.data(0, buffers_.size()),
                BOOST_ASIO_MOVE_CAST(CompletionCondition)(completion_condition_),
                BOOST_ASIO_MOVE_CAST(write_dynbuf_v2_op)(*this));
            return;
          default:
            // write completed, consume transferred bytes and call handler
            buffers_.consume(bytes_transferred);
            handler_(ec, static_cast<const std::size_t&>(bytes_transferred));
        }
      }
    

    Hence, in the original example, every loop iteration starts an async_write on the same vector through a dynamic_buffer. When the first write completes, having transferred 512000 bytes, consume erases those bytes from the vector, so the vector is empty by the time the second write starts. The second write has nothing to send, and its callback prints

    successfully sent 0

    because the error code is 0 (no error), followed by

    send error

    because "send error" is printed whenever the number of bytes transferred is 0.
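The effect of consume on the vector can be reproduced without any sockets. The following is only a minimal sketch, not Boost code: model_dynamic_write and write_twice_same_vector are hypothetical names introduced here, and the model assumes that a write transfers everything currently in the vector and then erases it, as the excerpt above suggests.

```cpp
#include <cstddef>
#include <vector>

// Hypothetical stand-in for a write on a dynamic buffer (not the Boost
// implementation): "send" everything currently stored in the vector, then
// erase the sent bytes from the front, mirroring buffers_.consume(n).
std::size_t model_dynamic_write(std::vector<char>& storage)
{
    const std::size_t bytes_transferred = storage.size();
    storage.erase(storage.begin(),
                  storage.begin() + static_cast<std::ptrdiff_t>(bytes_transferred));
    return bytes_transferred;
}

// Runs the model twice on the same vector, as two consecutive loop
// iterations in the question do, and returns the byte count of each call.
std::vector<std::size_t> write_twice_same_vector(std::size_t buf_size)
{
    std::vector<char> send_data(buf_size);
    std::vector<std::size_t> sent;
    sent.push_back(model_dynamic_write(send_data));  // vector is full here
    sent.push_back(model_dynamic_write(send_data));  // vector is empty here
    return sent;
}
```

Calling write_twice_same_vector(500 * 1024) in this model yields 512000 for the first call and 0 for the second, which matches the two lines of output shown in the question.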

    The call to run() then blocks, because nothing was sent and the pending async_read on socket2 never completes.

    The workaround helps because each async_write operation then works on its own copy of the vector, so the original send_data is never consumed. An alternative would be to resize the vector back to its original size after run() returns.
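Both remedies can be compared in a socket-free sketch. As an assumption for illustration, write_and_consume below is a hypothetical model of a dynamic-buffer write that transfers and then erases the whole vector. It is not part of Boost, and the two loop helpers are likewise made-up names.

```cpp
#include <cstddef>
#include <vector>

// Hypothetical model of a write on a dynamic buffer (not Boost code):
// report how many bytes were "sent", then drop them from the vector.
std::size_t write_and_consume(std::vector<char>& storage)
{
    const std::size_t n = storage.size();
    storage.clear();  // same effect as consuming all n bytes
    return n;
}

// Workaround from the question: every write gets its own copy, so the
// original send_data is never modified.
std::size_t total_sent_with_copy(const std::vector<char>& send_data, int cycles)
{
    std::size_t total = 0;
    for (int j = 0; j < cycles; ++j) {
        auto to_send_data = send_data;  // fresh copy per write
        total += write_and_consume(to_send_data);
    }
    return total;
}

// Alternative: let the write consume send_data, then restore its size.
// Note that resize() refills the vector with zero bytes, not the old payload.
std::size_t total_sent_with_resize(std::vector<char>& send_data, int cycles)
{
    const std::size_t original_size = send_data.size();
    std::size_t total = 0;
    for (int j = 0; j < cycles; ++j) {
        total += write_and_consume(send_data);
        send_data.resize(original_size);
    }
    return total;
}
```

In the question's loop the copy stays alive until run() returns within the same iteration, which is what makes the workaround safe there. The resize variant avoids the copy but only restores the size, so it suits a benchmark whose payload is all zeros. If the data never needs to be consumed, passing asio::buffer(send_data) instead of a dynamic buffer may be simpler, since that creates a plain view and leaves the vector untouched.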