Search code examples
c++boostboost-asioboost-iostreams

boost socket iostreams echo server with zlib compression sleeps until the connection will be closed


I try to create simple echo server with zlib compression following this and this examples.

My idea is to send some string now because I can convert POD types to string (std::string(reinterpret_cast<const char *>(&pod), sizeof(pod))) before sending when I will be sure the transport layer works.

And there is a problem here. Client compresses data, sends it and says data were sended but the server is blocked on data reading. I cannot understand why it happens.

I tried to use operator<< with out.flush(), also I tried to use boost::iostreams::copy(). The result is the same. The code example is (I use the same source file for server and client depending on args):

#include <boost/iostreams/filtering_stream.hpp>
#include <boost/iostreams/filter/zlib.hpp>
#include <boost/iostreams/copy.hpp>
#include <boost/asio.hpp>

#include <iostream>
#include <sstream>

namespace ip = boost::asio::ip;
using ip::tcp;

const unsigned short port = 9999;
const char host[] = "127.0.0.1";

void receive()
{
    boost::asio::io_context ctx;
    tcp::endpoint ep(ip::address::from_string(host), port);
    tcp::acceptor a(ctx, ep);

    tcp::iostream stream;
    a.accept(stream.socket());

    std::stringstream buffer;

    std::cout << "start session" << std::endl;

    try
    {
        for (;;)
        {
            {
                boost::iostreams::filtering_istream in;
                in.push(boost::iostreams::zlib_decompressor());
                in.push(stream);

                std::cout << "start reading" << std::endl;

                // looks like server is blocked here
                boost::iostreams::copy(in, buffer);
            }

            std::cout << "data: " << buffer.str() << std::endl;

            {
                boost::iostreams::filtering_ostream out;
                out.push(boost::iostreams::zlib_compressor());
                out.push(stream);

                boost::iostreams::copy(buffer, out);
            }

            std::cout << "Reply is sended" << std::endl;
        }
    }
    catch(const boost::iostreams::zlib_error &e)
    {
        std::cerr << e.what() << e.error() << '\n';
        stream.close();
    }
}

void send(const std::string &data)
{
    tcp::endpoint ep(ip::address::from_string(host), port);
    
    tcp::iostream stream;
    stream.connect(ep);

    std::stringstream buffer;
    buffer << data;

    if (!stream)
    {
        std::cerr << "Cannot connect to " << host << ":" << port << std::endl;
        return;
    }

    try
    {
        {
            boost::iostreams::filtering_ostream out;
            out.push(boost::iostreams::zlib_compressor());
            out.push(stream);

            out << buffer.str();
            out.flush();
        }

        std::cout << "sended: " << data << std::endl;
        buffer.str("");

        {
            boost::iostreams::filtering_istream in;
            in.push(boost::iostreams::zlib_decompressor());
            in.push(stream);

            // looks like client is blocked here
            boost::iostreams::copy(in, buffer);
        }

        std::cout << "result: " << buffer.str() << std::endl;
    }
    catch(const boost::iostreams::zlib_error &e)
    {
        std::cerr << e.what() << '\n';
    }
}

int main(int argc, const char *argv[])
{
    if (argc > 1 && argv[1] ==  std::string("sender"))
        send("hello world");
    else
        receive();

    return 0;
}

First I start server and then I start client. The following output is produced:

Server

$ ./example
# now it waits while client will be accepted
start session
start reading

Client

$ ./example sender
sended: hello world

The programs are blocked with the output above. I guess the server still waits for data from client and it does not know the client sent all what it had.

If I close client with Ctrl + C then the output is following:

$ ./example
# now it waits while client will be accepted
start session
start reading
# now it is blocked until I press Ctrl + C
data: hello world
Reply is sended
start reading
zlib error-5

and

$ ./example sender
sended: hello world
^C

I guess the zlib error-5 is because the server thinks the archive is incomplete.

The expected behavior is no blocking. The message must appear in the server program output when the client was started.

Why is the program blocked on reading? How can I fix it?


Solution

  • iostreams::copy does just that: it copies stream.

    Compliments to your code. It's very readable :) It reminds me of this answer Reading and writing files with boost iostream socket. The main difference is that that answer sends a single compressed blob and closes.

    You're "right" that the decompressor knows when one compressed block is complete, but it doesn't decide that another will not follow.

    So you need to add framing. The traditional way is to pass a length out-of-band. I've implemented the changes while also reducing code duplication by using IO manipulators.

    template <typename T> struct LengthPrefixed {
        T _wrapped;
    
        friend std::ostream& operator<<(std::ostream& os, LengthPrefixed lp) ;
        friend std::istream& operator>>(std::istream& is, LengthPrefixed lp) ;
    };
    

    And

    template <typename T> struct ZLIB {
        T& data;
        ZLIB(T& ref) : data(ref){}
    
        friend std::ostream& operator<<(std::ostream& os, ZLIB z) ;
        friend std::istream& operator>>(std::istream& is, ZLIB z) ;
    };
    

    ZLIB manipulator

    This one mainly encapsulates the code that you duplicated between the sender/receiver:

    template <typename T> struct ZLIB {
        T& data;
        ZLIB(T& ref) : data(ref){}
    
        friend std::ostream& operator<<(std::ostream& os, ZLIB z) {
            {
                boost::iostreams::filtering_ostream out;
                out.push(boost::iostreams::zlib_compressor());
                out.push(os);
                out << z.data << std::flush;
            }
            return os.flush();
        }
    
        friend std::istream& operator>>(std::istream& is, ZLIB z) {
            boost::iostreams::filtering_istream in;
            in.push(boost::iostreams::zlib_decompressor());
            in.push(is);
    
            std::ostringstream oss;
            copy(in, oss);
            z.data = oss.str();
    
            return is;
        }
    };
    

    I made T templated so it can store std::string& or std::string const& depending on the need.

    LengthPrefixed manipulator

    This manipulator doesn't care what is being serialized, but will simply prefix it with the effective length on-the-wire:

    template <typename T> struct LengthPrefixed {
        T _wrapped;
    
        friend std::ostream& operator<<(std::ostream& os, LengthPrefixed lp) {
            std::ostringstream oss;
            oss << lp._wrapped;
            auto on_the_wire = std::move(oss).str();
    
            debug << "Writing length " << on_the_wire.length() << std::endl;
            return os << on_the_wire.length() << "\n" << on_the_wire << std::flush;
        }
    
        friend std::istream& operator>>(std::istream& is, LengthPrefixed lp) {
            size_t len;
            if (is >> std::noskipws >> len && is.ignore(1, '\n')) {
                debug << "Reading length " << len << std::endl;
    
                std::string on_the_wire(len, '\0');
                if (is.read(on_the_wire.data(), on_the_wire.size())) {
                    std::istringstream iss(on_the_wire);
                    iss >> lp._wrapped;
                }
            }
            return is;
        }
    };
    

    We add a subtlety: by storing a reference or value depending on what we are constructed with we can also accept temporaries (like the ZLIB manipulator):

    template <typename T> LengthPrefixed(T&&) -> LengthPrefixed<T>;
    template <typename T> LengthPrefixed(T&) -> LengthPrefixed<T&>;
    

    I didnt' think to make the ZLIB manipulator equally generic. So I leave that as an exorcism for the reader

    DEMO PROGRAM

    Combining these two, you can write the sender/receiver simply as:

    void server() {
        boost::asio::io_context ctx;
        tcp::endpoint ep(ip::address::from_string(host), port);
        tcp::acceptor a(ctx, ep);
    
        tcp::iostream stream;
        a.accept(stream.socket());
    
        std::cout << "start session" << std::endl;
    
        for (std::string data; stream >> LengthPrefixed{ZLIB{data}};) {
            std::cout << "data: " << std::quoted(data) << std::endl;
            stream << LengthPrefixed{ZLIB{data}} << std::flush;
        }
    }
    
    void client(std::string data) {
        tcp::endpoint ep(ip::address::from_string(host), port);
        tcp::iostream stream(ep);
    
        stream << LengthPrefixed{ZLIB{data}} << std::flush;
        std::cout << "sent: " << std::quoted(data) << std::endl;
    
        stream >> LengthPrefixed{ZLIB{data}};
        std::cout << "result: " << std::quoted(data) << std::endl;
    }
    

    Indeed, it prints:

    reader: start session
    sender: Writing length 19
    reader: Reading length 19
    sender: sent: "hello world"
    reader: data: "hello world"
    reader: Writing length 19
    sender: Reading length 19
    sender: result: "hello world"
    

    Complete Listing

    #include <boost/iostreams/filtering_stream.hpp>
    #include <boost/iostreams/filter/zlib.hpp>
    #include <boost/iostreams/copy.hpp>
    #include <boost/asio.hpp>
    
    #include <iostream>
    #include <iomanip>
    #include <sstream>
    
    namespace ip = boost::asio::ip;
    using ip::tcp;
    
    const unsigned short port = 9999;
    const char host[] = "127.0.0.1";
    
    #ifdef DEBUG
        std::ostream debug(std::cerr.rdbuf());
    #else
        std::ostream debug(nullptr);
    #endif
    
    template <typename T> struct LengthPrefixed {
        T _wrapped;
    
        friend std::ostream& operator<<(std::ostream& os, LengthPrefixed lp) {
            std::ostringstream oss;
            oss << lp._wrapped;
            auto on_the_wire = std::move(oss).str();
    
            debug << "Writing length " << on_the_wire.length() << std::endl;
            return os << on_the_wire.length() << "\n" << on_the_wire << std::flush;
        }
    
        friend std::istream& operator>>(std::istream& is, LengthPrefixed lp) {
            size_t len;
            if (is >> std::noskipws >> len && is.ignore(1, '\n')) {
                debug << "Reading length " << len << std::endl;
    
                std::string on_the_wire(len, '\0');
                if (is.read(on_the_wire.data(), on_the_wire.size())) {
                    std::istringstream iss(on_the_wire);
                    iss >> lp._wrapped;
                }
            }
            return is;
        }
    };
    
    template <typename T> LengthPrefixed(T&&) -> LengthPrefixed<T>;
    template <typename T> LengthPrefixed(T&) -> LengthPrefixed<T&>;
    
    template <typename T> struct ZLIB {
        T& data;
        ZLIB(T& ref) : data(ref){}
    
        friend std::ostream& operator<<(std::ostream& os, ZLIB z) {
            {
                boost::iostreams::filtering_ostream out;
                out.push(boost::iostreams::zlib_compressor());
                out.push(os);
                out << z.data << std::flush;
            }
            return os.flush();
        }
    
        friend std::istream& operator>>(std::istream& is, ZLIB z) {
            boost::iostreams::filtering_istream in;
            in.push(boost::iostreams::zlib_decompressor());
            in.push(is);
    
            std::ostringstream oss;
            copy(in, oss);
            z.data = oss.str();
    
            return is;
        }
    };
    
    void server() {
        boost::asio::io_context ctx;
        tcp::endpoint ep(ip::address::from_string(host), port);
        tcp::acceptor a(ctx, ep);
    
        tcp::iostream stream;
        a.accept(stream.socket());
    
        std::cout << "start session" << std::endl;
    
        for (std::string data; stream >> LengthPrefixed{ZLIB{data}};) {
            std::cout << "data: " << std::quoted(data) << std::endl;
            stream << LengthPrefixed{ZLIB{data}} << std::flush;
        }
    }
    
    void client(std::string data) {
        tcp::endpoint ep(ip::address::from_string(host), port);
        tcp::iostream stream(ep);
    
        stream << LengthPrefixed{ZLIB{data}} << std::flush;
        std::cout << "sent: " << std::quoted(data) << std::endl;
    
        stream >> LengthPrefixed{ZLIB{data}};
        std::cout << "result: " << std::quoted(data) << std::endl;
    }
    
    int main(int argc, const char**) {
        try {
            if (argc > 1)
                client("hello world");
            else
                server();
        } catch (const std::exception& e) {
            std::cerr << e.what() << '\n';
        }
    }