c++ c++11 boost asio

Is there a better way to keep the underlying memory blocks used by asio::async_send valid?


I once wrote the code snippet below to asynchronously send TCP data.

void Conversation::do_write(std::string str)
{
  auto conversation_ptr(shared_from_this());
  m_socket.async_send(asio::buffer(str), [this, conversation_ptr](std::error_code error, size_t length ){
    if (!error)
    {
        std::cout << "data write successfully" << std::endl;
    }
    else
    {
        std::cout << "error in writing data: " << error.message() << std::endl;
    }
  });
}

Later, I found that there is something wrong with the code above, though nobody had pointed it out before. According to the documentation, ownership of the underlying memory blocks is retained by the caller, which must guarantee that they remain valid until the handler is called. Here, str is a by-value parameter that is destroyed when do_write returns, which can happen before the handler runs.

basic_stream_socket::async_send (1 of 2 overloads)

Start an asynchronous send.

 template<
     typename ConstBufferSequence,
     typename WriteToken = default_completion_token_t<executor_type>>
 DEDUCED async_send(
     const ConstBufferSequence & buffers,
     WriteToken && token = default_completion_token_t< executor_type >());

This function is used to asynchronously send data on the stream socket. It is an initiating function for an asynchronous operation, and always returns immediately.

Parameters

  • buffers

    One or more data buffers to be sent on the socket. Although the buffers object may be copied as necessary, ownership of the underlying memory blocks is retained by the caller, which must guarantee that they remain valid until the completion handler is called.

To keep the content alive, I rewrote the code as below.

//Note: the passed string would be modified to avoid copying
void Conversation::do_write(std::string& str)  //modification1: using the reference
{
    auto conversation_ptr(shared_from_this());
    //modification2 using shared pointer to keep the content to be sent alive when the handler is called
    std::shared_ptr<std::string> content_ptr=std::make_shared<std::string>(std::move(str));
    m_socket.async_send(asio::buffer(*content_ptr), [this, conversation_ptr, content_ptr](std::error_code error, size_t length ){
    if (!error)
    {
        std::cout << "data write successfully" << std::endl;
    }
    else
    {
        std::cout << "error in writing data: " << error.message() << std::endl;
    }
  });
}
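
The effect of capturing content_ptr can be checked without a socket. The following is only a minimal sketch: PendingSend and start_send are hypothetical stand-ins for the socket operation and for do_write, not Asio APIs. The handler owns a copy of the shared pointer, so the string stays alive until the deferred "completion" runs, even if the caller reuses its own string in the meantime.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <iostream>
#include <memory>
#include <string>
#include <utility>

// Hypothetical stand-in for an asynchronous send: it stores a view of the
// buffer plus the handler, and completes only when complete() is called.
struct PendingSend {
    const char* data = nullptr;
    std::size_t size = 0;
    std::function<void()> handler;

    // Simulates the I/O finishing later: reads the caller-owned memory,
    // then invokes and releases the handler.
    std::string complete() {
        assert(handler && "no operation pending");
        std::string sent(data, size);
        auto h = std::move(handler);
        handler = nullptr;
        h();
        return sent;
    }
};

// Mirrors the rewritten do_write: the content is moved into a shared string
// and the handler keeps that string alive by capturing the shared pointer.
PendingSend start_send(std::string& str) {
    auto content_ptr = std::make_shared<std::string>(std::move(str));
    PendingSend op;
    op.data = content_ptr->data();
    op.size = content_ptr->size();
    op.handler = [content_ptr] {
        std::cout << "sent " << content_ptr->size() << " bytes\n";
    };
    return op;
}
```

Calling start_send(msg), then assigning something else to msg, and only then calling complete() on the returned object still yields the original text, because the buffer belongs to the shared string rather than to msg.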

Is there a better way? How can I improve the code above?

Updated six hours later: I think it's better to let the caller explicitly pass an instance of std::shared_ptr<std::string>.

void Conversation::do_write(std::shared_ptr<std::string> content)
{
  auto conversation_ptr(shared_from_this());
  m_socket.async_send(asio::buffer(*content), [this, content, conversation_ptr](std::error_code error, size_t length ){
    if (!error)
    {
        std::cout << "data write successfully" << std::endl;
    }
    else
    {
        std::cout << "error in writing data: " << error.message() << std::endl;
    }
  });
}

UPDATE AGAIN: store the messages to be sent in a member queue.

Thanks to @sehe's help, I think the code snippet below is a better choice.

Note: Conversation::do_write will be called by a thread that has not called io_context::run().

//Godbolt does not have standalone asio; I would rather not use boost::asio since it's huge.
#include "boost/asio.hpp"
#include <memory>
#include <mutex>
#include <string>
#include <vector>
#include <map>
#include <array>
#include <list>
#include <iostream>
#include <system_error>

using namespace boost;
using asio::ip::tcp;

class TcpServer
{
public:
  TcpServer(asio::io_context& io_context,
      const unsigned int& port);

private:
  void do_accept();

  tcp::acceptor m_acceptor;
};

class Conversation : public std::enable_shared_from_this<Conversation>
{
public:
  Conversation(tcp::socket socket);
  virtual ~Conversation();
  void do_start();
  void do_read();
  void do_write(std::string content);
private:
  tcp::socket m_socket;
  std::array<char, 1024*2> m_data;

  //UPDATE1: make the write buffers as member variables.
  std::mutex m_mutex;   // to protect m_write_queue
  std::list<std::string> m_write_queue; //messages to be sent to client
};

TcpServer::TcpServer(asio::io_context& io_context,
      const unsigned int& port):m_acceptor(io_context, tcp::endpoint(tcp::v4(), port))
{
  do_accept();
}

void TcpServer::do_accept()
{
      m_acceptor.async_accept(make_strand(m_acceptor.get_executor()),
      [this](const std::error_code& error, tcp::socket socket)
      {
            if (!error)
            {
              std::make_shared<Conversation>(std::move(socket))->do_start();
            }
            else
            {
                std::cerr << "Error accepting connection: " << error.message() << std::endl;
            }

            // Start accepting the next connection
            do_accept();
      });
}

Conversation::Conversation(tcp::socket socket) : m_socket(std::move(socket))
{ 
  std::cout << "creating conversation:" << this << std::endl;
}

void Conversation::do_read()
{
    std::cout << "do read" << std::endl;
    auto conversation_ptr(shared_from_this());
     m_socket.async_read_some(asio::buffer(m_data),
        [this, conversation_ptr](std::error_code error, size_t length) {
          if (!error)
          {
              do_write(std::string("reply:") + std::string{m_data.data(), length});
              do_read();
              std::cout << "data read: " << std::string(m_data.data(), length) << std::endl;
          }
          else
          {
              std::cout << "error in reading data: " << error.message() << std::endl;
          }
        });
}

void Conversation::do_start()
{
  do_read();
}


void Conversation::do_write(std::string content)
{
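  auto conversation_ptr(shared_from_this());
  const char* buffer_ptr = nullptr;
  std::size_t buffer_size = 0;
  {
    std::lock_guard<std::mutex> lock(m_mutex);
    m_write_queue.push_back(std::move(content));
    // Take the pointer from the queued element, not from `content`: moving a
    // short (small-string-optimized) std::string relocates its characters, and
    // `content` is destroyed when this function returns. std::list never
    // relocates its elements, so the pointer stays valid until the pop.
    const std::string& queued = m_write_queue.back();
    buffer_ptr = queued.data();
    buffer_size = queued.size();
    std::cout << "queue size(): " << m_write_queue.size() << std::endl;
  }

  //UPDATE1: make the write buffers as member variables.
  //m_write_queue.back() is read under the lock above because other threads may push new data into m_write_queue at the same time.
  m_socket.async_send(asio::const_buffer(buffer_ptr, buffer_size), [this, conversation_ptr](std::error_code error, size_t length ){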
    if (!error)
    {
      {
        std::lock_guard<std::mutex> lock(m_mutex);
        m_write_queue.pop_front();
      }
      std::cout << "data write successfully" << std::endl;
    }
    else
    {
        std::cout << "error in writing data: " << error.message() << std::endl;
    }
  });
}

Conversation::~Conversation()
{
   std::cout << "destroying conversation:" << this << std::endl;
}

int main()
{
  asio::io_context io_context;
  TcpServer server{io_context, 12345};

  io_context.run();
}

Solution

  • What you did works. However, I'd take str by value, since you're "sinking" its data. Let the caller decide whether the side-effect of clearing the argument's value is desired.

    std::string message = "Hello\n";
    psession1->do_write(message); // copy
    psession2->do_write(std::move(message)); // move
    

    Next up, it's more customary to make the data a member of the class, since the class that contains the socket already has to guarantee the correct lifetime anyway.

    Your Conversation class may already inherit from std::enable_shared_from_this for the purpose of guaranteeing the lifetime, so there is no need to duplicate the effort for any extra state.

    To top it all off: in many situations you have to watch for concurrent write requests anyways, and you will have a member like std::deque<std::string> instead to keep track of pending writes.

    That way you ensure reference stability of the buffers involved, can avoid overlapping write operations and unnecessary dynamic allocation and reference-counting.

    For examples of this you can scan any of my Asio answers for such queues, just picking the first that I see: How to safely write to a socket from multiple threads?
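
The queue idea described above can be sketched without Asio. This is an illustration under stated assumptions, not the code from the linked answer: FakeSocket, Session, complete_one, pending and sent are hypothetical names, and FakeSocket merely records a request and finishes it on demand. Real code would also keep the session alive with shared_from_this and serialize access with a strand, both of which are omitted here. The points shown are that do_write takes the message by value, that at most one write is in flight, and that std::deque never relocates existing elements on push_back, so the buffer handed to the socket stays valid until pop_front.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <functional>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-in for a socket: async_send() only records the request;
// complete_one() later finishes it and invokes the stored handler.
class FakeSocket {
public:
    using Handler = std::function<void(bool ok)>;

    void async_send(const char* data, std::size_t size, Handler handler) {
        assert(!m_handler && "overlapping writes are not allowed");
        m_data = data;
        m_size = size;
        m_handler = std::move(handler);
    }

    bool busy() const { return static_cast<bool>(m_handler); }

    void complete_one() {
        m_sent.emplace_back(m_data, m_size); // reads the caller-owned buffer
        Handler h = std::move(m_handler);
        m_handler = nullptr;
        h(true);
    }

    const std::vector<std::string>& sent() const { return m_sent; }

private:
    const char* m_data = nullptr;
    std::size_t m_size = 0;
    Handler m_handler;
    std::vector<std::string> m_sent;
};

class Session {
public:
    explicit Session(FakeSocket& socket) : m_socket(socket) {}

    // By value: the caller decides whether to copy or to move the message.
    void do_write(std::string message) {
        m_queue.push_back(std::move(message));
        if (m_queue.size() == 1) // nothing in flight, so start a write
            write_front();
    }

    std::size_t pending() const { return m_queue.size(); }

private:
    void write_front() {
        const std::string& front = m_queue.front(); // valid until pop_front()
        m_socket.async_send(front.data(), front.size(), [this](bool ok) {
            if (!ok) { m_queue.clear(); return; } // give up on pending messages
            m_queue.pop_front();
            if (!m_queue.empty())
                write_front(); // chain the next write
        });
    }

    FakeSocket& m_socket;
    std::deque<std::string> m_queue;
};
```

Queuing three messages and then calling complete_one() three times sends them in order, one at a time, with no shared_ptr allocation per message.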