I am writing a tcp client-server app using boost::asio in the likeness of redis db with a limited number of connections to the server at one time. This is my first experience with boost::asio, do not judge strictly.
client.h
#ifndef CLIENT_IMPL_H
#define CLIENT_IMPL_H
#include <boost/asio.hpp>
#include <boost/enable_shared_from_this.hpp>
#include <cstddef>
#include <string>
namespace io = boost::asio;
using tcp = io::ip::tcp;
using error_code = boost::system::error_code;
using namespace std::placeholders;
class client : public boost::enable_shared_from_this<client> {
public:
client(io::io_context &io_context, std::string const &addr, uint16_t port)
: socket_(io_context) {
start(tcp::endpoint(
io::ip::address::from_string(addr),
port
));
}
void start(tcp::endpoint ep);
private:
void on_connect(error_code const &err);
void on_read(error_code const &err, std::size_t bytes_transmitted);
void async_read();
void on_write(error_code const &err);
void async_write();
private:
tcp::socket socket_;
io::streambuf streambuf_;
std::string request_;
std::string response_;
};
#endif
cleint.cpp
#include <iostream>
#include "client_impl.h"
void client::start(tcp::endpoint ep) {
socket_.async_connect(
ep,
[this](error_code err) {
on_connect(err);
}
);
}
void client::on_connect(error_code const &err) {
if (!err) {
async_write();
} else {
socket_.close();
}
}
void client::on_read(error_code const &err, std::size_t bytes_transmitted) {
if (!err) {
std::stringstream message(response_);
message << std::istream(&streambuf_).rdbuf();
streambuf_.consume(bytes_transmitted);
std::cout << response_ << std::endl;
response_.clear();
async_write();
} else {
socket_.close();
}
}
void client::async_read() {
auto self = shared_from_this();
io::async_read_until(
socket_,
streambuf_,
"\n",
[self](error_code err, std::size_t bytes_transmitted) {
self->on_read(err, bytes_transmitted);
}
);
}
void client::on_write(error_code const &err) {
if (!err) {
async_read();
} else {
socket_.close();
}
}
void client::async_write() {
std::getline(std::cin, request_);
request_ += "\n";
io::async_write(
socket_,
io::buffer(request_),
[this](error_code err, std::size_t bytes_transmitted) {
on_write(err);
}
);
}
server.h
#ifndef SERVER_IMPL_H
#define SERVER_IMPL_H
#include <boost/asio.hpp>
#include <boost/asio/io_context.hpp>
#include <memory>
#include <optional>
#include <string>
#include <unordered_map>
#include <utility>
#include <regex>
namespace io = boost::asio;
using tcp = io::ip::tcp;
using error_code = boost::system::error_code;
using namespace std::placeholders;
class session : public std::enable_shared_from_this<session> {
public:
session(tcp::socket socket, std::unordered_map<std::string, std::string> &storage) :
socket_(std::move(socket)),
storage_ {storage} {}
void start();
private:
void async_read();
void on_read(error_code err, std::size_t bytes_transmitted);
void async_write();
void on_write(error_code err, std::size_t bytes_transmitted);
void message_handler(std::string const &message);
void on_put(std::string const &message);
void on_get(std::string const &message);
void on_del(std::string const &message);
void on_count(std::string const &message);
void on_dump(std::string const &message);
bool is_valid_key(std::string const &key);
bool is_valid_txt(std::string const &fname);
std::string get_from_storage(std::string const &key);
void put_from_storage(std::string const &key, std::string const &value);
tcp::socket socket_;
io::streambuf streambuf_;
std::string buffer_;
std::string response_;
std::unordered_map<std::string, std::string> &storage_;
const static std::regex key_regex_;
const static std::regex txt_regex_;
};
class server {
public:
server(io::io_context& io_context, std::uint16_t port) :
io_context_(io_context),
acceptor_(io_context_, tcp::endpoint(tcp::v4(), port)) {
async_accept();
}
void async_accept();
private:
io::io_context& io_context_;
tcp::acceptor acceptor_;
std::unordered_map<std::string, std::string> storage_;
};
#endif
server.cpp
#include "server_impl.h"
#include <boost/asio.hpp>
#include <cstddef>
#include <functional>
#include <memory>
#include <regex>
#include <sstream>
#include <string>
#include <utility>
/* session implementation */
const std::regex session::key_regex_("[A-Za-z0-9]+");
const std::regex session::txt_regex_("[A-Za-z0-9]+\\.txt");
void session::start() {
async_read();
}
void session::async_read() {
io::async_read_until(
socket_,
streambuf_,
"\n",
std::bind(&session::on_read, shared_from_this(), _1, _2)
);
}
void session::on_read(error_code err, std::size_t bytes_transmitted) {
if (!err) {
std::stringstream message(buffer_);
message << std::istream(&streambuf_).rdbuf();
streambuf_.consume(bytes_transmitted);
message_handler(buffer_);
async_write();
buffer_.clear();
async_read();
} else {
socket_.close();
}
}
void session::async_write() {
io::async_write(
socket_,
io::buffer(response_),
std::bind(&session::on_write, shared_from_this(), _1, _2)
);
}
void session::on_write(error_code err, std::size_t bytes_transmitted) {
if (!err) {
async_write();
} else {
socket_.close();
}
}
void session::message_handler(std::string const &message) {
if (message.starts_with("PUT ")) {
on_put(message);
} else if (message.starts_with("GET ")) {
on_get(message);
} else if (message.starts_with("DEL ")) {
on_del(message);
} else if (message.starts_with("COUNT ")) {
on_count(message);
} else if (message.starts_with("DUMP ")) {
on_dump(message);
} else {
response_ = "NE";
}
}
void session::on_put(std::string const &message) {
std::istringstream ss(message);
std::string tmp;
std::string key;
int cnt {};
ss >> tmp;
while (ss >> tmp) {
if (++cnt == 3) {
response_ = "NE\n";
return;
}
if (cnt == 1) {
if (!is_valid_key(tmp)) {
response_ = "NE\n";
return;
}
key = tmp;
}
}
if (cnt != 2) {
response_ = "NE\n";
return;
}
if (std::string value = get_from_storage(key); value != "") {
response_ = "OK " + value + "\n";
}
storage_[key] = tmp;
}
void session::on_get(std::string const &message) {
std::istringstream ss(message);
std::string tmp;
std::string value;
int cnt {};
ss >> tmp;
while (ss >> tmp) {
if (++cnt == 2) {
response_ = "NE\n";
return;
}
if (!is_valid_key(tmp)) {
response_ = "NE\n";
return;
}
value = get_from_storage(tmp);
}
if (cnt != 1 || value == "") {
response_ = "NE\n";
return;
}
response_ = "OK " + value + "\n";
}
void session::on_del(std::string const &message) {
std::istringstream ss(message);
std::string tmp;
std::string value;
int cnt {};
ss >> tmp;
while (ss >> tmp) {
if (++cnt == 2) {
response_ = "NE\n";
return;
}
if (!is_valid_key(tmp)) {
response_ = "NE\n";
return;
}
value = get_from_storage(tmp);
}
if (cnt != 1 || value == "") {
response_ = "NE\n";
return;
}
storage_.erase(tmp);
response_ = "OK " + value + "\n";
}
void session::on_count(std::string const &message) {
std::istringstream ss(message);
std::string tmp;
ss >> tmp;
while (ss >> tmp) {
response_ = "NE\n";
return;
}
size_t cnt {storage_.size()};
response_ = "OK " + std::to_string(cnt) + "\n";
}
void session::on_dump(std::string const &message) {
std::istringstream ss(message);
std::string tmp;
int cnt {};
ss >> tmp;
while (ss >> tmp) {
if (++cnt == 2) {
response_ = "NE\n";
return;
}
}
if (cnt != 1 || !is_valid_txt(tmp)) {
response_ = "NE\n";
return;
}
// dumping
response_ = "OK\n";
}
bool session::is_valid_key(std::string const &key) {
return std::regex_match(key, key_regex_);
}
bool session::is_valid_txt(std::string const &fname) {
return std::regex_match(fname, txt_regex_);
}
std::string session::get_from_storage(std::string const &key) {
if (auto it = storage_.find(key); it != storage_.end()) {
return it->second;
}
return "";
}
void session::put_from_storage(std::string const &key, std::string const &value) {
storage_[key] = value;
}
/* server implemetation */
void server::async_accept() {
acceptor_.async_accept([this](error_code err, tcp::socket socket) {
if (!err) {
std::make_shared<session>(std::move(socket), storage_)->start();
}
});
}
And I run this like that
// for server
int main(int argc, char* argv[]) {
if (argc < 3 || argc > 4) {
std::cout << "usage: server <port> <limit_connections> [<dump_file>]" << std::endl;
return 1;
}
io::io_context io_context;
server srv(io_context, static_cast<uint16_t>(std::stoi(argv[1])));
srv.async_accept();
io_context.run();
return 0;
}
// for client
int main(int argc, char* argv[]) {
if (argc != 3) {
std::cout << "usage: client <ip> <port>" << std::endl;
return 1;
}
io::io_context io_context;
std::make_shared<client>(io_context, "127.0.0.1", std::stoi(argv[2]));
io_context.run();
return 0;
}
The server starts and listens, here is the netstat result:
tcp 0 0 0.0.0.0:8080 0.0.0.0:* LISTEN 4707/./Server
But when I try to start the server, I get this error immediately after startup:
terminate called after throwing an instance of 'boost::wrapexcept<boost::system::system_error>'
what(): close: Bad file descriptor [system:9]
A debugger tells you that the exception is thrown from close()
:
This means that the connection failed (for some other reason) and you then die because you try to close a socket that may not be open.
You cancel it! async_connect
does not capture shared_from_this()
. The client
destructs, cancelling any operations. Your close()
is called after object destuction: that's Undefined Behavior.
Likely you already had shared_from_this
, but you removed it. Why? If you didn't remove it, you would get a bad_weak_ptr
exception. That's because shared_from_this
cannot be used from the constructor: Why shared_from_this can't be used in constructor from technical standpoint?
Changing:
start()
to be separate from constructionboost::enable_shared_from_this
to std::enable_shared_from_this
(because you were using std::make_shared
!!)self
capturesNow you only have the issue that std::getline
blocks all IO.
Your server has issues handling the buffer:
std::stringstream message(buffer_);
message << std::istream(&streambuf_).rdbuf();
streambuf_.consume(bytes_transmitted);
message_handler(buffer_);
Here,buffer_
does not contain the request. Perhaps you are missing buffer_ = message.str()
? May I suggest improved naming so it is less confusing to read this code? I'd probably consider something like:
std::string message;
if (getline(std::istream(&streambuf_), message))
message_handler(message);
The server also causes a write loop by calling more writes from on_write
. Instead I'd move the new read to on_write
to avoid handling overlapping requests (because you donot support it, because e.g. response_
is shared).
With some other minor fixes, and just "living" with stuff like the required space after "COUNT ":
File client.cpp
#include <boost/asio.hpp>
#include <boost/enable_shared_from_this.hpp>
#include <cstddef>
#include <string>
namespace io = boost::asio;
using tcp = io::ip::tcp;
using error_code = boost::system::error_code;
using namespace std::placeholders;
class client : public std::enable_shared_from_this<client> {
public:
client(io::io_context& io_context) : socket_(io_context) {}
void start(tcp::endpoint ep);
private:
void on_connect(error_code const& err);
void on_read(error_code const& err, size_t bytes_transmitted);
void do_read();
void on_write(error_code const& err);
void do_write();
private:
tcp::socket socket_;
io::streambuf streambuf_;
std::string request_;
std::string response_;
};
#include <iostream>
void client::start(tcp::endpoint ep) {
socket_.async_connect(ep, [this, self = shared_from_this()](error_code err) { on_connect(err); });
}
void client::on_connect(error_code const& err) {
std::cerr << "on_connect: " << err.message() << std::endl;
if (!err) {
do_write();
} else {
socket_.close();
}
}
void client::on_read(error_code const& err, size_t /*bytes_transmitted*/) {
std::cerr << "on_read: " << err.message() << std::endl;
if (!err) {
std::cout << &streambuf_ << std::endl;
response_.clear();
do_write();
} else {
socket_.close();
}
}
void client::do_read() {
auto self = shared_from_this();
io::async_read_until(socket_, streambuf_, "\n", [self](error_code err, size_t bytes_transmitted) {
self->on_read(err, bytes_transmitted);
});
}
void client::on_write(error_code const& err) {
std::cerr << "on_write: " << err.message() << std::endl;
if (!err) {
do_read();
} else {
socket_.close();
}
}
void client::do_write() {
std::getline(std::cin, request_);
std::cout << "Sending " << quoted(request_) << std::endl;
request_ += "\n";
io::async_write(
socket_, io::buffer(request_),
[this, self = shared_from_this()](error_code err, size_t /*bytes_transmitted*/) { on_write(err); });
}
int main(int argc, char* argv[]) {
if (argc != 3) {
std::cout << "usage: client <ip> <port>" << std::endl;
return 1;
}
io::io_context io_context;
auto c = std::make_shared<client>(io_context);
c->start(tcp::endpoint({}, std::stoi(argv[2])));
io_context.run();
}
File server.cpp
#include <boost/asio.hpp>
#include <boost/asio/io_context.hpp>
#include <iostream>
#include <memory>
#include <regex>
#include <string>
#include <unordered_map>
#include <utility>
namespace io = boost::asio;
using tcp = io::ip::tcp;
using error_code = boost::system::error_code;
using namespace std::placeholders;
class session : public std::enable_shared_from_this<session> {
public:
session(tcp::socket socket, std::unordered_map<std::string, std::string>& storage)
: socket_(std::move(socket))
, storage_{storage} {}
void start();
private:
void do_read();
void on_read(error_code err, size_t bytes_transmitted);
void do_write();
void on_write(error_code err, size_t bytes_transmitted);
void message_handler(std::string const& message);
void on_put(std::string const& message);
void on_get(std::string const& message);
void on_del(std::string const& message);
void on_count(std::string const& message);
void on_dump(std::string const& message);
bool is_valid_key(std::string const& key);
bool is_valid_txt(std::string const& fname);
std::string get_from_storage(std::string const& key);
void put_from_storage(std::string const& key, std::string const& value);
tcp::socket socket_;
io::streambuf streambuf_;
std::string buffer_;
std::string response_;
std::unordered_map<std::string, std::string>& storage_;
static std::regex const key_regex_;
static std::regex const txt_regex_;
};
class server {
public:
server(io::io_context& io_context, std::uint16_t port)
: io_context_(io_context)
, acceptor_(io_context_, tcp::endpoint(tcp::v4(), port)) {
async_accept();
}
void async_accept();
private:
io::io_context& io_context_;
tcp::acceptor acceptor_;
std::unordered_map<std::string, std::string> storage_;
};
#include <boost/asio.hpp>
#include <cstddef>
#include <functional>
#include <memory>
#include <regex>
#include <sstream>
#include <string>
#include <utility>
/* session implementation */
std::regex const session::key_regex_("[A-Za-z0-9]+");
std::regex const session::txt_regex_("[A-Za-z0-9]+\\.txt");
void session::start() { do_read(); }
void session::do_read() {
io::async_read_until(socket_, streambuf_, "\n", std::bind(&session::on_read, shared_from_this(), _1, _2));
}
void session::on_read(error_code err, size_t /*bytes_transmitted*/) {
std::cerr << "on_write: " << err.message() << std::endl;
if (!err) {
std::string message;
if (getline(std::istream(&streambuf_), message)) {
message_handler(message);
do_write();
buffer_.clear();
}
} else {
socket_.close();
}
}
void session::do_write() {
std::cerr << "Writing response: " << quoted(response_) << std::endl;
io::async_write(socket_, io::buffer(response_),
std::bind(&session::on_write, shared_from_this(), _1, _2));
}
void session::on_write(error_code err, size_t /*bytes_transmitted*/) {
std::cerr << "on_write: " << err.message() << std::endl;
if (!err) {
response_.clear();
do_read();
} else {
socket_.close();
}
}
void session::message_handler(std::string const& message) {
std::cerr << "message_handler: " << quoted(message) << std::endl;
if (message.starts_with("PUT ")) {
on_put(message);
} else if (message.starts_with("GET ")) {
on_get(message);
} else if (message.starts_with("DEL ")) {
on_del(message);
} else if (message.starts_with("COUNT ")) {
on_count(message);
} else if (message.starts_with("DUMP ")) {
on_dump(message);
} else {
response_ = "NE";
}
}
void session::on_put(std::string const& message) {
std::istringstream ss(message);
std::string tmp;
std::string key;
int cnt{};
ss >> tmp;
while (ss >> tmp) {
if (++cnt == 3) {
response_ = "NE\n";
return;
}
if (cnt == 1) {
if (!is_valid_key(tmp)) {
response_ = "NE\n";
return;
}
key = tmp;
}
}
if (cnt != 2) {
response_ = "NE\n";
return;
}
if (std::string value = get_from_storage(key); value != "") {
response_ = "OK " + value + "\n";
} else {
response_ = "OK\n";
}
storage_[key] = tmp;
}
void session::on_get(std::string const& message) {
std::istringstream ss(message);
std::string tmp;
std::string value;
int cnt{};
ss >> tmp;
while (ss >> tmp) {
if (++cnt == 2) {
response_ = "NE\n";
return;
}
if (!is_valid_key(tmp)) {
response_ = "NE\n";
return;
}
value = get_from_storage(tmp);
}
if (cnt != 1 || value == "") {
response_ = "NE\n";
return;
}
response_ = "OK " + value + "\n";
}
void session::on_del(std::string const& message) {
std::istringstream ss(message);
std::string tmp;
std::string value;
int cnt{};
ss >> tmp;
while (ss >> tmp) {
if (++cnt == 2) {
response_ = "NE\n";
return;
}
if (!is_valid_key(tmp)) {
response_ = "NE\n";
return;
}
value = get_from_storage(tmp);
}
if (cnt != 1 || value == "") {
response_ = "NE\n";
return;
}
storage_.erase(tmp);
response_ = "OK " + value + "\n";
}
void session::on_count(std::string const& message) {
std::istringstream ss(message);
std::string tmp;
ss >> tmp;
while (ss >> tmp) {
response_ = "NE\n";
return;
}
size_t cnt{storage_.size()};
response_ = "OK " + std::to_string(cnt) + "\n";
}
void session::on_dump(std::string const& message) {
std::istringstream ss(message);
std::string tmp;
int cnt{};
ss >> tmp;
while (ss >> tmp) {
if (++cnt == 2) {
response_ = "NE\n";
return;
}
}
if (cnt != 1 || !is_valid_txt(tmp)) {
response_ = "NE\n";
return;
}
// dumping
response_ = "OK\n";
}
bool session::is_valid_key(std::string const& key) { return std::regex_match(key, key_regex_); }
bool session::is_valid_txt(std::string const& fname) { return std::regex_match(fname, txt_regex_); }
std::string session::get_from_storage(std::string const& key) {
if (auto it = storage_.find(key); it != storage_.end()) {
return it->second;
}
return "";
}
void session::put_from_storage(std::string const& key, std::string const& value) { storage_[key] = value; }
/* server implemetation */
void server::async_accept() {
acceptor_.async_accept([this](error_code err, tcp::socket socket) {
if (!err) {
std::make_shared<session>(std::move(socket), storage_)->start();
}
});
}
int main(int argc, char* argv[]) {
if (argc < 3 || argc > 4) {
std::cout << "usage: server <port> <limit_connections> [<dump_file>]" << std::endl;
return 1;
}
io::io_context io_context;
server srv(io_context, static_cast<uint16_t>(std::stoi(argv[1])));
srv.async_accept();
io_context.run();
}
And proof of the pudding: