Search code examples
c++socketsboosttcp

Asynchronous tcp server using boost's async_write results in bad file descriptor


First of all, I'm not a native English speaker, so I will probably make some grammar mistakes; sorry for that.

I'm trying to create an asynchronous TCP server using C++ and Boost. I can successfully accept clients and receive messages from them, but I can't reply to their messages. What I want is a method on the TcpServer class that sends a reply to all connected clients. I have written such a method, but when I call TcpServer::write, the error argument of TcpConnectionHandler::handle_write reports "Bad file descriptor".

Could you help me figure out what I'm doing wrong?

tcp_server.h

#ifndef TCP_SERVER_
#define TCP_SERVER_

#include <iostream>
#include <boost/asio.hpp>
#include <boost/bind.hpp>
#include <boost/function.hpp>
#include <boost/thread.hpp>
#include <boost/enable_shared_from_this.hpp>

namespace my_project
{
  /**
   * Handles one TCP connection: reads "\r\n"-delimited input from the socket
   * and keeps a queue (outbox_) of outgoing messages that are written
   * asynchronously, one at a time.
   *
   * Derives from enable_shared_from_this so that completion handlers can bind
   * a shared pointer to the instance.
   */
  class TcpConnectionHandler : public boost::enable_shared_from_this<TcpConnectionHandler>
  {
    public:

      // Creates the socket and strand on io_service and stores the log prefix
      // and the callback that is invoked for every received message.
      TcpConnectionHandler(std::string log_prefix, boost::asio::io_service& io_service, boost::function<void(std::string&)> received_message_callback);

      // Gives access to the underlying socket (the server accepts into it).
      boost::asio::ip::tcp::socket& socket();

      // Starts an asynchronous read that ends at the next "\r\n".
      void start();

      // Queues message for sending; the actual work is posted through strand_.
      void write(const std::string& message);

    private:

      // Appends message to outbox_ and starts a write if none is outstanding.
      void writeImpl(const std::string& message);

      // Starts an asynchronous write of the first element of outbox_.
      void write();

      // Completion handler of the read started by start().
      void handle_read(const boost::system::error_code& error, size_t bytes_transferred);

      // Completion handler of the write started by write().
      void handle_write(const boost::system::error_code& error, size_t bytes_transferred);

      boost::asio::ip::tcp::socket socket_;
      boost::asio::streambuf message_;                                  // input buffer filled by async_read_until
      std::string log_prefix_;
      boost::function<void(std::string&)> received_message_callback_;  // called with each received message

      // NOTE(review): <deque> is not among the includes of this header --
      // presumably pulled in transitively; confirm.
      std::deque<std::string> outbox_;                                  // pending outgoing messages, front is in flight
      boost::asio::io_service& io_service_;
      boost::asio::io_service::strand strand_;                          // must stay declared after io_service_, which initializes it
  };

  /**
   * Accepts TCP connections on a port and runs the io_service on a
   * secondary thread.
   */
  class TcpServer
  {
    public:

      // Opens the acceptor on the given port, starts accepting and launches
      // the io_service thread.
      TcpServer(std::string log_prefix, unsigned int port, boost::function<void(std::string&)> received_message_callback);
      // Stops the io_service and joins/deletes the io thread.
      ~TcpServer();

      // Blocks the caller until a client has connected.
      void start();

      // Forwards content to the handler currently stored in connection_.
      void write(std::string content);

    private:

      // Creates a new handler and starts an asynchronous accept into it.
      void start_accept();

      // Completion handler of async_accept; re-arms the accept afterwards.
      void handle_accept(boost::shared_ptr<TcpConnectionHandler> connection, const boost::system::error_code& error);

      // NOTE(review): members are destroyed in reverse declaration order, so
      // io_service_ is destroyed before connection_ although the handler was
      // created on that io_service -- confirm this ordering is intended.
      boost::shared_ptr<TcpConnectionHandler> connection_;
      boost::asio::io_service io_service_;
      boost::asio::ip::tcp::acceptor acceptor_;
      std::string log_prefix_;
      boost::function<void(std::string&)> received_message_callback_;
      boost::condition_variable connection_cond_;    // signalled when a client has been accepted
      boost::mutex connection_mutex_;                // protects client_connected_
      bool client_connected_;
      boost::thread *io_thread_;                     /**< Thread to run boost.asio io_service. */
  };

} // namespace my_project

#endif // #ifndef TCP_SERVER_

tcp_server.cpp

#include "tcp_server.h"
//#include "easylogging++.h"
//#include "utils.h"

namespace my_project
{
  // TcpConnectionHandler

  // Builds a handler whose socket and strand both live on io_service.
  // The initializers are listed in the order the members are declared in the
  // class, which is the order in which they are actually constructed.
  TcpConnectionHandler::TcpConnectionHandler(std::string log_prefix, boost::asio::io_service& io_service, boost::function<void(std::string&)> received_message_callback)
    : socket_(io_service),
      log_prefix_(log_prefix),
      received_message_callback_(received_message_callback),
      outbox_(),
      io_service_(io_service),
      strand_(io_service_)
  {
  }

  // Returns a reference to the socket owned by this handler.
  boost::asio::ip::tcp::socket& TcpConnectionHandler::socket()
  {
    return this->socket_;
  }

  void TcpConnectionHandler::start()
  {
    async_read_until(socket_,
        message_,
        "\r\n",
        boost::bind(&TcpConnectionHandler::handle_read,
                    shared_from_this(),
                    boost::asio::placeholders::error,
                    boost::asio::placeholders::bytes_transferred));
  }

  void TcpConnectionHandler::write(const std::string& message)
  {
    strand_.post(boost::bind(&TcpConnectionHandler::writeImpl, this, message));
  }

  // Appends message to the outbox and starts writing only when the outbox was
  // empty before, i.e. when no async_write is already outstanding.
  void TcpConnectionHandler::writeImpl(const std::string& message)
  {
    const bool write_outstanding = !outbox_.empty();
    outbox_.push_back( message );

    if ( !write_outstanding ) {
        this->write();
    }
  }

  // Starts an asynchronous write of the message at the front of the outbox;
  // handle_write is invoked through strand_ when it completes.
  // NOTE(review): the completion handler binds the raw `this` pointer rather
  // than shared_from_this(), so it does not keep this object alive -- confirm
  // the object outlives the write.
  void TcpConnectionHandler::write()
  {
    // The string stays in outbox_ until handle_write pops it.
    const std::string& front_message = outbox_.front();

    boost::asio::async_write(
        socket_,
        boost::asio::buffer( front_message ),
        strand_.wrap(
            boost::bind(&TcpConnectionHandler::handle_write, this,
                        boost::asio::placeholders::error,
                        boost::asio::placeholders::bytes_transferred)));
  }

  // Completion handler for the read started in start().
  // On eof/connection_reset it returns without doing anything. Otherwise it
  // drains message_, passes everything before the first '\r' to the callback
  // and, when there was no error, arms the next read.
  // NOTE(review): the whole buffer is drained, so anything received after the
  // first '\r' is dropped, and bytes_transferred is not used -- confirm this
  // is intended.
  void TcpConnectionHandler::handle_read(const boost::system::error_code& error, size_t bytes_transferred)
  {
    const bool client_gone =
        (error == boost::asio::error::eof) || (error == boost::asio::error::connection_reset);
    if (client_gone)
    {
      //LOG(ERROR) << log_prefix_ << " TCP/IP client disconnected!";
      return;
    }

    // Reading through the istream consumes the streambuf contents.
    std::istream stream(&message_);
    std::istreambuf_iterator<char> first(stream), last;
    const std::string received(first, last);

    //LOG(DEBUG) << log_prefix_ << " communication object received message: " << getPrintableMessage(received);

    // Keep only the part in front of the first carriage return.
    std::string msg = received.substr(0, received.find('\r'));

    // Drop whatever might still be left in the buffer.
    message_.consume(message_.size());

    if (error)
    {
      // TODO: Handle the error here
      return;
    }

    received_message_callback_(msg);
    start();
  }

  // Completion handler for the write started in write(): removes the message
  // that was just written and continues with the next queued one, if any.
  // On error the failure is reported on stderr and no further write is started.
  void TcpConnectionHandler::handle_write(const boost::system::error_code& error, size_t bytes_transferred)
  {
    outbox_.pop_front();

    if ( !error ) {
        if ( !outbox_.empty() ) {
            // more messages to send
            this->write();
        }
        return;
    }

    std::cerr << "could not write: " << boost::system::system_error(error).what() << std::endl;
  }


  // TcpServer

  // Opens the acceptor on an IPv4 endpoint with the given port, arms the first
  // accept and then runs the io_service on a secondary thread.
  TcpServer::TcpServer(std::string log_prefix, unsigned int port, boost::function<void(std::string&)> received_message_callback)
    : acceptor_(io_service_, boost::asio::ip::tcp::endpoint(boost::asio::ip::tcp::v4(), port)),
      log_prefix_(log_prefix),
      received_message_callback_(received_message_callback),
      client_connected_(false),
      io_thread_(NULL)
  {
    // The accept is queued before the thread is started.
    start_accept();

    io_thread_ = new boost::thread(boost::bind(&boost::asio::io_service::run, &io_service_));
  }

  // Stops the io_service, interrupts and joins the io thread and frees it.
  TcpServer::~TcpServer()
  {
      if (!io_thread_)
      {
        return;
      }

      io_service_.stop();
      io_thread_->interrupt();
      io_thread_->join();
      delete io_thread_;
  }

  // Blocks the calling thread until handle_accept has flagged a connected
  // client. The flag is re-checked after every wake-up of the condition variable.
  void TcpServer::start()
  {
    boost::mutex::scoped_lock lock(connection_mutex_);

    for (;;)
    {
      if (client_connected_)
      {
        break;
      }

      //LOG(INFO) << "Waiting for " << log_prefix_ << " client to establish connection...";
      connection_cond_.wait(lock);
    }

    //LOG(INFO) << log_prefix_ << " client successfully connected.";
  }

  void TcpServer::write(std::string content)
  {
    connection_->write(content);
  }

  void TcpServer::start_accept()
  {
    // Create a new connection handler
    connection_.reset(new TcpConnectionHandler(log_prefix_, acceptor_.get_io_service(), received_message_callback_));

    // Asynchronous accept operation and wait for a new connection.
    acceptor_.async_accept(connection_->socket(),
        boost::bind(&TcpServer::handle_accept, this, connection_,
        boost::asio::placeholders::error));

    //LOG(DEBUG) << log_prefix_ << " communication object started asynchronous TCP/IP connection acceptance.";
  }


  // Completion handler of async_accept. On success it starts reading on the
  // accepted connection and wakes a thread waiting in start(). In every case a
  // new accept is armed afterwards.
  void TcpServer::handle_accept(boost::shared_ptr<TcpConnectionHandler> connection, const boost::system::error_code& error)
  {
    const bool accepted = !error;
    if (accepted)
    {
      //LOG(INFO) << log_prefix_ << " client connected!";
      connection->start();

      {
        // The lock is released before the next accept is started.
        boost::unique_lock<boost::mutex> guard(connection_mutex_);
        client_connected_ = true;
        connection_cond_.notify_one();
      }
      //LOG(INFO) << log_prefix_ << " client connection accepted";
    }

    start_accept();
  }
}

Thanks in advance!


Solution

  • I slightly modified the code to be compatible with Boost 1.74.0.

    Then I ran it with ASAN:

    =================================================================
    ==22695==ERROR: AddressSanitizer: heap-use-after-free on address 0x61b000000080 at pc 0x5571c3b61379 bp 0x7ffddce81980 sp 0x7ffddce81970
    READ of size 8 at 0x61b000000080 thread T0
        #0 0x5571c3b61378 in boost::asio::detail::strand_executor_service::strand_impl::~strand_i...
        #1 0x5571c3c02599 in std::_Sp_counted_ptr<boost::asio::detail::strand_executor_service::s...
        #2 0x5571c3b874e9 in std::_Sp_counted_base<(__gnu_cxx::_Lock_policy)2>::_M_release() /usr...
        #3 0x5571c3b6d429 in std::__shared_count<(__gnu_cxx::_Lock_policy)2>::~__shared_count() /...
        #4 0x5571c3b5ff84 in std::__shared_ptr<boost::asio::detail::strand_executor_service::stra...
        #5 0x5571c3b5ffeb in std::shared_ptr<boost::asio::detail::strand_executor_service::strand...
        #6 0x5571c3b7d8d0 in boost::asio::strand<boost::asio::execution::any_executor<boost::asio...
        #7 0x5571c3c0da6c in void std::destroy_at<boost::asio::strand<boost::asio::execution::any...
        #8 0x5571c3c0b761 in void std::allocator_traits<std::allocator<boost::asio::strand<boost:...
        #9 0x5571c3c01847 in std::_Sp_counted_ptr_inplace<boost::asio::strand<boost::asio::execut...
        #10 0x5571c3b874e9 in std::_Sp_counted_base<(__gnu_cxx::_Lock_policy)2>::_M_release() /us...
        #11 0x5571c3b6d429 in std::__shared_count<(__gnu_cxx::_Lock_policy)2>::~__shared_count() ...
        #12 0x5571c3b25690 in std::__shared_ptr<void, (__gnu_cxx::_Lock_policy)2>::~__shared_ptr(...
        #13 0x5571c3b256f7 in std::shared_ptr<void>::~shared_ptr() /usr/include/c++/10/bits/share...
        #14 0x5571c3b25769 in boost::asio::execution::detail::any_executor_base::destroy_shared(b...
        #15 0x5571c3b24ef2 in boost::asio::execution::detail::any_executor_base::~any_executor_ba...
        #16 0x5571c3b6a0f7 in boost::asio::execution::any_executor<boost::asio::execution::contex...
        #17 0x5571c3bbf447 in my_project::TcpConnectionHandler::~TcpConnectionHandler() /home/seh...
        #18 0x5571c3bbf4e1 in void boost::checked_delete<my_project::TcpConnectionHandler>(my_pro...
        #19 0x5571c3c020a9 in boost::detail::sp_counted_impl_p<my_project::TcpConnectionHandler>:...
        #20 0x5571c3b5bf6b in boost::detail::sp_counted_base::release() /home/sehe/custom/boost_1...
        #21 0x5571c3b5c537 in boost::detail::shared_count::~shared_count() /home/sehe/custom/boos...
        #22 0x5571c3b6a324 in boost::shared_ptr<my_project::TcpConnectionHandler>::~shared_ptr() ...
        #23 0x5571c3b1d48e in my_project::TcpServer::~TcpServer() /home/sehe/Projects/stackoverfl...
        #24 0x5571c3b1e837 in main /home/sehe/Projects/stackoverflow/test.cpp:266
        #25 0x7ff1f049fbf6 in __libc_start_main (/lib/x86_64-linux-gnu/libc.so.6+0x21bf6)
        #26 0x5571c3b18b99 in _start (/home/sehe/Projects/stackoverflow/sotest+0x182b99)
    
    0x61b000000080 is located 0 bytes inside of 1640-byte region [0x61b000000080,0x61b0000006e8)
    freed by thread T0 here:
        #0 0x7ff1f1eb4407 in operator delete(void*, unsigned long) (/usr/lib/x86_64-linux-gnu/lib...
        #1 0x5571c3c003f4 in boost::asio::detail::strand_executor_service::~strand_executor_servi...
        #2 0x5571c3b35e4a in boost::asio::detail::service_registry::destroy(boost::asio::executio...
        #3 0x5571c3b3578b in boost::asio::detail::service_registry::destroy_services() /home/sehe...
        #4 0x5571c3b37a4f in boost::asio::execution_context::destroy() /home/sehe/custom/boost_1_...
        #5 0x5571c3b3788a in boost::asio::execution_context::~execution_context() /home/sehe/cust...
        #6 0x5571c3b54621 in boost::asio::io_context::~io_context() /home/sehe/custom/boost_1_74_...
        #7 0x5571c3b1d43b in my_project::TcpServer::~TcpServer() /home/sehe/Projects/stackoverflo...
        #8 0x5571c3b1e837 in main /home/sehe/Projects/stackoverflow/test.cpp:266
        #9 0x7ff1f049fbf6 in __libc_start_main (/lib/x86_64-linux-gnu/libc.so.6+0x21bf6)
    
    previously allocated by thread T0 here:
        #0 0x7ff1f1eb33a7 in operator new(unsigned long) (/usr/lib/x86_64-linux-gnu/libasan.so.6+...
        #1 0x5571c3bc0faa in boost::asio::execution_context::service* boost::asio::detail::servic...
        #2 0x5571c3b3623a in boost::asio::detail::service_registry::do_use_service(boost::asio::e...
        #3 0x5571c3bb74f9 in boost::asio::detail::strand_executor_service& boost::asio::detail::s...
        #4 0x5571c3bab8e7 in boost::asio::detail::strand_executor_service& boost::asio::use_servi...
        #5 0x5571c3ba085f in std::shared_ptr<boost::asio::detail::strand_executor_service::strand...
        #6 0x5571c3b92744 in boost::asio::strand<boost::asio::execution::any_executor<boost::asio...
        #7 0x5571c3b7d844 in boost::asio::strand<boost::asio::execution::any_executor<boost::asio...
        #8 0x5571c3b18e8a in my_project::TcpConnectionHandler::TcpConnectionHandler(std::__cxx11:...
        #9 0x5571c3b1dbe4 in my_project::TcpServer::start_accept() /home/sehe/Projects/stackoverf...
        #10 0x5571c3b1c775 in my_project::TcpServer::TcpServer(std::__cxx11::basic_string<char, s...
        #11 0x5571c3b1e7b9 in main /home/sehe/Projects/stackoverflow/test.cpp:266
        #12 0x7ff1f049fbf6 in __libc_start_main (/lib/x86_64-linux-gnu/libc.so.6+0x21bf6)
    
    SUMMARY: AddressSanitizer: heap-use-after-free /home/sehe/custom/boost_1_74_0/boost/asio/detail/impl/strand_executor_service.ipp:88 in boost::asio::detail::strand_executor_service::strand_impl::~strand_impl()
    Shadow bytes around the buggy address:
      0x0c367fff7fc0: 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00
      0x0c367fff7fd0: 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00
      0x0c367fff7fe0: 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00
      0x0c367fff7ff0: 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00
      0x0c367fff8000: fa fa fa fa fa fa fa fa fa fa fa fa fa fa fa fa
    =>0x0c367fff8010:[fd]fd fd fd fd fd fd fd fd fd fd fd fd fd fd fd
      0x0c367fff8020: fd fd fd fd fd fd fd fd fd fd fd fd fd fd fd fd
      0x0c367fff8030: fd fd fd fd fd fd fd fd fd fd fd fd fd fd fd fd
      0x0c367fff8040: fd fd fd fd fd fd fd fd fd fd fd fd fd fd fd fd
      0x0c367fff8050: fd fd fd fd fd fd fd fd fd fd fd fd fd fd fd fd
      0x0c367fff8060: fd fd fd fd fd fd fd fd fd fd fd fd fd fd fd fd
    Shadow byte legend (one shadow byte represents 8 application bytes):
      Addressable:           00
      Partially addressable: 01 02 03 04 05 06 07 
      Heap left redzone:       fa
      Freed heap region:       fd
      Stack left redzone:      f1
      Stack mid redzone:       f2
      Stack right redzone:     f3
      Stack after return:      f5
      Stack use after scope:   f8
      Global redzone:          f9
      Global init order:       f6
      Poisoned by user:        f7
      Container overflow:      fc
      Array cookie:            ac
      Intra object redzone:    bb
      ASan internal:           fe
      Left alloca redzone:     ca
      Right alloca redzone:    cb
      Shadow gap:              cc
    ==22695==ABORTING
    

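    As you can see, the ~TcpServer destructor causes the connection to use the io_service after it was destroyed: members are destroyed in reverse order of declaration, so io_service_ (declared after connection_) is destroyed first. Reordering the members will fix this: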

    boost::shared_ptr<TcpConnectionHandler> connection_;
    boost::asio::io_service io_service_;
    boost::asio::ip::tcp::acceptor acceptor_;
    

    Should be

    boost::asio::io_service io_service_;
    boost::shared_ptr<TcpConnectionHandler> connection_;
    boost::asio::ip::tcp::acceptor acceptor_;
    

    More

    To actually listen, it seems that you would want to /wait/ for the service to exit, not stop() and interrupt() the thread?

    Note also that there is no need to dynamically allocate the thread (see: "Why should C++ programmers minimize use of 'new'?"). With a plain boost::thread member, the destructor becomes:

    // Waits for the io thread to finish on its own.
    // The io_service_.stop() and io_thread_.interrupt() calls are deliberately
    // left out here.
    TcpServer::~TcpServer() {
        if (!io_thread_.joinable()) {
            return;
        }
        io_thread_.join();
    }
    

    Lifetime issue: shared_from_this?

    boost::bind(&TcpConnectionHandler::handle_write, this,
                            boost::asio::placeholders::error,
                            boost::asio::placeholders::bytes_transferred)
    

    This bind fails to capture the shared pointer, so nothing keeps the TcpConnectionHandler alive and it gets destructed (and when the socket is destructed, it is closed). Strictly speaking this is undefined behavior (use-after-free); you are "lucky" that it does not crash and instead shows up as an invalid handle, i.e. the "Bad file descriptor" error.

    Fix it everywhere a handler is bound, so that each handler captures shared_from_this():

    post(executor_,
         boost::bind(&TcpConnectionHandler::writeImpl, shared_from_this(), message));
    

    And

    async_read_until(socket_, message_, "\r\n",
         boost::bind(&TcpConnectionHandler::handle_read,
                     shared_from_this(),
                     boost::asio::placeholders::error,
                     boost::asio::placeholders::bytes_transferred));
    

    Live Demo

    I tested it successfully with the following:

    #define BOOST_BIND_NO_PLACEHOLDERS
    #ifndef TCP_SERVER_
    #define TCP_SERVER_
    
    #include <boost/asio.hpp>
    #include <boost/bind.hpp>
    #include <boost/enable_shared_from_this.hpp>
    #include <boost/function.hpp>
    #include <boost/thread.hpp>
    #include <iostream>
    
    namespace my_project {
    /**
     * Handles one TCP connection: reads "\r\n"-delimited input and writes
     * queued outgoing messages one at a time. All handlers are bound with
     * shared_from_this().
     */
    class TcpConnectionHandler
            : public boost::enable_shared_from_this<TcpConnectionHandler> {
      public:
        // Wraps executor in a strand and creates the socket on that strand.
        TcpConnectionHandler(
            std::string log_prefix, boost::asio::any_io_executor executor,
            boost::function<void(std::string&)> received_message_callback);
    
        // Gives access to the underlying socket (the server accepts into it).
        boost::asio::ip::tcp::socket& socket();
    
        // Starts an asynchronous read that ends at the next "\r\n".
        void start();
        // Queues message for sending; the work is posted to executor_.
        void write(const std::string& message);
    
      private:
        // Appends message to outbox_ and starts a write if none is outstanding.
        void writeImpl(const std::string& message);
        // Starts an asynchronous write of the first element of outbox_.
        void write();
        // Completion handler of the read started by start().
        void handle_read(const boost::system::error_code& error,
                         size_t bytes_transferred);
        // Completion handler of the write started by write().
        void handle_write(const boost::system::error_code& error,
                          size_t bytes_transferred);
    
        boost::asio::any_io_executor executor_; // strand; declared before socket_, which is constructed from it
        boost::asio::ip::tcp::socket socket_;
        boost::asio::streambuf message_;        // input buffer filled by async_read_until
        std::string log_prefix_;
        boost::function<void(std::string&)> received_message_callback_;
    
        std::deque<std::string> outbox_;        // pending outgoing messages, front is in flight
    };
    
    /**
     * Accepts TCP connections on a port and runs the io_service on a
     * secondary thread.
     */
    class TcpServer {
      public:
        // Opens the acceptor on the given port, starts accepting and launches
        // the io_service thread.
        TcpServer(std::string log_prefix, unsigned int port,
                  boost::function<void(std::string&)> received_message_callback);
        // Joins the io thread.
        ~TcpServer();
    
        // Blocks the caller until a client has connected.
        void start();
    
        // Forwards content to the handler currently stored in connection_.
        void write(std::string content);
    
      private:
        // Creates a new handler and starts an asynchronous accept into it.
        void start_accept();
    
        // Completion handler of async_accept; re-arms the accept afterwards.
        void handle_accept(boost::shared_ptr<TcpConnectionHandler> connection,
                           const boost::system::error_code& error);
    
        // io_service_ is declared first, so it is destroyed last (members are
        // destroyed in reverse declaration order).
        boost::asio::io_service io_service_;
        boost::shared_ptr<TcpConnectionHandler> connection_;
        boost::asio::ip::tcp::acceptor acceptor_;
        std::string log_prefix_;
        boost::function<void(std::string&)> received_message_callback_;
        boost::condition_variable connection_cond_; // signalled when a client has been accepted
        boost::mutex connection_mutex_;             // protects client_connected_
        bool client_connected_;
        boost::thread io_thread_; /**< Thread to run boost.asio io_service. */
    };
    
    } // namespace my_project
    
    #endif // #ifndef TCP_SERVER_
    
    //#include "tcp_server.h"
    //#include "easylogging++.h"
    //#include "utils.h"
    
    namespace my_project {
    // TcpConnectionHandler
    // Wraps the given executor in a strand and creates the socket on that
    // strand; also stores the log prefix and the receive callback.
    TcpConnectionHandler::TcpConnectionHandler(
        std::string log_prefix, boost::asio::any_io_executor executor,
        boost::function<void(std::string&)> received_message_callback)
        : executor_(boost::asio::make_strand(executor))
        , socket_(executor_)
        , log_prefix_(log_prefix)
        , received_message_callback_(received_message_callback)
    {
    }
    
    // Returns a reference to the socket owned by this handler.
    boost::asio::ip::tcp::socket& TcpConnectionHandler::socket() {
        return this->socket_;
    }
    
    // Arms an asynchronous read into message_ that is delimited by "\r\n";
    // handle_read is invoked when it completes.
    void TcpConnectionHandler::start() {
        // A shared pointer to this object is stored inside the bound handler.
        auto self = shared_from_this();

        boost::asio::async_read_until(
            socket_, message_, "\r\n",
            boost::bind(&TcpConnectionHandler::handle_read, self,
                        boost::asio::placeholders::error,
                        boost::asio::placeholders::bytes_transferred));
    }
    
    // Schedules message for sending by posting writeImpl to executor_.
    // The posted handler holds a shared pointer to this object.
    void TcpConnectionHandler::write(const std::string& message) {
        auto self = shared_from_this();
        boost::asio::post(
            executor_,
            boost::bind(&TcpConnectionHandler::writeImpl, self, message));
    }
    
    // Appends message to the outbox and starts writing only when the outbox
    // was empty before, i.e. when no async_write is already outstanding.
    void TcpConnectionHandler::writeImpl(const std::string& message) {
        const bool write_outstanding = !outbox_.empty();
        outbox_.push_back(message);

        if (!write_outstanding) {
            write();
        }
    }
    
    // Starts an asynchronous write of the message at the front of the outbox;
    // handle_write is bound to executor_ and holds a shared pointer to this
    // object.
    void TcpConnectionHandler::write() {
        // The string stays in outbox_ until handle_write pops it.
        const std::string& front_message = outbox_.front();

        auto on_written =
            boost::bind(&TcpConnectionHandler::handle_write, shared_from_this(),
                        boost::asio::placeholders::error,
                        boost::asio::placeholders::bytes_transferred);

        boost::asio::async_write(
            socket_, boost::asio::buffer(front_message),
            boost::asio::bind_executor(executor_, on_written));
    }
    
    // Completion handler for the read started in start().
    // Logs the error status, then returns on eof/connection_reset. Otherwise it
    // drains message_, passes everything before the first '\r' to the callback
    // and, when there was no error, arms the next read.
    // NOTE(review): the whole buffer is drained, so anything received after
    // the first '\r' is dropped, and bytes_transferred is not used.
    void TcpConnectionHandler::handle_read(const boost::system::error_code& error,
                                           size_t /*bytes_transferred*/) {

        std::cerr << __FUNCTION__ << ": " << error.message() << "\n";

        const bool client_gone = (error == boost::asio::error::eof) ||
                                 (error == boost::asio::error::connection_reset);
        if (client_gone) {
            // LOG(ERROR) << log_prefix_ << " TCP/IP client disconnected!";
            return;
        }

        // Reading through the istream consumes the streambuf contents.
        std::istream stream(&message_);
        std::istreambuf_iterator<char> first(stream), last;
        const std::string received(first, last);

        // LOG(DEBUG) << log_prefix_ << " communication object received message: "
        // << getPrintableMessage(received);

        // Keep only the part in front of the first carriage return.
        std::string msg = received.substr(0, received.find('\r'));

        // Drop whatever might still be left in the buffer.
        message_.consume(message_.size());

        if (error) {
            // TODO: Handle the error here
            return;
        }

        received_message_callback_(msg);
        start();
    }
    
    // Completion handler for the write started in write(): removes the message
    // that was just written and continues with the next queued one, if any.
    // On error the failure is reported on stderr and no further write is started.
    void TcpConnectionHandler::handle_write(const boost::system::error_code& error,
                                            size_t /*bytes_transferred*/) {
        outbox_.pop_front();

        if (!error) {
            if (!outbox_.empty()) {
                // more messages to send
                write();
            }
            return;
        }

        std::cerr << "could not write: "
                  << boost::system::system_error(error).what() << std::endl;
    }
    
    // TcpServer
    
    // Opens the acceptor on an IPv4 endpoint with the given port, arms the
    // first accept and then runs the io_service on a secondary thread.
    TcpServer::TcpServer(
        std::string log_prefix, unsigned int port,
        boost::function<void(std::string&)> received_message_callback)
            : acceptor_(io_service_, boost::asio::ip::tcp::endpoint(
                                         boost::asio::ip::tcp::v4(), port))
            , log_prefix_(log_prefix)
            , received_message_callback_(received_message_callback)
            , client_connected_(false) {

        // The accept is queued before the thread is started.
        start_accept();

        io_thread_ = boost::thread(boost::bind(&boost::asio::io_service::run, &io_service_));
    }
    
    // Waits for the io thread to finish on its own.
    // The io_service_.stop() and io_thread_.interrupt() calls are deliberately
    // left out here.
    TcpServer::~TcpServer() {
        if (!io_thread_.joinable()) {
            return;
        }
        io_thread_.join();
    }
    
    void TcpServer::start() {
        // Wait until client is connected to our TCP server. (condition variable)
        boost::unique_lock<boost::mutex> lock(connection_mutex_);
    
        while (!client_connected_) {
            // LOG(INFO) << "Waiting for " << log_prefix_ << " client to establish
            // connection...";
    
            connection_cond_.wait(lock);
        }
    
        // LOG(INFO) << log_prefix_ << " client successfully connected.";
    }
    
    void TcpServer::write(std::string content) { connection_->write(content); }
    
    void TcpServer::start_accept() {
        // Create a new connection handler
        connection_.reset(new TcpConnectionHandler(
            log_prefix_, acceptor_.get_executor(), received_message_callback_));
    
        // Asynchronous accept operation and wait for a new connection.
        acceptor_.async_accept(connection_->socket(),
                               boost::bind(&TcpServer::handle_accept, this,
                                           connection_,
                                           boost::asio::placeholders::error));
    
        // LOG(DEBUG) << log_prefix_ << " communication object started asynchronous
        // TCP/IP connection acceptance.";
    }
    
    // Completion handler of async_accept. Logs the error status; on success it
    // starts reading on the accepted connection and wakes a thread waiting in
    // start(). In every case a new accept is armed afterwards.
    void TcpServer::handle_accept(
        boost::shared_ptr<TcpConnectionHandler> connection,
        const boost::system::error_code& error) {
        std::cerr << __FUNCTION__ << ": " << error.message() << "\n";

        const bool accepted = !error;
        if (accepted) {
            // LOG(INFO) << log_prefix_ << " client connected!";
            connection->start();

            {
                // The lock is released before the next accept is started.
                boost::unique_lock<boost::mutex> guard(connection_mutex_);
                client_connected_ = true;
                connection_cond_.notify_one();
            }
            // LOG(INFO) << log_prefix_ << " client connection accepted";
        }

        start_accept();
    }
    } // namespace my_project
    
    // Demo entry point: runs a server on port 6868 that prints every received
    // message, quoted, to stdout. The server's destructor joins the io thread
    // when main returns.
    int main() {
        auto print_message = [](std::string& message) {
            std::cout << "Received msg: " << std::quoted(message) << "\n";
        };

        my_project::TcpServer server("demo", 6868, print_message);
    }
    

    And as a client, e.g.

    cat test.cpp | netcat -Cw 0 localhost 6868
    

    Prints the whole lot like

    Received msg: "//#define BOOST_BIND_GLOBAL_PLACEHOLDERS"
    Received msg: "#include <boost/thread.hpp>"
    Received msg: "std::string& message);"
    Received msg: "td::string> outbox_;"
    Received msg: ":shared_ptr<TcpConnectionHandler> connection_;"
    Received msg: "#include \"utils.h\""
    Received msg: "ConnectionHandler::start() {"
    Received msg: "::writeImpl(const std::string& message) {"
    Received msg: "s(),"
    Received msg: "       // LOG(ERROR) << log_prefix_ << \" TCP/IP client disconnected!\";"
    Received msg: " // Consumes from the streambuf."
    Received msg: "e: \""
    Received msg: "nt_connected_(false) {"
    Received msg: "d to our TCP server. (condition variable)"
    Received msg: "ection handler"
    Received msg: "nication object started asynchronous"
    Received msg: "ond_.notify_one();"
    

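    As you can see from the truncated lines, you still need to take bytes_transferred into account in handle_read (everything received after the first delimiter is currently thrown away), but I'll leave that to you for now.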

    Post Scriptum

    Oh. I noticed another thing. If you intend to only accept 1 connection, don't reset the connection_, because that means that write acts on an unconnected instance. Perhaps you wanted to keep a list of connected clients?

    Here's a simple rework to replace the single connection_ member with a list<weak_ptr<TcpConnectionHandler> > connections_;. It even has a basic garbage collection built in.

    Messages are broadcast to all connected clients.

    Live On Coliru

    //#define BOOST_BIND_GLOBAL_PLACEHOLDERS
    #define BOOST_BIND_NO_PLACEHOLDERS
    #ifndef TCP_SERVER_
    #define TCP_SERVER_
    
    #include <boost/asio.hpp>
    #include <boost/bind.hpp>
    #include <boost/enable_shared_from_this.hpp>
    #include <boost/function.hpp>
    #include <boost/thread.hpp>
    #include <iostream>
    
    namespace my_project {
        namespace ph = boost::asio::placeholders;
        using boost::system::error_code;
        using boost::asio::ip::tcp;
        using Executor = boost::asio::executor;
        //using Executor = boost::asio::any_io_executor; // boost 1.74.0
        using Callback = boost::function<void(std::string const&)>;
    
        /**
         * Handles one TCP connection: reads "\r\n"-delimited input and writes
         * queued outgoing messages one at a time. All handlers are bound with
         * shared_from_this().
         */
        class TcpConnectionHandler : public boost::enable_shared_from_this<TcpConnectionHandler> {
          public:
            // Wraps executor in a strand and creates the socket on that strand.
            TcpConnectionHandler(Executor executor, Callback callback);
    
            // Gives access to the underlying socket (the server accepts into it).
            tcp::socket& socket() { return socket_; }
    
            // Starts an asynchronous read that ends at the next "\r\n".
            void start();
            // Queues message for sending; the work is posted to executor_.
            void write(std::string const& message);
    
            // Traces destruction on stderr.
            ~TcpConnectionHandler() { std::cerr << __FUNCTION__ << "\n"; }
          private:
            // Appends message to outbox_ and starts writing if it is the only entry.
            void writeImpl(const std::string& message);
            // Starts an asynchronous write of the front of outbox_.
            void write_loop();
            // Completion handler of the read started by start().
            void handle_read(error_code error, size_t bytes_transferred);
            // Completion handler of the write started by write_loop().
            void handle_write(error_code error, size_t bytes_transferred);
    
            Executor executor_;                  // strand; declared before socket_, which is constructed from it
            tcp::socket socket_;
            boost::asio::streambuf message_;     // input buffer filled by async_read_until
            Callback received_message_callback_; // called with each received line
    
            std::list<std::string> outbox_;      // pending outgoing messages, front is in flight
        };
    
        using ConnectionPtr = boost::shared_ptr<TcpConnectionHandler>;
        using ConnectionHandle = boost::weak_ptr<TcpConnectionHandler>;
    
        /**
         * Accepts TCP connections on a port, keeps weak handles to the
         * connection handlers and runs the io_service on a secondary thread.
         */
        class TcpServer {
          public:
            // Opens the acceptor on the given port, starts accepting and
            // launches the io_service thread.
            TcpServer(unsigned short port, Callback callback);
            // Joins the io thread.
            ~TcpServer();
    
            // Sends content to every connection whose handle is still alive.
            void write(std::string const& content);
    
          private:
            void start_accept();
    
            void handle_accept(ConnectionPtr connection,
                               error_code error);
    
            // io_service_ is declared first, so it is destroyed last (members
            // are destroyed in reverse declaration order).
            boost::asio::io_service io_service_;
            std::list<ConnectionHandle> connections_; // weak handles; do not keep handlers alive
            tcp::acceptor acceptor_;
            Callback received_message_callback_;
            boost::thread io_thread_; /**< Thread to run boost.asio io_service. */
        };
    
    } // namespace my_project
    
    #endif // #ifndef TCP_SERVER_
    
    //#include "tcp_server.h"
    //#include "easylogging++.h"
    //#include "utils.h"
    
    namespace my_project {
        // TcpConnectionHandler
        // Wraps the given executor in a strand, creates the socket on that
        // strand and stores the receive callback.
        TcpConnectionHandler::TcpConnectionHandler(Executor executor, Callback callback)
            : executor_(boost::asio::make_strand(executor))
            , socket_(executor_)
            , received_message_callback_(callback)
        {
        }
    
        void TcpConnectionHandler::start() {
            async_read_until(socket_, message_, "\r\n",
                 boost::bind(&TcpConnectionHandler::handle_read,
                             shared_from_this(), ph::error, ph::bytes_transferred));
        }
    
        // Schedules message for sending by posting writeImpl to executor_.
        // The posted handler holds a shared pointer to this object.
        void TcpConnectionHandler::write(const std::string& message) {
            auto self = shared_from_this();
            boost::asio::post(
                executor_,
                boost::bind(&TcpConnectionHandler::writeImpl, self, message));
        }
    
        // Appends message to the outbox and kicks off the write loop only when
        // the outbox was empty before, i.e. when no write is in flight.
        void TcpConnectionHandler::writeImpl(const std::string& message) {
            bool const was_idle = outbox_.empty();
            outbox_.push_back(message);

            if (was_idle) {
                write_loop();
            }
        }
    
        void TcpConnectionHandler::write_loop() {
            boost::asio::async_write(
                socket_, boost::asio::buffer(outbox_.front()),
                boost::asio::bind_executor(
                    executor_,
                    boost::bind(&TcpConnectionHandler::handle_write, shared_from_this(),
                                ph::error, ph::bytes_transferred)));
        }
    
        /**
         * Completion handler for the read started in start().
         *
         * Takes the first @p bytes_transferred bytes out of message_, splits
         * them on '\n', strips one trailing '\r' from each line and passes each
         * line to received_message_callback_. If @p error is not set, another
         * read is started; otherwise the error message is logged to std::cerr
         * and no further read is issued.
         *
         * @param error             Result of the read operation.
         * @param bytes_transferred Number of bytes to take from the streambuf.
         */
        void TcpConnectionHandler::handle_read(error_code error, size_t bytes_transferred) {
            // Copy the bytes out *before* consume(): modifying the streambuf's
            // input sequence invalidates the buffers returned by data().
            auto f = buffers_begin(message_.data()), l = f + bytes_transferred;
            std::string const received(f, l);
            message_.consume(bytes_transferred);

            std::istringstream iss(received);
            for (std::string msg; std::getline(iss, msg, '\n');) {
                // A bare '\n' yields an empty line; back() on an empty string
                // is undefined behaviour, so check first.
                if (!msg.empty() && msg.back() == '\r') {
                    msg.pop_back();
                }
                received_message_callback_(msg);
            }

            if (!error) {
                start();
            } else {
                std::cerr << __FUNCTION__ << ": " << error.message() << "\n";
            }
        }
    
        /**
         * Completion handler for the write started in write_loop().
         *
         * Removes the message that was just written from the outbox. On error
         * the failure is logged to std::cerr and nothing more is sent from
         * here; otherwise the next queued message, if any, is written.
         */
        void TcpConnectionHandler::handle_write(error_code error, size_t /*bytes_transferred*/) {
            outbox_.pop_front();

            if (error) {
                std::cerr << "could not write: " << error.message() << std::endl;
                return;
            }

            if (outbox_.empty()) {
                return; // nothing left to send
            }

            write_loop();
        }
    
        // TcpServer
        /**
         * Opens an acceptor on @p port, stores @p callback for new connections,
         * issues the first asynchronous accept and then starts the thread that
         * runs io_service_.
         */
        TcpServer::TcpServer(unsigned short port, Callback callback)
            : acceptor_(io_service_, { {}, port })
            , received_message_callback_(callback)
        {
            // Queue the accept before the thread starts so run() has work to do.
            start_accept();

            io_thread_ = boost::thread([this] { io_service_.run(); });
        }
    
        /**
         * Waits for the I/O thread to finish, if it is joinable.
         * The io_service is not stopped here; the stop/interrupt calls are
         * intentionally left commented out.
         */
        TcpServer::~TcpServer() {
            if (!io_thread_.joinable()) {
                return;
            }

            //io_service_.stop();
            //io_thread_.interrupt();
            io_thread_.join();
        }
    
        /**
         * Sends @p content to every connection whose handle can still be locked.
         * Handles that no longer yield a connection are skipped.
         */
        void TcpServer::write(std::string const& content) {
            for (auto& handle : connections_) {
                ConnectionPtr con = handle.lock();
                if (!con) {
                    continue;
                }
                con->write(content);
            }
        }
    
        void TcpServer::start_accept() {
            // optionally garbage-collect connection handles
            connections_.remove_if(std::mem_fn(&ConnectionHandle::expired));
    
            // Create a new connection handler
            auto connection_ = boost::make_shared<TcpConnectionHandler>(
                acceptor_.get_executor(), received_message_callback_);
    
            // Asynchronous accept operation and wait for a new connection.
            acceptor_.async_accept(connection_->socket(),
                boost::bind(&TcpServer::handle_accept, this, connection_, ph::error));
        }
    
        /**
         * Completion handler for the accept started in start_accept().
         *
         * On success the connection is recorded in connections_ and started.
         * Regardless of the outcome, another accept is issued afterwards.
         */
        void TcpServer::handle_accept(boost::shared_ptr<TcpConnectionHandler> connection, error_code error) {
            //std::cerr << __FUNCTION__ << ": " << error.message() << "\n";
            bool const accepted = !error;
            if (accepted) {
                connections_.push_back(connection);
                connection->start();
            }

            // Keep listening for the next client.
            start_accept();
        }
    } // namespace my_project
    
    /**
     * Demo entry point: starts a server on port 6868 whose callback prints each
     * received message and sends it, terminated with "\r\n", to all connections.
     */
    int main() {
        my_project::TcpServer server(6868, [&server](std::string const& msg) {
            std::cout << "Broadcasting msg: " << std::quoted(msg) << "\n";

            std::string const reply = msg + "\r\n";
            server.write(reply);
        });
    }
    

    With simulated clients:

    (for a in 1 2 3; do echo -e "Foo $a\nBar $a\nQux $a" | nc -Cw1 localhost 6868 | while read line; do echo "Job $a: $line"; done& done;)
    

    Prints at the server:

    Broadcasting msg: "Foo 1"
    Broadcasting msg: "Foo 2"
    Broadcasting msg: "Bar 1"
    Broadcasting msg: "Bar 2"
    Broadcasting msg: "Foo 3"
    Broadcasting msg: "Qux 1"
    Broadcasting msg: "Qux 2"
    Broadcasting msg: "Bar 3"
    Broadcasting msg: "Qux 3"
    handle_read: End of file
    ~TcpConnectionHandler
    handle_read: End of file
    ~TcpConnectionHandler
    handle_read: End of file
    ~TcpConnectionHandler
    

    And the clients print (depending on timings):

    Job 1: Foo 1
    Job 2: Bar 1
    Job 1: Bar 1
    Job 2: Foo 2
    Job 1: Foo 2
    Job 3: Foo 3
    Job 2: Qux 1
    Job 1: Qux 1
    Job 3: Bar 2
    Job 2: Foo 3
    Job 1: Foo 3
    Job 3: Bar 3
    Job 1: Bar 2
    Job 2: Bar 2
    Job 3: Qux 2
    Job 1: Bar 3
    Job 2: Bar 3
    Job 3: Qux 3
    Job 1: Qux 2
    Job 2: Qux 2
    Job 1: Qux 3
    Job 2: Qux 3