Search code examples
c++network-programmingc++17boost-asioasio

async_write only sends after the server is closed


I'm trying to send messages via async_write, but they are only sent after I shut down the server (Ctrl-C).

For example: as a client I send "test" and "test2", and only after the server is closed does the client receive "testtest2".

I'm making a chat that accepts a message from a user (successfully) and has to broadcast it to everyone

message sending code

// Completion handler for one async_write started by broadcast().
//   id    - key (in m_users) of the user the write was addressed to.
//   error - result of the write; on failure close_connection(id) is called.
void Server::writeHandler(int id, boost::system::error_code error){
    if (!error){
        std::cout << "[DEBUG] message broadcasted ";
    } else {
        close_connection(id);
    }
}
// Starts one async_write of msg per entry in m_users; writeHandler is bound
// as the completion handler with the user's key.
// NOTE(review): msg is a by-value parameter, so it is destroyed when this
// function returns, while asio::buffer(msg, ...) only refers to its storage.
// The pending writes then use a buffer whose owner is gone.
// NOTE(review): the error parameter is never read here.
void Server::broadcast(std::string msg, boost::system::error_code error){
    for (auto& user : m_users){ // for every user in unordered map of users
        asio::async_write(user.second->socket, asio::buffer(msg, msg.size()),
            std::bind(&Server::writeHandler, this, user.first, std::placeholders::_1));
    }
}

broadcast calls in onMessage

// Read-completion handler for user `id`: on success it broadcasts m_read_msg
// and starts the next read on the same socket; on error it calls
// close_connection(id).
// NOTE(review): broadcast() takes a std::string, so m_read_msg is converted as
// a C string - presumably it must be NUL-terminated within PACK_SIZE; confirm.
// NOTE(review): the read is handed the whole PACK_SIZE buffer; asio::async_read
// is documented to complete only when the buffer is full or an error occurs,
// so shorter messages may not reach this handler until then - confirm.
void Server::onMessage(int id, boost::system::error_code error){
    if (!error){
        broadcast(m_read_msg, error); // char m_read_msg[PACK_SIZE] // PACK_SIZE = 512
        asio::async_read(m_users[id].get()->socket, asio::buffer(m_read_msg, PACK_SIZE), // PACK_SIZE = 512
            std::bind(&Server::onMessage, this, id, std::placeholders::_1));
    } else {
        close_connection(id);
    }
}

server run function:

// Creates the acceptor on `port` (IPv4), starts accepting connections, runs
// m_io on io_thread and then never returns.
void Server::run(int port){
    asio::ip::tcp::endpoint endpoint(asio::ip::tcp::v4(), port);
    std::cout << "[DEBUG] binded on " << port << std::endl;

    // acceptor initialization
    m_acceptor = std::make_shared<asio::ip::tcp::acceptor>(m_io, endpoint);
    
    // start to listen for connections
    listen();

    // run the io_service in thread
    io_thread = std::thread( [&]{ m_io.run(); } ); // m_io = io_service

    // NOTE(review): empty busy loop - the calling thread spins forever and
    // io_thread is never joined.
    while (true){
    }
}

// Allocates a fresh User and starts one asynchronous accept into its socket;
// onAccept is invoked with that User when the accept completes.
void Server::listen(){
    std::shared_ptr<User> pUser(new User(m_io));
    m_acceptor->async_accept(pUser->socket, std::bind(&Server::onAccept, this, pUser));
}

