boost::asio::io_service::post - how to move std::unique_ptr into a handler function


I'm trying to pass a lambda that captures a std::unique_ptr to boost::asio::io_service::post. However, post requires its handler to be copy-constructible so that it can be stored for later execution. As far as I understand, this means the captured msgPtr (a std::unique_ptr) would also have to be copied, even though I move-construct it in the lambda capture, and the copy constructor of std::unique_ptr is deleted, which makes complete sense. Is there a way to move a std::unique_ptr into the handler passed to io_service::post, so that it is available when the handler runs later?

The error I get when I try to compile it with GCC:

   error: use of deleted function ...
   BOOST_ASIO_COMPLETION_HANDLER_CHECK(CompletionHandler, handler) type_check;
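
The underlying problem can be reproduced without Asio: a lambda that captures a std::unique_ptr by move is itself move-only, so any interface that needs to copy the handler rejects it. A minimal sketch, where `make_handler` and `Handler` are illustrative names that do not appear in the original code:

```cpp
#include <memory>
#include <string>
#include <type_traits>
#include <utility>

// Hypothetical helper: builds a handler that owns its message, much like the
// lambda passed to io_service::post in the question.
inline auto make_handler(std::unique_ptr<std::string> msg) {
    return [msg = std::move(msg)]() mutable { return std::move(msg); };
}

using Handler = decltype(make_handler(nullptr));

// A closure inherits the copy/move properties of its captures.
static_assert(!std::is_copy_constructible<Handler>::value,
              "a closure holding a unique_ptr cannot be copied");
static_assert(std::is_move_constructible<Handler>::value,
              "but it can still be moved");
```

Any handler check that requires copy-constructibility will therefore fail for such a closure, regardless of how the capture was initialized.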

The code sample goes below (updated):

#include <boost/asio.hpp>
#include <boost/asio/io_service.hpp>
#include <boost/asio/ip/tcp.hpp>
#include <deque>
#include <memory>
#include <string>

// Defined abstract class IMsg and concrete MsgTest class, representing the
// messages to be sent
class IMsg {
 public:
  explicit IMsg() = default;
  virtual ~IMsg() = default;
  IMsg(const IMsg& msg) = delete;
  IMsg& operator=(const IMsg& msg) = delete;
  virtual const std::string& Serialize() = 0;
};

class MsgTest : public IMsg {
 public:
  explicit MsgTest(const std::string& msg) : m_testMsg(msg) {}
  MsgTest(const MsgTest&) = delete;
  MsgTest& operator=(const MsgTest&) = delete;
  const std::string& Serialize() override { return m_testMsg; }

 private:
  std::string m_testMsg;
};

using IMsgPtr = std::unique_ptr<IMsg>;

class MsgClient {
 public:
  MsgClient(boost::asio::io_service& ioService)
      : m_ioService(ioService), m_socket(ioService) {}

  void asyncSendMsg(IMsgPtr&& msgPtr) {
    m_ioService.post([this, msgPtr{std::move(msgPtr)}]() mutable {
      m_messagesOut.emplace_back(std::move(msgPtr));
      // Serialize() returns const reference to serialized member data.
      // Thus, the data will be valid until the async_write returns.
      const std::string& currentMsg = m_messagesOut.front()->Serialize();

      // todo: such usage of async_write is dangerous and not thread-safe!
      boost::asio::async_write(
          m_socket, boost::asio::buffer(currentMsg, currentMsg.length()),
          [this](boost::system::error_code ec, std::size_t length) {
            if (ec == 0) {
              m_messagesOut.pop_front();
            } else {
              // todo: error
            }
          });
    });
  }

 private:
  std::deque<IMsgPtr> m_messagesOut;
  boost::asio::io_service& m_ioService;
  boost::asio::ip::tcp::socket m_socket;
};

// Calling asyncSendMsg
int main() {
  boost::asio::io_service ioServiceWorker;
  MsgClient client(ioServiceWorker);
  //todo: client.connect()...
  client.asyncSendMsg(std::make_unique<MsgTest>("Hello World"));
  // code that runs io_service in a separate thread is present below
  // but omitted.
}

P.S. We are limited to C++14 and Boost.Asio version 1.57. In case anyone wonders why I post and then call async_write inside the handler instead of calling async_write directly: the application is multi-threaded, and asyncSendMsg is usually called from a different thread than the one the MsgClient lives on. By adding the IMsgPtr to the queue from a posted handler, I can avoid inter-thread synchronization with mutexes, because post's handler is always executed on the thread where the MsgClient is located.

I'd appreciate knowing how I can move-construct a std::unique_ptr<IMsg> into the post handler. Any ideas on improving the code quality here would be appreciated as well.


Solution

  • Move-only handler support has improved in recent releases.

    That improvement applies to the post, dispatch and defer free functions that work with executors. However, you're using the legacy io_service::post member function.

    If your version of Boost is recent enough, you may be able to get away with simply changing m_io.post(task) into asio::post(m_io, task).

    First Take

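    #include <boost/asio.hpp>
    #include <boost/endian/arithmetic.hpp>
    #include <deque>
    #include <future> // std::packaged_task
    #include <iostream>
    #include <memory> // std::unique_ptr
    #include <string>
    #include <thread>
    #include <vector>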
    namespace asio = boost::asio;
    using asio::ip::tcp;
    using boost::system::error_code;
    
    // Defined abstract class IMsg and concrete MsgTest class, representing the
    // messages to be sent
    struct IMsg {
        virtual ~IMsg() = default;
        using buffers = std::vector<boost::asio::const_buffer>;
        virtual buffers as_buffers() const = 0;
    };
    
    struct MsgTest : public IMsg {
        boost::endian::big_uint32_t msg_type_ = 0x0001;
        std::string                 payload_;
        boost::endian::big_uint32_t payload_length_ = payload_.length();
    
        MsgTest(std::string payload) : payload_(std::move(payload)) {}
    
        virtual buffers as_buffers() const override {
            return {
                boost::asio::buffer(&msg_type_, sizeof(msg_type_)),
                boost::asio::buffer(&payload_length_, sizeof(payload_length_)),
                boost::asio::buffer(payload_),
            };
        }
    };
    
    using IMsgPtr = std::unique_ptr<IMsg>;
    
    class MsgClient {
      public:
        MsgClient(boost::asio::io_context& ioc) : m_io(ioc) {}
    
        error_code connect(tcp::endpoint ep) {
            auto task = std::packaged_task<error_code()>{[this, ep]() {
                error_code ec;
                m_socket.connect(ep, ec);
                return ec;
            }};
            return asio::post(m_io, std::move(task)).get();
        }
    
        void asyncSendMsg(IMsgPtr msgPtr) {
            post(m_io, [this, msgPtr = std::move(msgPtr)]() mutable {
                m_messagesOut.emplace_back(std::move(msgPtr));
    
                if (m_messagesOut.size() == 1)
                    do_send_loop();
            });
        }
    
      private:
        void do_send_loop() {
            if (m_messagesOut.empty())
                return;
    
            asio::async_write( //
                m_socket, m_messagesOut.front()->as_buffers(),
                [this](error_code ec, size_t /*length*/) {
                    if (!ec.failed()) {
                        m_messagesOut.pop_front();
    
                        do_send_loop();
                    } else {
                        // todo: error
                    }
                });
        }
    
        asio::io_context&   m_io;
        tcp::socket         m_socket{m_io};
        std::deque<IMsgPtr> m_messagesOut;
    };
    
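    // Calling asyncSendMsg
    int main() {
        boost::asio::io_context ioc;
        auto work = asio::make_work_guard(ioc); // keeps run() from returning early
        std::thread io_thread([&ioc] { ioc.run(); });

        MsgClient client(ioc);

        // connect() blocks on a future, so the service must already be running
        std::cout << "Connecting: " << client.connect({{}, 7878}).message() << "\n";

        for (auto msg : {"Hello", " World!\n", "Bye", " Universe\n"})
            client.asyncSendMsg(std::make_unique<MsgTest>(msg));

        work.reset(); // let run() return once all pending work has completed
        io_thread.join();
    }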
    

    When run against netcat -tlp 7878 | xxd, the client prints:

    Connecting: Success
    

    With the netcat pipeline printing:

    00000000: 0000 0001 0000 0005 4865 6c6c 6f00 0000  ........Hello...
    00000010: 0100 0000 0820 576f 726c 6421 0a00 0000  ..... World!....
    00000020: 0100 0000 0342 7965 0000 0001 0000 000a  .....Bye........
    00000030: 2055 6e69 7665 7273 650a                  Universe.
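
The dump reflects the framing produced by MsgTest: a 4-byte big-endian message type, a 4-byte big-endian payload length, then the payload bytes. As a rough illustration of that layout without Boost.Endian, the sketch below assembles the same frame by hand. `put_be32` and `frame` are hypothetical helper names, and the type value 1 is simply the one used in the example above.

```cpp
#include <cstdint>
#include <string>
#include <vector>

// Append a 32-bit value in big-endian (network) byte order.
inline void put_be32(std::vector<std::uint8_t>& out, std::uint32_t v) {
    out.push_back(static_cast<std::uint8_t>(v >> 24));
    out.push_back(static_cast<std::uint8_t>(v >> 16));
    out.push_back(static_cast<std::uint8_t>(v >> 8));
    out.push_back(static_cast<std::uint8_t>(v));
}

// Build one frame: [type:4][length:4][payload:length].
inline std::vector<std::uint8_t> frame(std::uint32_t type,
                                       const std::string& payload) {
    std::vector<std::uint8_t> out;
    out.reserve(8 + payload.size());
    put_be32(out, type);
    put_be32(out, static_cast<std::uint32_t>(payload.size()));
    out.insert(out.end(), payload.begin(), payload.end());
    return out;
}
```

For instance, frame(1, "Hello") yields the bytes 00 00 00 01 00 00 00 05 followed by "Hello", which matches the first 13 bytes of the dump.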
    

    Note the many small fixes/improvements:

    • using a buffer sequence instead of a serialized string; this may or may not help you, depending on how you serialize your data

    • showing some message framing for fun

    • showing how to safely dispatch synchronous operations to the service (by using a packaged task and its future)

    • correctly queuing the output messages:

      • not allowing multiple async_write operations concurrently (see docs)
      • sending in correct FIFO order (you had Serialize(...back()) but then pop_front()...)
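
The queuing rule from the last two bullets can be illustrated without any networking. The sketch below is a simplified, single-threaded stand-in: `FakeSocket` and `SendQueue` are hypothetical names, and `FakeSocket::async_write` only records the request so that its completion can be triggered by hand. It is meant to show the invariant (at most one write in flight, messages leaving in FIFO order), not how Asio actually schedules handlers.

```cpp
#include <deque>
#include <functional>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-in for a socket: it remembers one pending write and lets
// the caller complete it later, the way an I/O service eventually would.
struct FakeSocket {
    std::vector<std::string> wire;  // everything "sent" so far, in order
    std::function<void()> pending;  // completion of the write in flight
    int in_flight = 0;

    void async_write(const std::string& data, std::function<void()> done) {
        ++in_flight;
        wire.push_back(data);
        pending = std::move(done);
    }

    void complete_one() {  // simulate the pending write finishing
        auto done = std::move(pending);
        pending = nullptr;
        --in_flight;
        if (done) done();
    }
};

class SendQueue {
  public:
    explicit SendQueue(FakeSocket& s) : sock_(s) {}

    void send(std::string msg) {
        queue_.push_back(std::move(msg));
        if (queue_.size() == 1)  // nothing in flight yet: start the loop
            do_send_loop();
    }

  private:
    void do_send_loop() {
        if (queue_.empty()) return;
        // The front element stays queued until its write completes, so the
        // buffer outlives the operation.
        sock_.async_write(queue_.front(), [this] {
            queue_.pop_front();
            do_send_loop();
        });
    }

    FakeSocket& sock_;
    std::deque<std::string> queue_;
};
```

Calling send three times in a row starts only one write; each later message is written only after complete_one() has finished the previous one.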

    Going From Here

    I'd actually go the extra step and switch to the executor model, which has some simplifying benefits:

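    #include <boost/asio.hpp>
    #include <boost/endian/arithmetic.hpp>
    #include <deque>
    #include <future> // std::packaged_task
    #include <iostream>
    #include <memory> // std::unique_ptr
    #include <string>
    #include <vector>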
    namespace asio = boost::asio;
    using asio::ip::tcp;
    using boost::system::error_code;
    
    // Defined abstract class IMsg and concrete MsgTest class, representing the
    // messages to be sent
    struct IMsg {
        virtual ~IMsg() = default;
        using buffers = std::vector<boost::asio::const_buffer>;
        virtual buffers as_buffers() const = 0;
    };
    
    struct MsgTest : IMsg {
        boost::endian::big_uint32_t msg_type_ = 0x0001;
        std::string                 payload_;
        boost::endian::big_uint32_t payload_length_ = payload_.length();
    
        MsgTest(std::string payload) : payload_(std::move(payload)) {}
    
        virtual buffers as_buffers() const override {
            return {
                boost::asio::buffer(&msg_type_, sizeof(msg_type_)),
                boost::asio::buffer(&payload_length_, sizeof(payload_length_)),
                boost::asio::buffer(payload_),
            };
        }
    };
    
    using IMsgPtr = std::unique_ptr<IMsg>;
    
    class MsgClient {
      public:
        template <typename Executor> MsgClient(Executor ex) : m_socket(ex) {}
    
        error_code connect(tcp::endpoint ep) {
            auto task = std::packaged_task<error_code()>{[this, ep]() {
                error_code ec;
                m_socket.connect(ep, ec);
                return ec;
            }};
            return asio::post(m_socket.get_executor(), std::move(task)).get();
        }
    
        void asyncSendMsg(IMsgPtr msgPtr) {
            post(m_socket.get_executor(),
                 [this, msgPtr = std::move(msgPtr)]() mutable {
                     m_messagesOut.emplace_back(std::move(msgPtr));
    
                     if (m_messagesOut.size() == 1)
                         do_send_loop();
                 });
        }
    
      private:
        void do_send_loop() {
            if (m_messagesOut.empty())
                return;
    
            asio::async_write( //
                m_socket, m_messagesOut.front()->as_buffers(),
                [this](error_code ec, size_t /*length*/) {
                    if (!ec.failed()) {
                        m_messagesOut.pop_front();
    
                        do_send_loop();
                    } else {
                        // todo: error
                    }
                });
        }
    
        tcp::socket         m_socket;
        std::deque<IMsgPtr> m_messagesOut;
    };
    
    // Calling asyncSendMsg
    int main() {
        boost::asio::thread_pool ioc(1);
    
        MsgClient client(make_strand(ioc.get_executor()));
    
        std::cout << "Connecting: " << client.connect({{}, 7878}).message() << "\n";
    
        for (auto msg : {"Hello", " World!\n", "Bye", " Universe\n"})
            client.asyncSendMsg(std::make_unique<MsgTest>(msg));
    
        ioc.join();
    }
    

    BONUS

    If you have to use the old interfaces and cannot upgrade Boost, you might use the BOOST_ASIO_DISABLE_HANDLER_TYPE_REQUIREMENTS define to skip the handler type checks, which makes it compile in more cases: https://godbolt.org/z/76dh9dcjj
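
Another workaround sometimes used with interfaces that insist on copyable handlers is to park the std::unique_ptr inside a std::shared_ptr, so that the lambda becomes copyable while the message is still moved out exactly once. The sketch below uses std::function as a stand-in for such an interface, because std::function also requires a copy-constructible callable. `Msg`, `LegacyQueue` and `post_message` are hypothetical names, and this has not been checked against Boost 1.57 specifically.

```cpp
#include <deque>
#include <functional>
#include <memory>
#include <string>
#include <utility>

struct Msg {
    std::string text;
};
using MsgPtr = std::unique_ptr<Msg>;

// Stand-in for a legacy executor: std::function only accepts copyable callables.
struct LegacyQueue {
    std::deque<std::function<void()>> tasks;

    void post(std::function<void()> f) { tasks.push_back(std::move(f)); }

    void run() {
        while (!tasks.empty()) {
            auto f = std::move(tasks.front());
            tasks.pop_front();
            f();
        }
    }
};

// Wrap the move-only pointer in a shared holder so the closure can be copied.
// The unique_ptr itself is moved out once, when the task finally runs.
inline void post_message(LegacyQueue& q, std::deque<MsgPtr>& out, MsgPtr msg) {
    auto holder = std::make_shared<MsgPtr>(std::move(msg));
    q.post([holder, &out] { out.emplace_back(std::move(*holder)); });
}
```

The price is one extra allocation per message, plus the caveat that every copy of the closure shares the same holder, so the handler must run at most once.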