Search code examples
c++network-programmingboostipcboost-asio

How to implement an IPC protocol using Boost ASIO?


I'm trying to implement a simple IPC protocol for a project that will be built using Boost ASIO. The idea is to have the communication be done through IP/TCP, with a server with the backend and a client that will be using the data received from the server to build the frontend. The whole session would go like this:

  1. The connection is established
  2. The client sends a 2 byte packet with some information that will be used by the server to build its response (this is stored as the struct propertiesPacket)
  3. The server processes the data received and stores the output in a struct of variable size called processedData
  4. The server sends a 2 byte unsigned integer that will indicate the client what size the struct it will receive has (let's say the struct is of size n bytes)
  5. The server sends the struct data as a n byte packet
  6. The connection is ended

I tried implementing this by myself, following the great tutorial available in Boost ASIO's documentation, as well as the examples included in the library and some repos I found on Github, but as this is my first hand working with networking and IPC, I couldn't make it work, my client returns an exception saying the connection was reset by the peer.

What I have right now is this:

// File client.cpp
int main(int argc, char *argv[])
{
    try {
        propertiesPacket properties;
        // ...
        // We set the data inside the properties struct
        // ...

        boost::asio::io_context io;
        boost::asio::ip::tcp::socket socket(io);
        boost::asio::ip::tcp::resolver resolver(io);

        boost::asio::connect(socket, resolver.resolve(argv[1], argv[2]));
        boost::asio::write(socket, boost::asio::buffer(&properties, sizeof(propertiesPacket)));

        unsigned short responseSize {};
        boost::asio::read(socket, boost::asio::buffer(&responseSize, sizeof(short)));

        processedData* response = reinterpret_cast<processedData*>(malloc(responseSize));
        boost::asio::read(socket, boost::asio::buffer(response, responseSize));

        // ...
        // The client handles the data
        // ...

        return 0;
    } catch (std::exception &e) {
        std::cerr << e.what() << std::endl;
    }
}
// File server.cpp
class ServerConnection
    : public std::enable_shared_from_this<ServerConnection>
{
    public:
        using TCPSocket = boost::asio::ip::tcp::socket;

        ServerConnection::ServerConnection(TCPSocket socket)
          : socket_(std::move(socket)),
            properties_(nullptr),
            filePacket_(nullptr),
            filePacketSize_(0)
        {
        }


        void start() { doRead(); }

    private:
        void doRead()
        {
            auto self(shared_from_this());
            socket_.async_read_some(boost::asio::buffer(properties_, sizeof(propertiesPacket)),
                                    [this, self](boost::system::error_code ec, std::size_t /*length*/)
                                    {
                                        if (!ec) {
                                            processData();
                                            doWrite(&filePacketSize_, sizeof(short));

                                            doWrite(filePacket_, sizeof(*filePacket_));
                                        }
                                    });

        }

        void doWrite(void* data, size_t length)
        {
            auto self(shared_from_this());
            boost::asio::async_write(socket_, boost::asio::buffer(data, length),
                                     [this, self](boost::system::error_code ec, std::size_t /*length*/)
                                     {
                                         if (!ec) { doRead(); }
                                     });
        }

        void processData()
        { /* Data is processed */ }

        TCPSocket socket_;

        propertiesPacket* properties_;
        processedData* filePacket_;
        short filePacketSize_;
};

class Server
{
    public:
        using IOContext = boost::asio::io_context;
        using TCPSocket = boost::asio::ip::tcp::socket;
        using TCPAcceptor = boost::asio::ip::tcp::acceptor;

        Server::Server(IOContext& io, short port)
            : socket_(io),
              acceptor_(io, boost::asio::ip::tcp::endpoint(boost::asio::ip::tcp::v4(), port))
        {
            doAccept();
        }


    private:
        void doAccept()
        {
            acceptor_.async_accept(socket_,
                [this](boost::system::error_code ec)
                {
                    if (!ec) {
                        std::make_shared<ServerConnection>(std::move(socket_))->start();
                    }

                    doAccept();
                });
        }

        TCPSocket socket_;
        TCPAcceptor acceptor_;
};

What did I do wrong? My guess is that inside the doRead function, calling multiple times the doWrite function, when that function then also calls doRead is in part what's causing problems, but I don't know what the correct way of writing data asynchronously multiple times is. But I'm also sure that isn't the only part of my code that isn't behaving as I think it should.


Solution

  • Besides the problems with the code shown that I mentioned in the comments, there is indeed the problem that you suspected:

    My guess is that inside the doRead function, calling multiple times the doWrite function, when that function then also calls doRead is in part what's causing problems

    The fact that "doRead" is in the same function isn't necessarily a problem (that's just full-duplex socket IO). However "calling multiple times" is. See the docs:

    This operation is implemented in terms of zero or more calls to the stream's async_write_some function, and is known as a composed operation. The program must ensure that the stream performs no other write operations (such as async_write, the stream's async_write_some function, or any other composed operations that perform writes) until this operation completes.

    The usual way is to put the whole message in a single buffer, but if that would be "expensive" to copy, you can use a BufferSequence, which is known as scatter/gather buffers.

    Specifically, you would replace

    doWrite(&filePacketSize_, sizeof(short));
    doWrite(filePacket_, sizeof(*filePacket_));
    

    with something like

    std::vector<boost::asio::const_buffer> msg{
        boost::asio::buffer(&filePacketSize_, sizeof(short)),
        boost::asio::buffer(filePacket_, sizeof(*filePacket_)),
    };
    
    doWrite(msg);
    

    Note that this assumes that filePacketSize and filePacket have been assigned proper values!

    You could of course modify do_write to accept the buffer sequence:

    template <typename Buffers> void doWrite(Buffers msg)
    {
        auto self(shared_from_this());
        boost::asio::async_write(
            socket_, msg,
            [this, self](boost::system::error_code ec, std::size_t /*length*/) {
                if (!ec) {
                    doRead();
                }
            });
    }
    

    But in your case I'd simplify by inlining the body (now that you don't call it more than once anyway).

    SIDE NOTES

    Don't use new or delete. NEVER use malloc in C++. Never use reinterpret_cast<> (except in the very rarest of exceptions that the standard allows!). Instead of

        processedData* response = reinterpret_cast<processedData*>(malloc(responseSize));
    

    Just use

        processedData response;
    

    (optionally add {} for value-initialization of aggregates). If you need variable-length messages, consider to put a vector or a array<char, MAXLEN> inside the message. Of course, array is fixed length but it preserves POD-ness, so it might be easier to work with. If you use vector, you'd want a scatter/gather read into a buffer sequence like I showed above for the write side.

    Instead of reinterpreting between inconsistent short and unsigned short types, perhaps just spell the type with the standard sizes: std::uint16_t everywhere. Keep in mind that you are not taking into account byte order so your protocol will NOT be portable across compilers/architectures.

    Provisional Fixes

    This is the listing I ended up with after reviewing the code you shared.

    Live On Coliru

    #include <boost/asio.hpp>
    #include <iostream>
    
    namespace ba = boost::asio;
    using boost::asio::ip::tcp;
    using boost::system::error_code;
    using TCPSocket = tcp::socket;
    
    struct processedData { };
    struct propertiesPacket { };
    
    // File server.cpp
    class ServerConnection : public std::enable_shared_from_this<ServerConnection> {
      public:
        ServerConnection(TCPSocket socket) : socket_(std::move(socket))
        { }
    
        void start() {
            std::clog << __PRETTY_FUNCTION__ << std::endl;
            doRead();
        }
    
      private:
        void doRead()
        {
            std::clog << __PRETTY_FUNCTION__ << std::endl;
            auto self(shared_from_this());
            socket_.async_read_some(
                ba::buffer(&properties_, sizeof(properties_)),
                [this, self](error_code ec, std::size_t length) {
                    std::clog << "received: " << length << std::endl;
                    if (!ec) {
                        processData();
    
                        std::vector<ba::const_buffer> msg{
                            ba::buffer(&filePacketSize_, sizeof(uint16_t)),
                            ba::buffer(&filePacket_, filePacketSize_),
                        };
    
                        ba::async_write(socket_, msg,
                            [this, self = shared_from_this()](
                                error_code ec, std::size_t length) {
                                std::clog << " written: " << length
                                          << std::endl;
                                if (!ec) {
                                    doRead();
                                }
                            });
                    }
                });
        }
    
        void processData() {
            std::clog << __PRETTY_FUNCTION__ << std::endl;
            /* Data is processed */
        }
        TCPSocket socket_;
    
        propertiesPacket properties_{};
        processedData    filePacket_{};
        uint16_t         filePacketSize_ = sizeof(filePacket_);
    };
    
    class Server
    {
      public:
        using IOContext   = ba::io_context;
        using TCPAcceptor = tcp::acceptor;
    
        Server(IOContext& io, uint16_t port)
            : socket_(io)
            , acceptor_(io, {tcp::v4(), port})
        {
            doAccept();
        }
    
      private:
        void doAccept()
        {
            std::clog << __PRETTY_FUNCTION__ << std::endl;
            acceptor_.async_accept(socket_, [this](error_code ec) {
                if (!ec) {
                    std::clog << "Accepted " << socket_.remote_endpoint()
                              << std::endl;
                    std::make_shared<ServerConnection>(std::move(socket_))->start();
                    doAccept();
                } else {
                    std::clog << "Accept " << ec.message() << std::endl;
                }
            });
        }
    
        TCPSocket   socket_;
        TCPAcceptor acceptor_;
    };
    
    // File client.cpp
    int main(int argc, char *argv[])
    {
        ba::io_context io;
        Server         s{io, 6869};
    
        std::thread server_thread{[&io] {
            io.run();
        }};
    
        // always check argc!
        std::vector<std::string> args(argv, argv + argc);
    
        if (args.size() == 1)
            args = {"demo", "127.0.0.1", "6869"};
    
        // avoid race with server accept thread
        post(io, [&io, args] {
            try {
                propertiesPacket properties;
                // ...
                // We set the data inside the properties struct
                // ...
    
                tcp::socket   socket(io);
                tcp::resolver resolver(io);
    
                connect(socket, resolver.resolve(args.at(1), args.at(2)));
                write(socket, ba::buffer(&properties, sizeof(properties)));
    
                uint16_t responseSize{};
                ba::read(socket, ba::buffer(&responseSize, sizeof(uint16_t)));
    
                std::clog << "Client responseSize: " << responseSize << std::endl;
                processedData response{};
                assert(responseSize <= sizeof(response));
                ba::read(socket, ba::buffer(&response, responseSize));
    
                // ...
                // The client handles the data
                // ...
                
                // for online demo:
                io.stop();
            } catch (std::exception const& e) {
                std::clog << e.what() << std::endl;
            }
        });
    
        io.run_one();
        server_thread.join();
    }
    

    Printing something similar to

    void Server::doAccept()
    Server::doAccept()::<lambda(boost::system::error_code)> Success
    void ServerConnection::start()
    void ServerConnection::doRead()
    void Server::doAccept()
    received: 1
    void ServerConnection::processData()
     written: 3
    void ServerConnection::doRead()
    Client responseSize: 1