Search code examples
c++boostboost-asio

Getting error "terminate called after throwing an instance of 'boost::wrapexcept<boost::system::system_error>'" on starting tcp::client boost::asio


I am writing a tcp client-server app using boost::asio in the likeness of redis db with a limited number of connections to the server at one time. This is my first experience with boost::asio, do not judge strictly.

client.h

#ifndef CLIENT_IMPL_H
#define CLIENT_IMPL_H

#include <boost/asio.hpp>
#include <boost/enable_shared_from_this.hpp>
#include <cstddef>
#include <string>

namespace io = boost::asio;
using tcp = io::ip::tcp;
using error_code = boost::system::error_code;
using namespace std::placeholders;

class client : public boost::enable_shared_from_this<client> {

public:

    client(io::io_context &io_context, std::string const &addr, uint16_t port) 
    : socket_(io_context) {
        start(tcp::endpoint(
            io::ip::address::from_string(addr),
            port
        ));
    }

    void start(tcp::endpoint ep);

private:

    void on_connect(error_code const &err);

    void on_read(error_code const &err, std::size_t bytes_transmitted);
    void async_read();

    void on_write(error_code const &err);
    void async_write();

private:

    tcp::socket socket_;
    io::streambuf streambuf_;
    std::string request_;
    std::string response_;

};

#endif

cleint.cpp

#include <iostream>

#include "client_impl.h"

void client::start(tcp::endpoint ep) {
    socket_.async_connect(
        ep,
        [this](error_code err) {
            on_connect(err);
        }
    );
}

void client::on_connect(error_code const &err) {
    if (!err) {
        async_write();
    } else {
        socket_.close();
    }
}

void client::on_read(error_code const &err, std::size_t bytes_transmitted) {
    if (!err) {
        std::stringstream message(response_);
        message << std::istream(&streambuf_).rdbuf();
        streambuf_.consume(bytes_transmitted);

        std::cout << response_ << std::endl;
        response_.clear();

        async_write();
    } else {
        socket_.close();
    }
}

void client::async_read() {
    auto self = shared_from_this();
    io::async_read_until(
        socket_,
        streambuf_,
        "\n",
        [self](error_code err, std::size_t bytes_transmitted) {
            self->on_read(err, bytes_transmitted);
        }
    );
}

void client::on_write(error_code const &err) {
    if (!err) {
        async_read();
    } else {
        socket_.close();
    }
}

void client::async_write() {
    std::getline(std::cin, request_);
    request_ += "\n";
    io::async_write(
        socket_,
        io::buffer(request_),
        [this](error_code err, std::size_t bytes_transmitted) {
            on_write(err);
        }
    );
}

server.h

#ifndef SERVER_IMPL_H
#define SERVER_IMPL_H

#include <boost/asio.hpp>
#include <boost/asio/io_context.hpp>
#include <memory>
#include <optional>
#include <string>
#include <unordered_map>
#include <utility>
#include <regex>


namespace io = boost::asio;
using tcp = io::ip::tcp;
using error_code = boost::system::error_code;
using namespace std::placeholders;

class session : public std::enable_shared_from_this<session> {

public:
    session(tcp::socket socket, std::unordered_map<std::string, std::string> &storage) : 
        socket_(std::move(socket)),
        storage_ {storage} {}

    void start();

private:

    void async_read();
    void on_read(error_code err, std::size_t bytes_transmitted);

    void async_write();
    void on_write(error_code err, std::size_t bytes_transmitted);

    void message_handler(std::string const &message);
    void on_put(std::string const &message);
    void on_get(std::string const &message);
    void on_del(std::string const &message);
    void on_count(std::string const &message);
    void on_dump(std::string const &message);
    bool is_valid_key(std::string const &key);
    bool is_valid_txt(std::string const &fname);

    std::string get_from_storage(std::string const &key);
    void put_from_storage(std::string const &key, std::string const &value);

    tcp::socket socket_;
    io::streambuf streambuf_;
    std::string buffer_;
    std::string response_;

    std::unordered_map<std::string, std::string> &storage_;
    const static std::regex key_regex_;
    const static std::regex txt_regex_;

};

class server {

public:

    server(io::io_context& io_context, std::uint16_t port) :
        io_context_(io_context),
        acceptor_(io_context_, tcp::endpoint(tcp::v4(), port)) {
        async_accept();
    }

    void async_accept();

  private:

    io::io_context& io_context_;
    tcp::acceptor acceptor_;

    std::unordered_map<std::string, std::string> storage_;

};

#endif

