
Sending http request from multiple threads using boost::asio. How to handle responses serially


For a client-side application, I wish to serialize HTTP POST requests using a strand, in order to avoid overlapping writes/responses to/from the server.

Serialization is done with boost::asio by posting the method as a callback via strand.post, as shown below. Note that each HTTP session (writing a request and reading its response) is performed in a separate callback, and that within the post the HTTP operations are performed synchronously.

boost::asio::io_context::strand strand_;

void Send(
   boost::beast::http::request<boost::beast::http::string_body> &req) {
  strand_.post([=]() {
      boost::beast::http::write(stream_, req);
      ...
      boost::beast::http::response<boost::beast::http::dynamic_body> res;
      boost::beast::flat_buffer buffer;
      boost::beast::error_code ec;
      boost::beast::http::read(stream_.socket(), buffer, res, ec);
  });
}

As can be seen in my example, the response is also read within the post callback. My question is whether this is actually required if I don't care about the order in which responses are read. Can I assume that after each HTTP request is sent to the server with boost::beast::http::write(stream_, req); the data is already waiting in the RX buffer queue, so that another thread may read the responses one by one?

Thanks!


Solution

  • You are not actually reading asynchronously, so you're not synchronizing much with the strand. The only thing being synchronized is access to the stream_/socket.

    Now, doing everything synchronously is good thinking. But in that case you don't need any threads, and therefore no strand, to begin with.

    Once you do have the strand/thread(s), performing non-trivial synchronous operations in handlers risks blocking the service thread(s). Consider what happens when the web server takes a second to respond: that is ages in computer terms.

    If you are doing as many requests simultaneously as there are threads (typically a low number, e.g. 4), then nothing else can progress on the io service, negating the very purpose of ASIO: asynchronous I/O.


    Let me quickly pave over some minor issues in your question code, making it self-contained: Live On Coliru

    #include <boost/beast/http.hpp>
    #include <boost/beast.hpp>
    #include <boost/asio.hpp>
    #include <iostream>
    using boost::asio::ip::tcp;
    namespace beast = boost::beast;
    namespace http  = beast::http;
    
    using Context = boost::asio::io_context;
    using Strand  = boost::asio::strand<Context::executor_type>;
    
    struct Demo {
        using Request = http::request<http::string_body>;
    
        Demo(Context& ctx, tcp::endpoint ep) //
            : strand_(ctx.get_executor())
        {
            stream_.connect(ep);
        }
    
        void Send(Request const& req)
        {
            post(strand_, [=,this]() {
                // prepare request ...
                http::write(stream_, req);
                //...
                http::response<boost::beast::http::dynamic_body> res;
                beast::flat_buffer buffer;
                beast::error_code  ec;
                http::read(stream_, buffer, res, ec);
    
                std::cout << res << "\n";
            });
        }
    
    private:
        Strand strand_;
        tcp::socket stream_{strand_};
    };
    
    int main() {
        Context io;
        Demo    x(io, {{}, 80});
    
        Demo::Request req{http::verb::get, "/", 10};
        req.prepare_payload();
        x.Send(req);
    
        io.run();
    }
    

    Improving

    I'd suggest an asynchronous interface that is safe to use: since you cannot be sure a new request won't be started on the same socket before the previous one(s) have completed, you need a queue:

    void Send(Request req) {
        post(strand_, [this, req = std::move(req)]() mutable {
            _outgoing.push_back(std::move(req));
    
            if (_outgoing.size() == 1) // no pending
                ServiceRequestQueue();
        });
    }
    

    Now, all the logic you had is moved into the request loop, but async:

    void ServiceRequestQueue()
    {
        http::async_write( //
            stream_, _outgoing.front(), [this](beast::error_code ec, size_t) {
                if (ec) {
                    std::cerr << "Request cannot be sent: " << ec.message() << std::endl;
                    return;
                }
    
                // receive response
                _incoming.clear();
                _incoming.body().clear();
    
                http::async_read( //
                    stream_, buffer, _incoming,
                    [this](beast::error_code ec, size_t) {
                        if (ec) {
                            std::cerr << "Response cannot be received: "
                                      << ec.message() << std::endl;
                            return;
                        }
                        // std::cout << _incoming.base() << "\n";
                        std::cout << stream_.remote_endpoint() << " "
                                  << _incoming.result() << " "
                                  << _incoming.body().size() << "\n";
    
                        // request done
                        _outgoing.pop_front();
                        // continue if more queued
                        if (not _outgoing.empty())
                            ServiceRequestQueue();
                    });
            });
    }
    

    You might want to split some of the completion handlers into separate functions, or do something useful with the request.

    Live On Coliru

    int main() {
        Context io;
    
        Demo example_com { io, "93.184.216.34", 80 } ;
        Demo coliru      { io, "173.203.57.63", 80 } ;
        Demo localhost   { io, "127.0.0.1", 80     } ;
    
        // queue many requests before service start
        auto queue10 = [](Demo& client, std::string hostname, int version) {
    Demo::Request req{http::verb::get, "/", version};
            req.set(http::field::host,hostname);
            req.prepare_payload();
    
            for (int i = 0; i < 10; ++i)
                client.Send(req);
        };
    
        queue10(example_com, "www.example.com", 11);
        queue10(coliru, "coliru-stacked-crooked.com", 11);
        queue10(localhost, "sehe.nl", 10);
    
        // start service
        io.run();
    }
    

    Prints, on my system:

    127.0.0.1:80 OK 2798
    127.0.0.1:80 OK 2798
    127.0.0.1:80 OK 2798
    127.0.0.1:80 OK 2798
    127.0.0.1:80 OK 2798
    127.0.0.1:80 OK 2798
    127.0.0.1:80 OK 2798
    127.0.0.1:80 OK 2798
    93.184.216.34:80 OK 1256
    127.0.0.1:80 OK 2798
    127.0.0.1:80 OK 2798
    93.184.216.34:80 OK 1256
    173.203.57.63:80 OK 8616
    93.184.216.34:80 OK 1256
    93.184.216.34:80 OK 1256
    93.184.216.34:80 OK 1256
    173.203.57.63:80 OK 8616
    93.184.216.34:80 OK 1256
    93.184.216.34:80 OK 1256
    173.203.57.63:80 OK 8616
    93.184.216.34:80 OK 1256
    93.184.216.34:80 OK 1256
    93.184.216.34:80 OK 1256
    173.203.57.63:80 OK 8616
    173.203.57.63:80 OK 8616
    173.203.57.63:80 OK 8616
    173.203.57.63:80 OK 8616
    173.203.57.63:80 OK 8616
    173.203.57.63:80 OK 8616
    173.203.57.63:80 OK 8616
    

    Note that if you create many requests simultaneously (e.g. even before running the io_context at all), you can observe that the separate HTTP clients work in an overlapping fashion.

    Advanced

    If you really wanted a function that initiates a request and allows you to consume the response in the completion handler, consider extending your interface like this:

    template <typename Token>
    void async_send(Request req, Token&& token) {
        using result_type = typename boost::asio::async_result<
            std::decay_t<Token>, void(beast::error_code, Response)>;
        using handler_type = typename result_type::completion_handler_type;
        handler_type handler(std::forward<Token>(token));
        result_type  result(handler);
    
        struct Op {
            Request req;
            Response res;
            handler_type handler;
    
            Op(Request&& r, handler_type&& h)
                : req(std::move(r))
                , handler(std::move(h))
            {
            }
    
            bool check(beast::error_code ec, bool force_completion = false) {
                if (ec || force_completion)
                    std::move(handler)(ec, std::move(res));
                return !ec.failed();
            }
        };
    
        auto op = std::make_shared<Op>(std::move(req), std::move(handler));
    
        post(strand_, [this, op] {
            http::async_write( //
                stream_, op->req,
                [this, op](beast::error_code ec, size_t) mutable {
                    if (op->check(ec))
                        http::async_read(stream_, buffer, op->res,
                                         [op](beast::error_code ec, size_t) {
                                             op->check(ec, true);
                                         });
                });
        });
    
        return result.get();
    }
    

    Note this moves the responsibility to avoid overlapping requests per client back to the caller. So starting some request chains like

    // queue several request chains before service start
    AsyncRequestChain(10, example_com, "www.example.com");
    AsyncRequestChain(10, coliru, "coliru.stacked-crooked.com");
    AsyncRequestChain(10, localhost, "sehe.nl");
    
    // start service
    io.run();
    

    With the chain itself being:

    void AsyncRequestChain(unsigned n, Demo& client, std::string hostname)
    {
        if (!n)
            return;
        Demo::Request req{http::verb::get, "/", 11};
        req.set(http::field::host, hostname);
        req.prepare_payload();
    
        client.async_send( //
            req, [=, &client](beast::error_code ec, Demo::Response&& res) {
                std::cout << hostname << ": " << ec.message();
                if (!ec)
                    std::cout << " " << res.result() //
                              << " " << res.body().size();
                std::cout << std::endl;
    
                // continue with next iteration
                AsyncRequestChain(n - 1, client, hostname);
            });
    }
    

    Prints, on my machine:

    sehe.nl: Success OK 2798
    sehe.nl: Success OK 2798
    sehe.nl: Success OK 2798
    sehe.nl: Success OK 2798
    sehe.nl: Success OK 2798
    sehe.nl: Success OK 2798
    sehe.nl: Success OK 2798
    sehe.nl: Success OK 2798
    sehe.nl: Success OK 2798
    sehe.nl: Success OK 2798
    www.example.com: Success OK 1256
    www.example.com: Success OK 1256
    coliru.stacked-crooked.com: Success OK 8616
    www.example.com: Success OK 1256
    www.example.com: Success OK 1256
    www.example.com: Success OK 1256
    coliru.stacked-crooked.com: Success OK 8616
    www.example.com: Success OK 1256
    www.example.com: Success OK 1256
    coliru.stacked-crooked.com: Success OK 8616
    www.example.com: Success OK 1256
    www.example.com: Success OK 1256
    www.example.com: Success OK 1256
    coliru.stacked-crooked.com: Success OK 8616
    coliru.stacked-crooked.com: Success OK 8616
    coliru.stacked-crooked.com: Success OK 8616
    coliru.stacked-crooked.com: Success OK 8616
    coliru.stacked-crooked.com: Success OK 8616
    coliru.stacked-crooked.com: Success OK 8616
    coliru.stacked-crooked.com: Success OK 8616
    

    See it Live On Coliru