I am trying to create a simple echo server with zlib compression, following this and this example.
My idea is to send a plain string for now, because once I am sure the transport layer works I can convert POD types to strings (std::string(reinterpret_cast<const char *>(&pod), sizeof(pod))) before sending.
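As an illustration of the POD conversion mentioned above, here is a minimal sketch. Sample, to_bytes, from_bytes, and demo_pod_round_trip are hypothetical names, not part of the echo server code, and the round trip assumes both ends share the same architecture, struct layout, and endianness.

```cpp
#include <cassert>
#include <cstring>
#include <string>
#include <type_traits>

// Hypothetical POD used only for illustration.
struct Sample {
    int id;
    double value;
};

// Copy the object representation of a trivially copyable value into a string.
template <typename Pod>
std::string to_bytes(const Pod& pod) {
    static_assert(std::is_trivially_copyable<Pod>::value, "Pod must be trivially copyable");
    return std::string(reinterpret_cast<const char*>(&pod), sizeof(pod));
}

// Rebuild the value from those bytes; memcpy sidesteps alignment and aliasing issues.
template <typename Pod>
Pod from_bytes(const std::string& bytes) {
    static_assert(std::is_trivially_copyable<Pod>::value, "Pod must be trivially copyable");
    assert(bytes.size() == sizeof(Pod));
    Pod pod;
    std::memcpy(&pod, bytes.data(), sizeof(Pod));
    return pod;
}

// Usage: convert to bytes and back, then compare the fields.
inline bool demo_pod_round_trip() {
    Sample original{42, 1.5};
    std::string bytes = to_bytes(original);
    assert(bytes.size() == sizeof(Sample));
    Sample restored = from_bytes<Sample>(bytes);
    return restored.id == 42 && restored.value == 1.5;
}
```

The resulting string may contain embedded zero bytes, so it has to be sent with an explicit size rather than treated as a C string.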
And there is a problem here. The client compresses the data, sends it, and reports that the data was sent, but the server stays blocked while reading. I cannot understand why this happens.
I tried operator<< with out.flush(), and I also tried boost::iostreams::copy(). The result is the same. Here is the code (I use the same source file for server and client, selected by command-line arguments):
#include <boost/iostreams/filtering_stream.hpp>
#include <boost/iostreams/filter/zlib.hpp>
#include <boost/iostreams/copy.hpp>
#include <boost/asio.hpp>
#include <iostream>
#include <sstream>
namespace ip = boost::asio::ip;
using ip::tcp;
const unsigned short port = 9999;
const char host[] = "127.0.0.1";
void receive()
{
    boost::asio::io_context ctx;
    tcp::endpoint ep(ip::address::from_string(host), port);
    tcp::acceptor a(ctx, ep);
    tcp::iostream stream;
    a.accept(stream.socket());
    std::stringstream buffer;
    std::cout << "start session" << std::endl;
    try
    {
        for (;;)
        {
            {
                boost::iostreams::filtering_istream in;
                in.push(boost::iostreams::zlib_decompressor());
                in.push(stream);
                std::cout << "start reading" << std::endl;
                // looks like server is blocked here
                boost::iostreams::copy(in, buffer);
            }
            std::cout << "data: " << buffer.str() << std::endl;
            {
                boost::iostreams::filtering_ostream out;
                out.push(boost::iostreams::zlib_compressor());
                out.push(stream);
                boost::iostreams::copy(buffer, out);
            }
            std::cout << "Reply is sended" << std::endl;
        }
    }
    catch(const boost::iostreams::zlib_error &e)
    {
        std::cerr << e.what() << e.error() << '\n';
        stream.close();
    }
}
void send(const std::string &data)
{
    tcp::endpoint ep(ip::address::from_string(host), port);
    tcp::iostream stream;
    stream.connect(ep);
    std::stringstream buffer;
    buffer << data;
    if (!stream)
    {
        std::cerr << "Cannot connect to " << host << ":" << port << std::endl;
        return;
    }
    try
    {
        {
            boost::iostreams::filtering_ostream out;
            out.push(boost::iostreams::zlib_compressor());
            out.push(stream);
            out << buffer.str();
            out.flush();
        }
        std::cout << "sended: " << data << std::endl;
        buffer.str("");
        {
            boost::iostreams::filtering_istream in;
            in.push(boost::iostreams::zlib_decompressor());
            in.push(stream);
            // looks like client is blocked here
            boost::iostreams::copy(in, buffer);
        }
        std::cout << "result: " << buffer.str() << std::endl;
    }
    catch(const boost::iostreams::zlib_error &e)
    {
        std::cerr << e.what() << '\n';
    }
}
int main(int argc, const char *argv[])
{
    if (argc > 1 && argv[1] == std::string("sender"))
        send("hello world");
    else
        receive();
    return 0;
}
First I start the server and then the client. The following output is produced:
Server
$ ./example
# now it waits while client will be accepted
start session
start reading
Client
$ ./example sender
sended: hello world
Both programs hang after the output above. I guess the server is still waiting for data from the client and does not know that the client has already sent everything it had.
If I kill the client with Ctrl + C, the output is the following:
$ ./example
# now it waits while client will be accepted
start session
start reading
# now it is blocked until I press Ctrl + C
data: hello world
Reply is sended
start reading
zlib error-5
and
$ ./example sender
sended: hello world
^C
I guess the zlib error-5 is because the server thinks the archive is incomplete.
The expected behavior is no blocking: the message should appear in the server's output as soon as the client is started.
Why is the program blocked on reading? How can I fix it?
iostreams::copy does just that: it copies the stream, which means it keeps reading until the source reaches end-of-stream.
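To make that concrete, the sketch below shows roughly what a copy-the-whole-stream loop amounts to. It is not Boost's implementation; copy_until_eof and demo_copy are made-up names, and a std::istringstream stands in for the socket. The loop can only finish once the source reports end-of-stream. A string stream does so when its buffer runs out, but an open TCP connection only does so when the peer closes or shuts down its side, which matches what you saw after pressing Ctrl + C.

```cpp
#include <cassert>
#include <cstddef>
#include <istream>
#include <ostream>
#include <sstream>
#include <string>

// Illustrative stand-in for a "copy everything" helper (hypothetical name):
// it returns only after the source stops delivering data.
std::size_t copy_until_eof(std::istream& in, std::ostream& out) {
    char chunk[4096];
    std::size_t total = 0;
    for (;;) {
        in.read(chunk, sizeof(chunk));         // blocks on a socket until data or EOF
        std::streamsize got = in.gcount();     // bytes actually read this round
        if (got <= 0) break;                   // nothing more to read
        out.write(chunk, got);
        total += static_cast<std::size_t>(got);
    }
    return total;
}

// Usage: the call returns here only because the string stream runs out of data.
inline bool demo_copy() {
    std::istringstream in("hello world");
    std::ostringstream out;
    std::size_t n = copy_until_eof(in, out);
    assert(in.eof());
    return n == 11 && out.str() == "hello world";
}
```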
Compliments on your code. It's very readable :) It reminds me of this answer: Reading and writing files with boost iostream socket. The main difference is that that answer sends a single compressed blob and then closes the connection.
You're "right" that the decompressor knows when one compressed block is complete, but it doesn't decide that another will not follow.
So you need to add framing. The traditional way is to pass a length out-of-band. I've implemented the changes while also reducing code duplication by using IO manipulators.
template <typename T> struct LengthPrefixed {
    T _wrapped;
    friend std::ostream& operator<<(std::ostream& os, LengthPrefixed lp);
    friend std::istream& operator>>(std::istream& is, LengthPrefixed lp);
};
And
template <typename T> struct ZLIB {
    T& data;
    ZLIB(T& ref) : data(ref){}
    friend std::ostream& operator<<(std::ostream& os, ZLIB z);
    friend std::istream& operator>>(std::istream& is, ZLIB z);
};
ZLIB manipulator
This one mainly encapsulates the code that you duplicated between the sender/receiver:
template <typename T> struct ZLIB {
    T& data;
    ZLIB(T& ref) : data(ref){}
    friend std::ostream& operator<<(std::ostream& os, ZLIB z) {
        {
            boost::iostreams::filtering_ostream out;
            out.push(boost::iostreams::zlib_compressor());
            out.push(os);
            out << z.data << std::flush;
        }
        return os.flush();
    }
    friend std::istream& operator>>(std::istream& is, ZLIB z) {
        boost::iostreams::filtering_istream in;
        in.push(boost::iostreams::zlib_decompressor());
        in.push(is);
        std::ostringstream oss;
        copy(in, oss);
        z.data = oss.str();
        return is;
    }
};
I made T templated so it can store std::string& or std::string const& depending on the need.
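A small sketch of how that deduction plays out, assuming C++17 class template argument deduction. RefHolder and demo_ref_holder are hypothetical stand-ins with the same constructor shape as ZLIB, not the manipulator itself.

```cpp
#include <cassert>
#include <string>
#include <type_traits>

// Hypothetical stand-in: like ZLIB, it only stores a reference.
template <typename T> struct RefHolder {
    T& data;
    RefHolder(T& ref) : data(ref) {}
};

// T is deduced from the constructor argument, constness included.
inline bool demo_ref_holder() {
    std::string       writable = "abc";
    const std::string readonly = "xyz";

    RefHolder w{writable};  // T = std::string
    RefHolder r{readonly};  // T = const std::string

    static_assert(std::is_same<decltype(w), RefHolder<std::string>>::value, "");
    static_assert(std::is_same<decltype(r), RefHolder<const std::string>>::value, "");

    assert(&w.data == &writable);  // no copy was made
    w.data += "!";                 // writes through to `writable`
    return writable == "abc!" && r.data == "xyz";
}
```

So the same template serves the writing side (where a const reference is enough) and the reading side (where the target string must be assignable).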
LengthPrefixed manipulator
This manipulator doesn't care what is being serialized, but will simply prefix it with the effective length on-the-wire:
template <typename T> struct LengthPrefixed {
    T _wrapped;
    friend std::ostream& operator<<(std::ostream& os, LengthPrefixed lp) {
        std::ostringstream oss;
        oss << lp._wrapped;
        auto on_the_wire = std::move(oss).str();
        debug << "Writing length " << on_the_wire.length() << std::endl;
        return os << on_the_wire.length() << "\n" << on_the_wire << std::flush;
    }
    friend std::istream& operator>>(std::istream& is, LengthPrefixed lp) {
        size_t len;
        if (is >> std::noskipws >> len && is.ignore(1, '\n')) {
            debug << "Reading length " << len << std::endl;
            std::string on_the_wire(len, '\0');
            if (is.read(on_the_wire.data(), on_the_wire.size())) {
                std::istringstream iss(on_the_wire);
                iss >> lp._wrapped;
            }
        }
        return is;
    }
};
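To look at the wire format in isolation, here is a stripped-down sketch of the same "<length>\n<payload>" layout using plain strings, with a std::stringstream standing in for the socket. write_frame, read_frame, and demo_framing are hypothetical helpers invented for this illustration; they are not part of the manipulators above.

```cpp
#include <cassert>
#include <cstddef>
#include <istream>
#include <ostream>
#include <sstream>
#include <string>
#include <utility>

// Hypothetical helper: emit "<length>\n<payload>".
std::ostream& write_frame(std::ostream& os, const std::string& payload) {
    return os << payload.size() << '\n' << payload << std::flush;
}

// Hypothetical helper: read exactly one frame; false on EOF or malformed input.
bool read_frame(std::istream& is, std::string& payload) {
    std::size_t len = 0;
    if (!(is >> len)) return false;        // decimal length
    if (is.get() != '\n') return false;    // single separator character
    std::string buf(len, '\0');
    if (len > 0 && !is.read(&buf[0], static_cast<std::streamsize>(len)))
        return false;                      // stream ended before the payload did
    payload = std::move(buf);
    return true;
}

// Usage: two frames written back to back come out one at a time.
inline bool demo_framing() {
    std::stringstream wire;
    write_frame(wire, "hello");
    write_frame(wire, "two\nlines\n");     // payload may contain newlines
    assert(wire.str() == "5\nhello10\ntwo\nlines\n");

    std::string a, b, c;
    bool ok = read_frame(wire, a) && read_frame(wire, b);
    return ok && a == "hello" && b == "two\nlines\n" && !read_frame(wire, c);
}
```

Because the reader consumes exactly the announced number of bytes, it never has to wait for end-of-stream to know that a message is complete.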
We add a subtlety: by storing a reference or value depending on what we are constructed with we can also accept temporaries (like the ZLIB manipulator):
template <typename T> LengthPrefixed(T&&) -> LengthPrefixed<T>;
template <typename T> LengthPrefixed(T&) -> LengthPrefixed<T&>;
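Here's a sketch of what those two guides do, again assuming C++17. Wrapper and demo_wrapper are hypothetical; Wrapper is an aggregate with the same shape as LengthPrefixed, reduced to show only the deduction.

```cpp
#include <cassert>
#include <string>
#include <type_traits>

// Hypothetical aggregate mirroring the guide pattern above.
template <typename T> struct Wrapper {
    T _wrapped;
};
template <typename T> Wrapper(T&&) -> Wrapper<T>;   // temporaries: store by value
template <typename T> Wrapper(T&)  -> Wrapper<T&>;  // lvalues: store a reference

inline bool demo_wrapper() {
    std::string text = "abc";

    Wrapper by_ref{text};                // built from an lvalue
    Wrapper by_val{std::string("tmp")};  // built from a temporary

    static_assert(std::is_same<decltype(by_ref), Wrapper<std::string&>>::value, "");
    static_assert(std::is_same<decltype(by_val), Wrapper<std::string>>::value, "");

    assert(&by_ref._wrapped == &text);   // refers to the original object
    by_ref._wrapped += "!";              // so this modifies `text`
    return text == "abc!" && by_val._wrapped == "tmp";
}
```

Storing temporaries by value keeps them alive for as long as the wrapper exists, which is what makes nesting one manipulator inside another safe.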
I didn't think to make the ZLIB manipulator equally generic, so I leave that as an exorcism for the reader.
Combining these two, you can write the sender/receiver simply as:
void server() {
    boost::asio::io_context ctx;
    tcp::endpoint ep(ip::address::from_string(host), port);
    tcp::acceptor a(ctx, ep);
    tcp::iostream stream;
    a.accept(stream.socket());
    std::cout << "start session" << std::endl;
    for (std::string data; stream >> LengthPrefixed{ZLIB{data}};) {
        std::cout << "data: " << std::quoted(data) << std::endl;
        stream << LengthPrefixed{ZLIB{data}} << std::flush;
    }
}
void client(std::string data) {
    tcp::endpoint ep(ip::address::from_string(host), port);
    tcp::iostream stream(ep);
    stream << LengthPrefixed{ZLIB{data}} << std::flush;
    std::cout << "sent: " << std::quoted(data) << std::endl;
    stream >> LengthPrefixed{ZLIB{data}};
    std::cout << "result: " << std::quoted(data) << std::endl;
}
Indeed, it prints:
reader: start session
sender: Writing length 19
reader: Reading length 19
sender: sent: "hello world"
reader: data: "hello world"
reader: Writing length 19
sender: Reading length 19
sender: result: "hello world"
Full listing
#include <boost/iostreams/filtering_stream.hpp>
#include <boost/iostreams/filter/zlib.hpp>
#include <boost/iostreams/copy.hpp>
#include <boost/asio.hpp>
#include <iostream>
#include <iomanip>
#include <sstream>
namespace ip = boost::asio::ip;
using ip::tcp;
const unsigned short port = 9999;
const char host[] = "127.0.0.1";
#ifdef DEBUG
std::ostream debug(std::cerr.rdbuf());
#else
std::ostream debug(nullptr);
#endif
template <typename T> struct LengthPrefixed {
    T _wrapped;
    friend std::ostream& operator<<(std::ostream& os, LengthPrefixed lp) {
        std::ostringstream oss;
        oss << lp._wrapped;
        auto on_the_wire = std::move(oss).str();
        debug << "Writing length " << on_the_wire.length() << std::endl;
        return os << on_the_wire.length() << "\n" << on_the_wire << std::flush;
    }
    friend std::istream& operator>>(std::istream& is, LengthPrefixed lp) {
        size_t len;
        if (is >> std::noskipws >> len && is.ignore(1, '\n')) {
            debug << "Reading length " << len << std::endl;
            std::string on_the_wire(len, '\0');
            if (is.read(on_the_wire.data(), on_the_wire.size())) {
                std::istringstream iss(on_the_wire);
                iss >> lp._wrapped;
            }
        }
        return is;
    }
};
template <typename T> LengthPrefixed(T&&) -> LengthPrefixed<T>;
template <typename T> LengthPrefixed(T&) -> LengthPrefixed<T&>;
template <typename T> struct ZLIB {
    T& data;
    ZLIB(T& ref) : data(ref){}
    friend std::ostream& operator<<(std::ostream& os, ZLIB z) {
        {
            boost::iostreams::filtering_ostream out;
            out.push(boost::iostreams::zlib_compressor());
            out.push(os);
            out << z.data << std::flush;
        }
        return os.flush();
    }
    friend std::istream& operator>>(std::istream& is, ZLIB z) {
        boost::iostreams::filtering_istream in;
        in.push(boost::iostreams::zlib_decompressor());
        in.push(is);
        std::ostringstream oss;
        copy(in, oss);
        z.data = oss.str();
        return is;
    }
};
void server() {
    boost::asio::io_context ctx;
    tcp::endpoint ep(ip::address::from_string(host), port);
    tcp::acceptor a(ctx, ep);
    tcp::iostream stream;
    a.accept(stream.socket());
    std::cout << "start session" << std::endl;
    for (std::string data; stream >> LengthPrefixed{ZLIB{data}};) {
        std::cout << "data: " << std::quoted(data) << std::endl;
        stream << LengthPrefixed{ZLIB{data}} << std::flush;
    }
}
void client(std::string data) {
    tcp::endpoint ep(ip::address::from_string(host), port);
    tcp::iostream stream(ep);
    stream << LengthPrefixed{ZLIB{data}} << std::flush;
    std::cout << "sent: " << std::quoted(data) << std::endl;
    stream >> LengthPrefixed{ZLIB{data}};
    std::cout << "result: " << std::quoted(data) << std::endl;
}
int main(int argc, const char**) {
    try {
        if (argc > 1)
            client("hello world");
        else
            server();
    } catch (const std::exception& e) {
        std::cerr << e.what() << '\n';
    }
}