server.cpp

#include "server_impl.h"

#include <boost/asio.hpp>
#include <cstddef>
#include <functional>
#include <memory>
#include <regex>
#include <sstream>
#include <string>
#include <utility>

/* session implementation */

const std::regex session::key_regex_("[A-Za-z0-9]+");
const std::regex session::txt_regex_("[A-Za-z0-9]+\\.txt");

void session::start() {
    async_read();
}

void session::async_read() {
    io::async_read_until(
        socket_,
        streambuf_,
        "\n",
        std::bind(&session::on_read, shared_from_this(), _1, _2)
    );
}

void session::on_read(error_code err, std::size_t bytes_transmitted) {
    if (!err) {
        std::stringstream message(buffer_);
        message << std::istream(&streambuf_).rdbuf();
        streambuf_.consume(bytes_transmitted);

        message_handler(buffer_);
        async_write();
        buffer_.clear();

        async_read();
    } else {
        socket_.close();
    }
}

void session::async_write() {
    io::async_write(
        socket_,
        io::buffer(response_),
        std::bind(&session::on_write, shared_from_this(), _1, _2)
    );
}

void session::on_write(error_code err, std::size_t bytes_transmitted) {
    if (!err) {
        async_write();
    } else {
        socket_.close();
    }
}

void session::message_handler(std::string const &message) {

    if (message.starts_with("PUT ")) {
        on_put(message);
    } else if (message.starts_with("GET ")) {
        on_get(message);
    } else if (message.starts_with("DEL ")) {
        on_del(message);
    } else if (message.starts_with("COUNT ")) {
        on_count(message);
    } else if (message.starts_with("DUMP ")) {
        on_dump(message);
    } else {
        response_ = "NE";
    }

}

void session::on_put(std::string const &message) {
    std::istringstream ss(message);
    std::string tmp;
    std::string key;
    int cnt {};

    ss >> tmp;
    while (ss >> tmp) {
        if (++cnt == 3) {
            response_ = "NE\n";
            return;
        }
        if (cnt == 1) {
            if (!is_valid_key(tmp)) {
                response_ = "NE\n";
                return;
            }
            key = tmp;
        }
    }

    if (cnt != 2) { 
        response_ = "NE\n";
        return;
    }

    if (std::string value = get_from_storage(key); value != "") {
        response_ = "OK " + value + "\n";
    }
    storage_[key] = tmp;
}

void session::on_get(std::string const &message) {
    std::istringstream ss(message);
    std::string tmp;
    std::string value;
    int cnt {};

    ss >> tmp;
    while (ss >> tmp) {
        if (++cnt == 2) {
            response_ = "NE\n";
            return;
        }
        if (!is_valid_key(tmp)) {
            response_ = "NE\n";
            return;
        }
        value = get_from_storage(tmp);
    }

    if (cnt != 1 || value == "") { 
        response_ = "NE\n";
        return;
    }

    response_ = "OK " + value + "\n";
}

void session::on_del(std::string const &message) {
    std::istringstream ss(message);
    std::string tmp;
    std::string value;
    int cnt {};

    ss >> tmp;
    while (ss >> tmp) {
        if (++cnt == 2) {
            response_ = "NE\n";
            return;
        }
        if (!is_valid_key(tmp)) {
            response_ = "NE\n";
            return;
        }
        value = get_from_storage(tmp);
    }

    if (cnt != 1 || value == "") { 
        response_ = "NE\n";
        return;
    }

    storage_.erase(tmp);
    response_ = "OK " + value + "\n";
}

void session::on_count(std::string const &message) {
    std::istringstream ss(message);
    std::string tmp;

    ss >> tmp;
    while (ss >> tmp) {
        response_ = "NE\n";
        return;
    }

    size_t cnt {storage_.size()};
    response_ = "OK " + std::to_string(cnt) + "\n";
}

void session::on_dump(std::string const &message) {
    std::istringstream ss(message);
    std::string tmp;
    int cnt {};

    ss >> tmp;
    while (ss >> tmp) {
        if (++cnt == 2) {
            response_ = "NE\n";
            return;
        }
    }

    if (cnt != 1 || !is_valid_txt(tmp)) { 
        response_ = "NE\n";
        return;
    }

    // dumping
    response_ = "OK\n";
}

bool session::is_valid_key(std::string const &key) {
    return std::regex_match(key, key_regex_);
}

bool session::is_valid_txt(std::string const &fname) {
    return std::regex_match(fname, txt_regex_);
}

