Tags: boost, buffer, boost-asio, c++20, nlohmann-json

boost::asio::async_read_until with custom match_char to accept only JSON format


I've been trying to change a match_char-style function so that it accepts only JSON messages when reading data from a socket.

I have two implementations: one works, but I don't think it's efficient; the other one does not work.

1- First approach (working)

    typedef boost::asio::buffers_iterator<boost::asio::streambuf::const_buffers_type> buffer_iterator;

    static std::pair<buffer_iterator, bool> match_json2(const buffer_iterator begin,
                                                            const buffer_iterator end) {
        buffer_iterator i = begin;
        while (i != end) {
            if ((*i == ']') || (*i == '}')) {
                return std::make_pair(i, true);
            }
            ++i;
        }
        return std::make_pair(i, false);
    }

With this definition, I read in a loop and reassemble the JSON. This version works, but if I receive something that isn't valid JSON, I stay in the loop, can't clear tmp_response and never recover from it...

        std::string read_buffer_string() {
            std::string response;
            bool keepReading = true;
            while (keepReading) {
                std::string tmp_response;
                async_read_until(s, ba::dynamic_buffer(tmp_response), match_json2, yc);
                if (!tmp_response.empty()) {
                    response += tmp_response;
                    if (nlohmann::json::accept(response)) {
                        keepReading = false;
                    }
                }
            }
            return response;
        }
2- Second approach (not working)

Ideally I would like something like this one. This implementation doesn't work because the begin iterator doesn't always point to the start of the message (I guess some data has already been transferred to the buffer), and therefore match_json returns invalid values.

     static std::pair<buffer_iterator, bool> match_json(const buffer_iterator begin,
                                                             const buffer_iterator end) {
         buffer_iterator i = begin;
         while (i != end) {
             if ((*i == ']') || (*i == '}')) {
                 std::string _message(begin, i);
                 std::cout << _message << std::endl;
                 if (nlohmann::json::accept(_message)) {
                     return std::make_pair(i, true);
                 }
             }
             ++i;
         }
         return std::make_pair(i, false);
     }
    

And then call it like this:

        std::string read_buffer_string() {
            std::string response;
            async_read_until(s, ba::dynamic_buffer(response), match_json, yc);
            return response;
        }
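The moving begin iterator is documented behavior: for read_until and async_read_until, the first member of the pair returned by a match condition marks one past the bytes it consumed, and Asio uses it to compute the begin argument of the next call. Returning end when there is no match yet (as match_json does) therefore makes the next search start after everything already scanned. The sketch below is not from the original post and assumes the condition only has to find where the first top-level JSON object or array ends. Instead of calling nlohmann::json::accept at every closing bracket, it tracks bracket depth and string/escape state in a single pass, and returns begin when it needs more data so the next call rescans from the same position. The name match_json_value is hypothetical, and it does not validate the JSON, so a real parser still has to check the returned range. It is a template so it can be exercised with plain std::string iterators:

```cpp
#include <iterator>
#include <utility>

// Hypothetical match condition (not part of Asio or nlohmann::json).
// Finds where the first top-level JSON object or array in [begin, end) ends by
// tracking {}/[] nesting depth and skipping over string literals.
// It does NOT validate the JSON, and top-level scalars (42, true) are not handled.
template <typename Iterator>
std::pair<Iterator, bool> match_json_value(Iterator begin, Iterator end) {
    int depth = 0;          // current {} / [] nesting level
    bool in_string = false; // inside a "..." literal
    bool escaped = false;   // previous character was a backslash inside a string

    for (Iterator i = begin; i != end; ++i) {
        char const c = *i;
        if (in_string) {
            if (escaped)
                escaped = false;
            else if (c == '\\')
                escaped = true;
            else if (c == '"')
                in_string = false;
            continue; // brackets inside strings are ignored
        }
        switch (c) {
        case '"':
            in_string = true;
            break;
        case '{':
        case '[':
            ++depth;
            break;
        case '}':
        case ']':
            // A closer that brings depth back to 0 ends the value. A stray closer
            // at depth 0 also "matches", so the caller receives the garbage and
            // can reject it instead of waiting forever.
            if (depth == 0 || --depth == 0)
                return {std::next(i), true}; // one past the closer
            break;
        default:
            break; // scalars, ':', ',', whitespace
        }
    }
    // Incomplete: hand back begin so the next call rescans from the same position.
    return {begin, false};
}
```

With Asio, a call might look like async_read_until(s, ba::dynamic_buffer(response, max_size), match_json_value<buffer_iterator>, yc), assuming buffer_iterator matches the buffer's const_buffers_type as in the code above and max_size is a limit you choose. The first n bytes (n being the returned count) are the candidate message to pass to nlohmann::json::accept; erase them afterwards, because response may already hold the start of the next message. The size limit means a peer that never closes its brackets makes the read fail (Asio reports error::not_found when the buffer fills up without a match) instead of growing response forever.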

Does anybody know a more efficient way to do it? Thanks in advance! :)


Solution

  • Of course, right after posting my other answer I remembered that Boost 1.75.0 ships Boost JSON.

    It does stream parsing way more gracefully: https://www.boost.org/doc/libs/1_75_0/libs/json/doc/html/json/ref/boost__json__stream_parser.html#json.ref.boost__json__stream_parser.usage

    It actually deals with trailing data as well!

    #include <boost/json.hpp> // link Boost.JSON, or include <boost/json/src.hpp> in one TU
    #include <cassert>
    using namespace boost::json;

    int main() {
        stream_parser p;                  // construct a parser
        std::size_t n;                    // number of characters used
        n = p.write_some( "[1,2" );       // parse some of a JSON
        assert( n == 4 );                 // all characters consumed
        n = p.write_some( ",3,4] null" ); // parse the remainder of the JSON
        assert( n == 6 );                 // only some characters consumed
        assert( p.done() );               // we have a complete JSON
        value jv = p.release();           // take ownership of the value
    }
    

    I would also submit that this could be a better match for a CompletionCondition: see https://www.boost.org/doc/libs/1_75_0/doc/html/boost_asio/reference/read/overload3.html

    Here's an implementation that I tested with:

    #include <boost/asio.hpp>
    #include <boost/json.hpp>

    template <typename Buffer, typename SyncReadStream>
    static size_t read_json(SyncReadStream& s, Buffer buf,
        boost::json::value& message, boost::json::parse_options options = {})
    {
        boost::json::stream_parser p{{}, options};
    
        size_t total_parsed = 0;
        boost::asio::read(s, buf, [&](boost::system::error_code ec, size_t /*n*/) {
            size_t parsed = 0;
    
            // feed every contiguous chunk currently in the buffer to the parser
            auto const data = buf.data();
            for (auto it = boost::asio::buffer_sequence_begin(data);
                 it != boost::asio::buffer_sequence_end(data); ++it) {
                boost::asio::const_buffer contiguous = *it;
                parsed += p.write_some(
                    static_cast<char const*>(contiguous.data()),
                    contiguous.size(), ec);
            }
            buf.consume(parsed);   // drop what the parser used; trailing data stays
            total_parsed += parsed;
            return ec || p.done(); // true means done
        });
    
        message = p.release(); // throws if incomplete
        return total_parsed;
    }
    

    Adding a delegating overload for streambufs:

    template <typename SyncReadStream, typename Alloc>
    static size_t read_json(SyncReadStream& s,
        boost::asio::basic_streambuf<Alloc>& buf,
        boost::json::value& message,
        boost::json::parse_options options = {})
    {
        return read_json(s, boost::asio::basic_streambuf_ref<Alloc>(buf), message, options);
    }
    

    Demo Program

    This demo program adds the test cases from earlier, as well as a socket client that reports some benchmark statistics. Arguments:

    • test to run the tests instead of the socket client
    • streambuf to use the streambuf overload instead of std::string dynamic buffer
    • comments to allow comments in the JSON
    • trailing_commas to allow trailing commas in the JSON
    • invalid_utf8 to allow invalid UTF-8 in the JSON

    Live On Compiler Explorer¹


    The tests print:

     ----- valid test cases
    Testing {}                     -> Success {}
    Testing {"a":4, "b":5}         -> Success {"a":4,"b":5}
    Testing []                     -> Success []
    Testing [4, "b"]               -> Success [4,"b"]
     ----- incomplete test cases
    Testing {                      -> (incomplete...)
    Testing {"a":4, "b"            -> (incomplete...)
    Testing [                      -> (incomplete...)
    Testing [4, "                  -> (incomplete...)
     ----- invalid test cases
    Testing }                      -> syntax error
    Testing "a":4 }                -> Success "a" -- remaining `:4 }`
    Testing ]                      -> syntax error
     ----- excess input test cases
    Testing {}{"a":4, "b":5}       -> Success {} -- remaining `{"a":4, "b":5}`
    Testing []["a", "b"]           -> Success [] -- remaining `["a", "b"]`
    Testing {} bogus trailing data -> Success {} -- remaining `bogus trailing data`
    

    Some demo runs with the socket client:

    Mean packet size: 16 in 2 packets
    Request: 28 bytes
    Request: {"a":4,"b":"5"} bytes
    Remaining data: "bye
    "
    took 0.000124839s, ~0.213899MiB/s
    

    With a large (448MiB) location_history.json:

    Mean packet size: 511.999 in 917791 packets
    Request: 469908167 bytes
     (large request output suppressed)
    took 3.30509s, ~135.59MiB/s
    



    ¹ linking non-header-only libraries is not supported on Compiler Explorer