Solution

  • Infinite loops without side effects (like your empty while (true) {}) are UB in C++.

    Your std::string msg is a local (parameter) and passing it to an async operation (async_write) is also UB.

    To start a thread, immediately followed by an infinite loop is useless, unless the objective is to waste a lot of CPU power. So, replace it with just

     m_io.run();
    

    Next up, the code is far from self-contained and lacks all manner of error handling. Here's my imagined minimal completion, fixing the UBs already mentioned:

    #include <boost/asio.hpp>
    #include <functional>
    #include <iostream>
    #include <map>
    #include <memory>
    #include <string>
    namespace asio = boost::asio;
    using asio::ip::tcp;
    using boost::system::error_code;
    using namespace std::placeholders;
    
    // One client connection: holds the TCP socket, which is created on the
    // io_service passed to the constructor.
    struct User {
        tcp::socket socket;

        User(asio::io_service& io) : socket(io) {}
    };
    
    // Minimal server: accepts connections on m_io and can broadcast a message
    // to every entry in m_users.
    struct Server {
        // Opens the acceptor on `port` (default-constructed address), starts
        // accepting and runs the io_service on the calling thread.
        void run(uint16_t port) {
            m_acceptor = {m_io, {{}, port}};
            std::cout << "[DEBUG] bound on " << port << std::endl;

            listen();

            m_io.run();
        }

        // Placeholder: connection teardown is not implemented in this example.
        void close_connection(int) {}
        std::map<int, std::shared_ptr<User>> m_users;

        // Completion handler for one broadcast write. The third argument only
        // exists to keep the shared message buffer alive until the write ends.
        void writeHandler(int id, error_code error, std::shared_ptr<void>) {
            if (!error) {
                std::cout << "[DEBUG] message broadcasted ";
            } else {
                close_connection(id);
            }
        }

        // Starts one async_write of `msg` per user in m_users.
        void broadcast(std::string msg, boost::system::error_code /*error*/) {
            // Move the text into shared storage: async_write does not copy the
            // buffer, so it must outlive this call. Each bound handler holds a
            // reference, and the last completed write releases it.
            auto shared_msg = std::make_shared<std::string>(std::move(msg));
            for (auto& [id, usr] : m_users) { // for every user in the map of users
                asio::async_write(usr->socket, asio::buffer(*shared_msg),
                                  std::bind(&Server::writeHandler, this, id, _1, shared_msg));
            }
        }

        // Starts one asynchronous accept into a freshly allocated User.
        void listen() {
            std::shared_ptr<User> pUser(new User(m_io));
            m_acceptor.async_accept(pUser->socket, std::bind(&Server::onAccept, this, pUser));
        }

        // Accept completion: immediately waits for the next connection.
        void onAccept(std::shared_ptr<User> /*user*/) { listen(); }

        asio::io_service m_io;
        tcp::acceptor    m_acceptor{m_io};
    };
    
    int main() {
        Server s;
        s.run(8989);
    }
    

    Note that nothing ever broadcasts in the first place, so I'm not sure what the problem is.

    What I can see is that

    • io_service is deprecated (use io_context)
    • passing io_service& around is anti-pattern (use executors)
    • you dearly need to separate the concerns for Server and User. The handlers for each user session need to use the state from User and are much more naturally members of User. In fact, that's the only way to do it correctly, because with an out-of-session broadcast you MUST guarantee the correct sequencing of writes, which means you MUST have a per-session (user) queue
    • close_connection is an antipattern (IO is external "reality": you follow it, instead of the other way around). Instead use enable_shared_from_this
    • When you do, make m_users store a weak_ptr so you can observe when connections terminate

    Here's what it would look like with those addressed:

    Live On Coliru

    #include <boost/asio.hpp>
    #include <deque>
    #include <iostream>
    #include <map>
    namespace asio = boost::asio;
    using asio::ip::tcp;
    using boost::system::error_code;
    using namespace std::placeholders;
    
    using Message = std::string;
    
    // One connected client. The session keeps itself alive through the
    // shared_ptr captured by its pending read/write handlers and is destroyed
    // once no handler (or other owner) references it any more.
    struct UserSession : std::enable_shared_from_this<UserSession> {
        UserSession(tcp::socket s, int /*id*/) : socket_(std::move(s)) {
            // Populate the cached peer address while the socket is still
            // connected. The non-throwing overload is used so that a peer which
            // already disconnected just leaves ep_ default-constructed.
            error_code ec;
            ep_ = socket_.remote_endpoint(ec);
            std::cout << "New UserSession from " << ep_ << std::endl;
        }
        ~UserSession() {
            std::cout << "Closed UserSession from " << ep_ << std::endl;
        }

        // Begins reading from the peer.
        void start() {
            read_loop();
        }

        // Queues msg for delivery. A write chain is started only when the queue
        // was empty, so at most one async_write is in flight per socket.
        void send(Message msg) {
            outbox_.push_back(std::move(msg));
            if (outbox_.size()==1)
                write_loop();
        }

      private:
        tcp::socket         socket_;
        tcp::endpoint       ep_; // cache for use after socket_ becomes invalid
        Message             incoming_;
        std::deque<Message> outbox_;

        // Reads into incoming_ and re-arms itself on success; on error the
        // chain simply stops.
        void read_loop() {
            async_read(socket_, asio::dynamic_buffer(incoming_),
                       [this, self = shared_from_this()](error_code ec, size_t) {
                           if (!ec) {
                               // TODO
                               read_loop();
                           }
                       });
        }

        // Writes the front of outbox_; on success pops it and continues with
        // the next queued message. front() stays in the deque while the write
        // is pending, so the buffer remains valid.
        void write_loop() {
            if (outbox_.empty())
                return;
            async_write(socket_, asio::buffer(outbox_.front()),
                        [this, self = shared_from_this()](error_code ec, size_t) {
                            if (!ec) {
                                outbox_.pop_front();
                                write_loop();
                            }
                        });
        }
    };
    
    using Handle = std::weak_ptr<UserSession>;
    
    // Accepts connections and tracks the resulting sessions through weak
    // handles, so a session's lifetime is not extended by the server.
    struct Server {
        explicit Server(asio::any_io_executor ex, uint16_t port) : m_acceptor{ex, {{}, port}} {
            std::cout << "[DEBUG] bound on " << m_acceptor.local_endpoint() << std::endl;
        }

        // Starts accepting connections.
        void listen() { accept_loop(); }

        // Queues msg on every session that is still alive.
        void broadcast(Message msg) {
            // msg must stay intact for the whole loop: each session stores its
            // own copy in its outbox, so nothing may be moved out of msg here.
            for (auto& [id, handle] : m_users) // for every user in the map of users
                if (auto usr = handle.lock())
                    usr->send(msg);
        }

      private:
        int next_user_id = 0;
        std::map<int, Handle> m_users;

        // Accepts one connection, registers and starts a session for it, then
        // re-arms itself. Accepting stops after the first accept error.
        void accept_loop() {
            m_acceptor.async_accept([this](error_code ec, tcp::socket s) {
                if (!ec) {
                    auto id  = next_user_id++;
                    auto usr = std::make_shared<UserSession>(std::move(s), id);
                    m_users.emplace(id, usr);
                    usr->start();

                    accept_loop();
                }
            });
        }

        tcp::acceptor m_acceptor;
    };
    
    int main() {
        asio::io_context ioc;
    
        Server s(ioc.get_executor(), 8989);
        s.listen();
    
        ioc.run();
    }
    

    Local demo:

    enter image description here

    BONUS: Chat Server?

    Of course, still nothing ever calls broadcast. Assuming you might actually want something like a chat-server:

    Live On Coliru

    #include <boost/asio.hpp>
    #include <deque>
    #include <iostream>
    #include <map>
    namespace asio = boost::asio;
    using asio::ip::tcp;
    using boost::system::error_code;
    using namespace std::placeholders;
    
    using Id = int;
    
    // A chat line together with the id of the session that produced it.
    // Member order matters: the server builds notices as {-1, "..."}.
    struct Message {
        Id          sender; // originating session id; the server uses -1 for its own notices
        std::string text;   // bytes written to each recipient's socket
    };
    
    // Interface through which a UserSession hands received messages back to
    // the server without depending on the concrete Server type.
    struct IServer {
        virtual ~IServer() = default;
        // Distributes msg; the recipients are decided by the implementation.
        virtual void broadcast(Message msg) = 0;
    };
    
    struct UserSession : std::enable_shared_from_this<UserSession> {
        UserSession(tcp::socket s, Id id, IServer& srv) : socket_(std::move(s)), id_(id), svr_(srv) {
            std::cout << "New UserSession from " << ep_ << std::endl;
        }
        ~UserSession() {
            std::cout << "Closed UserSession from " << ep_ << std::endl;
        }
    
        void start() {
            read_loop();
        }
    
        void send(Message msg) {
            outbox_.push_back(std::move(msg));
            if (outbox_.size()==1)
                write_loop();
        }
    
        Id get_id() const { return id_; }
    
      private:
        tcp::socket         socket_;
        Id                  id_;
        IServer&            svr_;
        tcp::endpoint       ep_; // cache for use after socket_ becomes invalid
        Message             incoming_;
        std::deque<Message> outbox_;
    
        void read_loop() {
            incoming_.sender = id_;
            async_read_until( //
                socket_, asio::dynamic_buffer(incoming_.text), "\n",
                [this, self = shared_from_this()](error_code ec, size_t n) {
                    std::cout << ep_ << ": Read " << ec.message() << "\n";
                    if (!ec) {
                        svr_.broadcast(std::move(incoming_));
                        incoming_.text.erase(0, n);
                        read_loop();
                    }
                });
        }
    
        void write_loop() { 
            if (outbox_.empty())
                return;
            async_write(socket_, asio::buffer(outbox_.front().text),
                        [this, self = shared_from_this()](error_code ec, size_t) {
                            if (!ec) {
                                outbox_.pop_front();
                                write_loop();
                            }
                        });
        }
    };
    
    using Handle = std::weak_ptr<UserSession>;
    
    struct Server : IServer {
        explicit Server(asio::any_io_executor ex, uint16_t port) : m_acceptor{ex, {{}, port}} {
            std::cout << "[DEBUG] bound on " << m_acceptor.local_endpoint() << std::endl;
        }
    
        void listen() { accept_loop(); }
    
        virtual void broadcast(Message msg) override {
            m_history.push_back(msg);
            while (m_history.size() > MAX_HIST)
                m_history.pop_front();
    
            for (auto& [id, handle] : m_users) // for every user in unordered map of users
                if (auto usr = handle.lock())
                    if (usr->get_id() != msg.sender)
                        usr->send(msg);
        }
    
      private:
        Id                   next_user_id = 0;
        std::map<Id, Handle> m_users;
        static size_t constexpr MAX_HIST = 100;
        std::deque<Message>  m_history;
    
        void accept_loop() {
            m_acceptor.async_accept([this](error_code ec, tcp::socket s) {
                if (!ec) {
                    erase_if(m_users, [](auto& pair) { return pair.second.expired(); }); // garbage collect
    
                    auto id  = next_user_id++;
                    auto usr = std::make_shared<UserSession>(std::move(s), id, *this);
                    m_users.emplace(id, usr);
    
                    for (auto& msg : m_history)
                        usr->send(msg);
                    broadcast({-1, "** Now " + std::to_string(m_users.size()) + " users online **\n"});
                    usr->start();
    
                    accept_loop();
                }
            });
        }
    
        tcp::acceptor m_acceptor;
    };
    
    int main() {
        asio::io_context ioc;
    
        Server s(ioc.get_executor(), 8989);
        s.listen();
    
        ioc.run();
    }
    

    With a local demo again:

    enter image description here