std::string session::get_from_storage(std::string const &key) {
    if (auto it = storage_.find(key); it != storage_.end()) {
        return it->second;
    }
    return "";
}

void session::put_from_storage(std::string const &key, std::string const &value) {
    storage_[key] = value;
}


/* server implemetation */

void server::async_accept() {
    acceptor_.async_accept([this](error_code err, tcp::socket socket) {
        if (!err) {
            std::make_shared<session>(std::move(socket), storage_)->start();
        }
    });
}

And I run this like that

// for server

int main(int argc, char* argv[]) {

    if (argc < 3 || argc > 4) {
        std::cout << "usage: server <port> <limit_connections> [<dump_file>]" << std::endl;
        return 1;
    }

    io::io_context io_context;
    server srv(io_context, static_cast<uint16_t>(std::stoi(argv[1])));
    srv.async_accept();
    io_context.run();
    return 0;
}

// for client

int main(int argc, char* argv[]) {
    if (argc != 3) {
        std::cout << "usage: client <ip> <port>" << std::endl;
        return 1;
    }

    io::io_context io_context;
    std::make_shared<client>(io_context, "127.0.0.1", std::stoi(argv[2]));
    io_context.run();

    return 0;
}

The server starts and listens, here is the netstat result:

tcp        0      0 0.0.0.0:8080            0.0.0.0:*               LISTEN      4707/./Server

But when I try to start the server, I get this error immediately after startup:

terminate called after throwing an instance of 'boost::wrapexcept<boost::system::system_error>'
  what():  close: Bad file descriptor [system:9]

