Search code examples
c++socketsasynchronousboostboost-asio

Boost::Asio : Why does async_write truncate the buffer when sending it through the given socket?


I'm currently attempting to design a fairly simple boost::asio server. My first unit test is fairly simple: send a JSON request {"COMMAND": "ADD_1", "VALUE" : 1} and receive the following response:

{
    "SUCCESS" : true,
    "VALUE" : 2
}

However, instead, the reply is truncated by one character after being read from the socket:

Reply is: {
    "SUCCESS" : true,
    "VALUE" : 2

Process finished with exit code 0

The code to write to the socket is fairly simple, a member function of a class RequestContext:

    void RequestContext::DoWrite(std::size_t length)
    {
        JSONCPP_STRING parse_err;
        Json::Value json_req, json_resp;
        auto self(this->shared_from_this());
        std::string client_req_str(data_);

        if (reader_->parse(client_req_str.c_str(),
                           client_req_str.c_str() +
                           client_req_str.length(),
                           &json_req, &parse_err))
        {
            try {
                // Get JSON response.
                json_resp = ProcessRequest(json_req);
                json_resp["SUCCESS"] = true;
            } catch (const std::exception &ex) {
                // If json parsing failed.
                json_resp["SUCCESS"] = false;
                json_resp["ERRORS"] = std::string(ex.what());
            }
        } else {
            // If json parsing failed.
            json_resp["SUCCESS"] = false;
            json_resp["ERRORS"] = std::string(parse_err);
        }

        std::string resp = Json::writeString(writer_, json_resp);

        boost::asio::async_write(socket_,
                                 boost::asio::buffer(&resp[0], resp.size()),
                                 [this, self]
                                 (boost::system::error_code ec,
                                  std::size_t bytes_xfered) {
                                    if (!ec)     DoRead();
                                 });
    }

I have verified that ProcessRequest returns the correct value, so the issue is evidently with async_write. I have tried increasing the value of the second argument to async_write, but it seems to have no effect. What am I doing wrong?

A minimum reproducible example can be found below:

#include <cstdlib>
#include <iostream>
#include <memory>
#include <utility>
#include <boost/asio.hpp>
#include <boost/system/error_code.hpp>
#include <json/json.h>

using boost::asio::ip::tcp;
using boost::system::error_code;
/// NOTE: This class exists exclusively for unit testing.
class RequestClass {
public:
    /**
     * Initialize class with value n to add sub from input values.
     *
     * @param n Value to add/sub from input values.
     */
    explicit RequestClass(int n) : n_(n) {}

    /// Value to add/sub from
    int n_;

    /**
     * Add n to value in JSON request.
     *
     * @param request JSON request with field "value".
     * @return JSON response containing modified field "value" = [original_value] + n.
     */
    [[nodiscard]] Json::Value add_n(const Json::Value &request) const
    {
        Json::Value resp;
        resp["SUCCESS"] = true;

        // If value is present in request, return value + 1, else return error.
        if (request.get("VALUE", NULL) != NULL) {
            resp["VALUE"] = request["VALUE"].asInt() + this->n_;
        } else {
            resp["SUCCESS"] = false;
            resp["ERRORS"] = "Invalid value.";
        }
        return resp;
    }

    /**
     * Sun n from value in JSON request.
     *
     * @param request JSON request with field "value".
     * @return JSON response containing modified field "value" = [original_value] - n.
     */
    [[nodiscard]] Json::Value sub_n(const Json::Value &request) const
    {
        Json::Value resp, value;
        resp["SUCCESS"] = true;

        // If value is present in request, return value + 1, else return error.
        if (request.get("VALUE", NULL) != NULL) {
            resp["VALUE"] = request["VALUE"].asInt() - this->n_;
        } else {
            resp["SUCCESS"] = false;
            resp["ERRORS"] = "Invalid value.";
        }
        return resp;
    }
};

typedef std::function<Json::Value(RequestClass, const Json::Value &)> RequestClassMethod;

template<class RequestHandler, class RequestClass>
class RequestContext :
    public std::enable_shared_from_this<RequestContext<RequestHandler,
                                                       RequestClass>>
{
public:
    typedef std::map<std::string, RequestHandler> CommandMap;

    RequestContext(tcp::socket socket, CommandMap commands,
                   RequestClass *request_class_inst)
        : socket_(std::move(socket))
        , commands_(std::move(commands))
        , request_class_inst_(request_class_inst)
        , reader_((new Json::CharReaderBuilder)->newCharReader())
    {}

    void Run()
    {
        DoRead();
    }

    void Kill()
    {
        continue_ = false;
    }

private:
    tcp::socket socket_;
    RequestClass *request_class_inst_;
    CommandMap commands_;
    /// Reads JSON.
    const std::unique_ptr<Json::CharReader> reader_;
    /// Writes JSON.
    Json::StreamWriterBuilder writer_;
    bool continue_ = true;
    char data_[2048];

    void DoRead()
    {
        auto self(this->shared_from_this());
        socket_.async_read_some(boost::asio::buffer(data_, 2048),
                                [this, self](error_code ec, std::size_t length)
                                {
                                  if (!ec)
                                  {
                                      DoWrite(length);
                                  }
                                });
    }

    void DoWrite(std::size_t length)
    {
        JSONCPP_STRING parse_err;
        Json::Value json_req, json_resp;
        auto self(this->shared_from_this());
        std::string client_req_str(data_);

        if (reader_->parse(client_req_str.c_str(),
                           client_req_str.c_str() +
                           client_req_str.length(),
                           &json_req, &parse_err))
        {
            try {
                // Get JSON response.
                json_resp = ProcessRequest(json_req);
                json_resp["SUCCESS"] = true;
            } catch (const std::exception &ex) {
                // If json parsing failed.
                json_resp["SUCCESS"] = false;
                json_resp["ERRORS"] = std::string(ex.what());
            }
        } else {
            // If json parsing failed.
            json_resp["SUCCESS"] = false;
            json_resp["ERRORS"] = std::string(parse_err);
        }

        std::string resp = Json::writeString(writer_, json_resp);

        boost::asio::async_write(socket_,
                                 boost::asio::buffer(&resp[0], resp.size()),
                                 [this, self]
                                 (boost::system::error_code ec,
                                  std::size_t bytes_xfered) {
                                    if (!ec)     DoRead();
                                 });
    }

    Json::Value ProcessRequest(Json::Value request)
    {
        Json::Value response;
        std::string command = request["COMMAND"].asString();

        // If command is not valid, give a response with an error.
        if(commands_.find(command) == commands_.end()) {
            response["SUCCESS"] = false;
            response["ERRORS"] = "Invalid command.";
        }
            // Otherwise, run the relevant handler.
        else {
            RequestHandler handler = commands_.at(command);
            response = handler(*request_class_inst_, request);
        }

        return response;
    }

};

template<class RequestHandler, class RequestClass>
class Server {
public:
    typedef std::map<std::string, RequestHandler> CommandMap;

    Server(boost::asio::io_context &io_context, short port,
           const CommandMap &commands,
           RequestClass *request_class_inst)
        : acceptor_(io_context, tcp::endpoint(tcp::v4(), port))
        , commands_(commands)
        , request_class_inst_(request_class_inst)
    {
        DoAccept();
    }

    ~Server()
    {
        Kill();
    }

    void Kill()
    {
        continue_ = false;
    }

private:
    tcp::acceptor acceptor_;
    bool continue_ = true;
    CommandMap commands_;
    RequestClass *request_class_inst_;

    void DoAccept()
    {
        acceptor_.async_accept(
            [this](boost::system::error_code ec, tcp::socket socket) {
                if (!ec)
                    std::make_shared<RequestContext<RequestHandler, RequestClass>>
                        (std::move(socket), commands_, request_class_inst_)->Run();
                DoAccept();
            });
    }
};

void RunServer(short port)
{
    boost::asio::io_context io_context;
    auto *request_inst = new RequestClass(1);
    std::map<std::string, RequestClassMethod> commands {
        {"ADD_1", std::mem_fn(&RequestClass::add_n)},
        {"SUB_1", std::mem_fn(&RequestClass::sub_n)}
    };
    Server<RequestClassMethod, RequestClass> s(io_context, port, commands,
                                               request_inst);

    io_context.run();
}

void RunServerInBackground(short port)
{
    std::thread t([port] { RunServer(port); });
    t.detach();
}



int main()
{
    try
    {
        RunServerInBackground(5000);
        boost::asio::io_context io_context;
        tcp::socket s(io_context);
        tcp::resolver resolver(io_context);

        boost::asio::connect(s, resolver.resolve("127.0.0.1", "5000"));
        char request[2048] = R"({"COMMAND": "ADD_1", "VALUE" : 1})";
        size_t request_length = std::strlen(request);
        boost::asio::write(s, boost::asio::buffer(request, request_length));
        char reply[2048];
        size_t reply_length = boost::asio::read(s, boost::asio::buffer(reply, request_length));
        std::cout << "Reply is: ";
        std::cout << reply << std::endl;
    }
    catch (std::exception& e)
    {
        std::cerr << "Exception: " << e.what() << "\n";
    }

    return 0;
}

Solution

  • The outgoing buffer needs to be a class member, just like data_, so that the lifetime is guaranteed until async_write is completed.

    You can also spot issues like this with linter/runtime checks like ASAN/UBSAN or Valgrind.

    UPDATE

    Also

    size_t reply_length =
        boost::asio::read(s, boost::asio::buffer(reply, request_length));
    

    wrongly uses request_length. As a rule, avoid manually specifying buffer sizes, at any time.

    Besides, your protocol doesn't provide framing, so you cannot practically keep the same connection open for newer requests (you don't know how many bytes to expect for a response to be complete). I'll "fix" it here by closing the connection after the first request, so we have a working demo.

    There's also a race condition with the continue_ flags, but I'll leave that as an exorcism for the reader.

    Of course, consider not leaking the request class instance.

    Oh, I also switched to Boost JSON as it seemed an easier fit:

    Live On Coliru

    #include <boost/asio.hpp>
    #include <boost/json.hpp>
    #include <boost/json/src.hpp>
    #include <iostream>
    
    using boost::asio::ip::tcp;
    using boost::system::error_code;
    namespace json = boost::json;
    using Value    = json::object;
    
    /// NOTE: This class exists exclusively for unit testing.
    struct Sample {
        int n_;
    
        Value add_n(Value const& request) const { return impl(std::plus<>{}, request); }
        Value sub_n(Value const& request) const { return impl(std::minus<>{}, request); }
        Value mul_n(Value const& request) const { return impl(std::multiplies<>{}, request); }
        Value div_n(Value const& request) const { return impl(std::divides<>{}, request); }
    
      private:
        template <typename Op> Value impl(Op op, Value const& req) const {
            return (req.contains("VALUE"))
                ? Value{{"VALUE", op(req.at("VALUE").as_int64(), n_)},
                        {"SUCCESS", true}}
                : Value{{"ERRORS", "Invalid value."}, {"SUCCESS", false}};
        }
    };
    
    using RequestClassMethod =
        std::function<Value(Sample, Value const&)>;
    
    template <class RequestHandler, class RequestClass>
    class RequestContext
        : public std::enable_shared_from_this<
              RequestContext<RequestHandler, RequestClass>> {
      public:
        using CommandMap = std::map<std::string, RequestHandler>;
    
        RequestContext(tcp::socket socket, CommandMap commands,
                       RequestClass* request_class_inst)
            : socket_(std::move(socket))
            , commands_(std::move(commands))
            , request_class_inst_(request_class_inst)
        {}
    
        void Run()  { DoRead(); }
        void Kill() { continue_ = false; }
    
      private:
        tcp::socket   socket_;
        CommandMap    commands_;
        RequestClass* request_class_inst_;
        bool          continue_ = true;
        char          data_[2048];
        std::string   resp_;
    
        void DoRead()
        {
            socket_.async_read_some(
                boost::asio::buffer(data_),
                [this, self = this->shared_from_this()](error_code ec, std::size_t length) {
                    if (!ec) {
                        DoWrite(length);
                    }
                });
        }
    
        void DoWrite(std::size_t length)
        {
            Value json_resp;
    
            try {
                auto json_req = json::parse({data_, length}).as_object();
                json_resp = ProcessRequest(json_req);
                json_resp["SUCCESS"] = true;
            } catch (std::exception const& ex) {
                json_resp = {{"SUCCESS", false}, {"ERRORS", ex.what()}};
            }
    
            resp_ = json::serialize(json_resp);
    
            boost::asio::async_write(socket_, boost::asio::buffer(resp_),
                 [this, self = this->shared_from_this()](
                     error_code ec, size_t bytes_xfered) {
                     if (!ec)
                         DoRead();
                 });
        }
    
        Value ProcessRequest(Value request)
        {
            auto command = request.contains("COMMAND")
                ? request["COMMAND"].as_string() //
                : "";
            std::string cmdstr(command.data(), command.size());
    
            // If command is not valid, give a response with an error.
            return commands_.contains(cmdstr) && request_class_inst_
                ? commands_.at(cmdstr)(*request_class_inst_, request)
                : Value{{"SUCCESS", false}, {"ERRORS","Invalid command."}};
        }
    };
    
    template<class RequestHandler, class RequestClass>
    class Server {
      public:
        using CommandMap = std::map<std::string, RequestHandler>;
    
        Server(boost::asio::io_context& io_context, uint16_t port,
               const CommandMap& commands, RequestClass* request_class_inst)
            : acceptor_(io_context, {{}, port})
            , commands_(commands)
            , request_class_inst_(request_class_inst)
        {
            DoAccept();
        }
    
        ~Server() { Kill(); }
        void Kill() { continue_ = false; }
    
    private:
        tcp::acceptor acceptor_;
        bool          continue_ = true;
        CommandMap    commands_;
        RequestClass *request_class_inst_;
    
        void DoAccept()
        {
            acceptor_.async_accept(
                [this](error_code ec, tcp::socket socket) {
                    if (!ec)
                        std::make_shared<
                            RequestContext<RequestHandler, RequestClass>>(
                            std::move(socket), commands_, request_class_inst_)
                            ->Run();
                    DoAccept();
                });
        }
    };
    
    void RunServer(uint16_t port)
    {
        boost::asio::io_context io_context;
    
        Server<RequestClassMethod, Sample> s(
            io_context, port,
            {{"ADD_2", std::mem_fn(&Sample::add_n)},
             {"SUB_2", std::mem_fn(&Sample::sub_n)},
             {"MUL_2", std::mem_fn(&Sample::mul_n)},
             {"DIV_2", std::mem_fn(&Sample::div_n)}},
            new Sample{2});
    
        io_context.run();
    }
    
    void RunServerInBackground(uint16_t port)
    {
        std::thread t([port] { RunServer(port); });
        t.detach();
    }
    
    int main() try {
        RunServerInBackground(5000);
        ::sleep(1); // avoid startup race
    
        boost::asio::io_context io;
        tcp::socket s(io);
        s.connect({{}, 5000});
    
        std::string const request = R"({"COMMAND": "MUL_2", "VALUE" : 21})";
        std::cout << "Request: " << std::quoted(request, '\'') << std::endl;
    
        boost::asio::write(s, boost::asio::buffer(request));
        s.shutdown(tcp::socket::shutdown_send); // avoid framing problems
    
        error_code ec;
        char reply[2048];
        size_t reply_length = boost::asio::read(s, boost::asio::buffer(reply), ec);
    
        std::cout << "Reply is: "
                  << std::quoted(std::string_view(reply, reply_length), '\'')
                  << " (" << ec.message() << ")" << std::endl;
    } catch (std::exception const& e) {
        std::cerr << "Exception: " << e.what() << "\n";
    }
    

    Prints

    Request: '{"COMMAND": "MUL_2", "VALUE" : 21}'
    Reply is: '{"VALUE":42,"SUCCESS":true}' (End of file)