Solution

  • A debugger tells you that the exception is thrown from close():

    enter image description here

    This means that the connection failed (for some other reason) and you then die because you try to close a socket that may not be open.

    Why Does Connect Fail?

    You cancel it! async_connect does not capture shared_from_this(). The client destructs, cancelling any operations. Your close() is called after object destuction: that's Undefined Behavior.

    Likely you already had shared_from_this, but you removed it. Why? If you didn't remove it, you would get a bad_weak_ptr exception. That's because shared_from_this cannot be used from the constructor: Why shared_from_this can't be used in constructor from technical standpoint?

    Changing:

    • start() to be separate from construction
    • boost::enable_shared_from_this to std::enable_shared_from_this (because you were using std::make_shared!!)
    • adding the missing self captures

    Now you only have the issue that std::getline blocks all IO.

    Other problems

    Your server has issues handling the buffer:

    std::stringstream message(buffer_);
    message << std::istream(&streambuf_).rdbuf();
    streambuf_.consume(bytes_transmitted);
    
    message_handler(buffer_);
    

    Here,buffer_ does not contain the request. Perhaps you are missing buffer_ = message.str()? May I suggest improved naming so it is less confusing to read this code? I'd probably consider something like:

    std::string message;
    if (getline(std::istream(&streambuf_), message))
        message_handler(message);
    

    The server also causes a write loop by calling more writes from on_write. Instead I'd move the new read to on_write to avoid handling overlapping requests (because you donot support it, because e.g. response_ is shared).

    DEMO

    With some other minor fixes, and just "living" with stuff like the required space after "COUNT ":

    • File client.cpp

       #include <boost/asio.hpp>
       #include <boost/enable_shared_from_this.hpp>
       #include <cstddef>
       #include <string>
      
       namespace io     = boost::asio;
       using tcp        = io::ip::tcp;
       using error_code = boost::system::error_code;
       using namespace std::placeholders;
      
       class client : public std::enable_shared_from_this<client> {
      
         public:
           client(io::io_context& io_context) : socket_(io_context) {}
      
           void start(tcp::endpoint ep);
      
         private:
           void on_connect(error_code const& err);
      
           void on_read(error_code const& err, size_t bytes_transmitted);
           void do_read();
      
           void on_write(error_code const& err);
           void do_write();
      
         private:
           tcp::socket   socket_;
           io::streambuf streambuf_;
           std::string   request_;
           std::string   response_;
       };
      
       #include <iostream>
      
       void client::start(tcp::endpoint ep) {
           socket_.async_connect(ep, [this, self = shared_from_this()](error_code err) { on_connect(err); });
       }
      
       void client::on_connect(error_code const& err) {
           std::cerr << "on_connect: " << err.message() << std::endl;
           if (!err) {
               do_write();
           } else {
               socket_.close();
           }
       }
      
       void client::on_read(error_code const& err, size_t /*bytes_transmitted*/) {
           std::cerr << "on_read: " << err.message() << std::endl;
           if (!err) {
               std::cout << &streambuf_ << std::endl;
               response_.clear();
      
               do_write();
           } else {
               socket_.close();
           }
       }
      
       void client::do_read() {
           auto self = shared_from_this();
           io::async_read_until(socket_, streambuf_, "\n", [self](error_code err, size_t bytes_transmitted) {
               self->on_read(err, bytes_transmitted);
           });
       }
      
       void client::on_write(error_code const& err) {
           std::cerr << "on_write: " << err.message() << std::endl;
           if (!err) {
               do_read();
           } else {
               socket_.close();
           }
       }
      
       void client::do_write() {
           std::getline(std::cin, request_);
           std::cout << "Sending " << quoted(request_) << std::endl;
           request_ += "\n";
           io::async_write(
               socket_, io::buffer(request_),
               [this, self = shared_from_this()](error_code err, size_t /*bytes_transmitted*/) { on_write(err); });
       }
      
       int main(int argc, char* argv[]) {
           if (argc != 3) {
               std::cout << "usage: client <ip> <port>" << std::endl;
               return 1;
           }
      
           io::io_context io_context;
      
           auto c = std::make_shared<client>(io_context);
           c->start(tcp::endpoint({}, std::stoi(argv[2])));
      
           io_context.run();
       }
      
    • File server.cpp

       #include <boost/asio.hpp>
       #include <boost/asio/io_context.hpp>
       #include <iostream>
       #include <memory>
       #include <regex>
       #include <string>
       #include <unordered_map>
       #include <utility>
      
       namespace io     = boost::asio;
       using tcp        = io::ip::tcp;
       using error_code = boost::system::error_code;
       using namespace std::placeholders;
      
       class session : public std::enable_shared_from_this<session> {
      
         public:
           session(tcp::socket socket, std::unordered_map<std::string, std::string>& storage)
               : socket_(std::move(socket))
               , storage_{storage} {}
      
           void start();
      
         private:
           void do_read();
           void on_read(error_code err, size_t bytes_transmitted);
      
           void do_write();
           void on_write(error_code err, size_t bytes_transmitted);
      
           void message_handler(std::string const& message);
           void on_put(std::string const& message);
           void on_get(std::string const& message);
           void on_del(std::string const& message);
           void on_count(std::string const& message);
           void on_dump(std::string const& message);
           bool is_valid_key(std::string const& key);
           bool is_valid_txt(std::string const& fname);
      
           std::string get_from_storage(std::string const& key);
           void        put_from_storage(std::string const& key, std::string const& value);
      
           tcp::socket   socket_;
           io::streambuf streambuf_;
           std::string   buffer_;
           std::string   response_;
      
           std::unordered_map<std::string, std::string>& storage_;
           static std::regex const                       key_regex_;
           static std::regex const                       txt_regex_;
       };
      
       class server {
      
         public:
           server(io::io_context& io_context, std::uint16_t port)
               : io_context_(io_context)
               , acceptor_(io_context_, tcp::endpoint(tcp::v4(), port)) {
               async_accept();
           }
      
           void async_accept();
      
         private:
           io::io_context& io_context_;
           tcp::acceptor   acceptor_;
      
           std::unordered_map<std::string, std::string> storage_;
       };
      
       #include <boost/asio.hpp>
       #include <cstddef>
       #include <functional>
       #include <memory>
       #include <regex>
       #include <sstream>
       #include <string>
       #include <utility>
      
       /* session implementation */
      
       std::regex const session::key_regex_("[A-Za-z0-9]+");
       std::regex const session::txt_regex_("[A-Za-z0-9]+\\.txt");
      
       void session::start() { do_read(); }
      
       void session::do_read() {
           io::async_read_until(socket_, streambuf_, "\n", std::bind(&session::on_read, shared_from_this(), _1, _2));
       }
      
       void session::on_read(error_code err, size_t /*bytes_transmitted*/) {
           std::cerr << "on_write: " << err.message() << std::endl;
           if (!err) {
               std::string message;
               if (getline(std::istream(&streambuf_), message)) {
                   message_handler(message);
                   do_write();
                   buffer_.clear();
               }
           } else {
               socket_.close();
           }
       }
      
       void session::do_write() {
           std::cerr << "Writing response: " << quoted(response_) << std::endl;
           io::async_write(socket_, io::buffer(response_),
                           std::bind(&session::on_write, shared_from_this(), _1, _2));
       }
      
       void session::on_write(error_code err, size_t /*bytes_transmitted*/) {
           std::cerr << "on_write: " << err.message() << std::endl;
           if (!err) {
               response_.clear();
               do_read();
           } else {
               socket_.close();
           }
       }
      
       void session::message_handler(std::string const& message) {
           std::cerr << "message_handler: " << quoted(message) << std::endl;
           if (message.starts_with("PUT ")) {
               on_put(message);
           } else if (message.starts_with("GET ")) {
               on_get(message);
           } else if (message.starts_with("DEL ")) {
               on_del(message);
           } else if (message.starts_with("COUNT ")) {
               on_count(message);
           } else if (message.starts_with("DUMP ")) {
               on_dump(message);
           } else {
               response_ = "NE";
           }
       }
      
       void session::on_put(std::string const& message) {
           std::istringstream ss(message);
           std::string        tmp;
           std::string        key;
           int                cnt{};
      
           ss >> tmp;
           while (ss >> tmp) {
               if (++cnt == 3) {
                   response_ = "NE\n";
                   return;
               }
               if (cnt == 1) {
                   if (!is_valid_key(tmp)) {
                       response_ = "NE\n";
                       return;
                   }
                   key = tmp;
               }
           }
      
           if (cnt != 2) {
               response_ = "NE\n";
               return;
           }
      
           if (std::string value = get_from_storage(key); value != "") {
               response_ = "OK " + value + "\n";
           } else {
               response_ = "OK\n";
           }
           storage_[key] = tmp;
       }
      
       void session::on_get(std::string const& message) {
           std::istringstream ss(message);
           std::string        tmp;
           std::string        value;
           int                cnt{};
      
           ss >> tmp;
           while (ss >> tmp) {
               if (++cnt == 2) {
                   response_ = "NE\n";
                   return;
               }
               if (!is_valid_key(tmp)) {
                   response_ = "NE\n";
                   return;
               }
               value = get_from_storage(tmp);
           }
      
           if (cnt != 1 || value == "") {
               response_ = "NE\n";
               return;
           }
      
           response_ = "OK " + value + "\n";
       }
      
       void session::on_del(std::string const& message) {
           std::istringstream ss(message);
           std::string        tmp;
           std::string        value;
           int                cnt{};
      
           ss >> tmp;
           while (ss >> tmp) {
               if (++cnt == 2) {
                   response_ = "NE\n";
                   return;
               }
               if (!is_valid_key(tmp)) {
                   response_ = "NE\n";
                   return;
               }
               value = get_from_storage(tmp);
           }
      
           if (cnt != 1 || value == "") {
               response_ = "NE\n";
               return;
           }
      
           storage_.erase(tmp);
           response_ = "OK " + value + "\n";
       }
      
       void session::on_count(std::string const& message) {
           std::istringstream ss(message);
           std::string        tmp;
      
           ss >> tmp;
           while (ss >> tmp) {
               response_ = "NE\n";
               return;
           }
      
           size_t cnt{storage_.size()};
           response_ = "OK " + std::to_string(cnt) + "\n";
       }
      
       void session::on_dump(std::string const& message) {
           std::istringstream ss(message);
           std::string        tmp;
           int                cnt{};
      
           ss >> tmp;
           while (ss >> tmp) {
               if (++cnt == 2) {
                   response_ = "NE\n";
                   return;
               }
           }
      
           if (cnt != 1 || !is_valid_txt(tmp)) {
               response_ = "NE\n";
               return;
           }
      
           // dumping
           response_ = "OK\n";
       }
      
       bool session::is_valid_key(std::string const& key) { return std::regex_match(key, key_regex_); }
      
       bool session::is_valid_txt(std::string const& fname) { return std::regex_match(fname, txt_regex_); }
      
       std::string session::get_from_storage(std::string const& key) {
           if (auto it = storage_.find(key); it != storage_.end()) {
               return it->second;
           }
           return "";
       }
      
       void session::put_from_storage(std::string const& key, std::string const& value) { storage_[key] = value; }
      
       /* server implemetation */
      
       void server::async_accept() {
           acceptor_.async_accept([this](error_code err, tcp::socket socket) {
               if (!err) {
                   std::make_shared<session>(std::move(socket), storage_)->start();
               }
           });
       }
      
       int main(int argc, char* argv[]) {
           if (argc < 3 || argc > 4) {
               std::cout << "usage: server <port> <limit_connections> [<dump_file>]" << std::endl;
               return 1;
           }
      
           io::io_context io_context;
           server         srv(io_context, static_cast<uint16_t>(std::stoi(argv[1])));
           srv.async_accept();
           io_context.run();
       }
      

    And proof of the pudding: