Tags: c++, asynchronous, boost-asio, asio

Asio. Error: "The I/O operation has been aborted because of either a thread exit or an application request"


I am trying to create an async server using Asio, but when the acceptor calls async_accept I get the error "The I/O operation has been aborted because of either a thread exit or an application request", which doesn't let the process continue. I have tried changing the port, but it doesn't help. When I write a synchronous server instead, it works. Also, I use standalone Asio, not the Boost version.

Here's the function where I get the error:

void startAsyncAccept()
    {
        acceptor.async_accept(socket, [&](const asio::error_code& error)
            {
                if(!error)
                {
                    std::cout << "Client is connected" << "\n";
                    startAsyncRead();
                }
                else
                {
                    
                    std::cerr << "It is: " << error.message() << '\n';
                    return 0;
                    
                }
                startAsyncAccept();


            }
        );
        
    }

And here's my async server:

class Server
{
public:
    Server(asio::io_context& io_context) : acceptor(io_context, asio::ip::tcp::endpoint(asio::ip::address::from_string("127.0.0.1"), 2000)), socket(io_context)
    {
        startAsyncAccept();
    }
    ~Server() {}
private:
    // Members
    asio::ip::tcp::acceptor acceptor;
    asio::ip::tcp::socket socket;
    asio::streambuf buffer;
    std::vector<asio::ip::tcp::socket> sockets;

    // Functions
    void startAsyncAccept()
    {
        acceptor.async_accept(socket, [&](const asio::error_code& error)
            {
                if(!error)
                {
                    std::cout << "Client is connected" << "\n";
                    startAsyncRead();
                }
                else
                {
                    
                    std::cerr << "It is: " << error.message() << '\n';
                    return 0;
                    
                }
                startAsyncAccept();


            }
        );
        
    }

    void startAsyncRead()
    {
        asio::async_read_until(socket, buffer, '\n', [&](const asio::error_code& error, size_t length)
            {
                if(!error)
                {
                    std::string message(asio::buffers_begin(buffer.data()), asio::buffers_begin(buffer.data()) + length);
                    std::cout << "Received from client: " << message;

                    for(auto& clients : sockets)
                    {
                        asio::async_write(clients, asio::buffer(message), [&](const asio::error_code& error, size_t length) 
                            {
                                if(error)
                                {
                                    std::cerr << "Failed to write to client: " << error.message() << "\n";
                                }
                            }
                        );
                    }
                    buffer.consume(length);
                    startAsyncRead();

                }
                else
                {
                    std::cout << "Client disconnected." << std::endl;
                    removeClient();
                }
            }
        );
    }

    void startAsyncWrite() 
    {
        asio::async_write(socket, asio::buffer("Connected to chat Server. \n"), [&](const asio::error_code& error, size_t length) 
            {
                if(error)
                {
                    std::cerr << "Failed to write to the client: " << error.message() << "\n";
                    removeClient();
                }
            }
        );
    }
    void removeClient()
    {
        auto it = std::find_if(sockets.begin(), sockets.end(),
            [&](const auto& client_socket)
            {
                return &client_socket == &socket;
            });

        if (it != sockets.end())
        {
            sockets.erase(it);
        }
    }


};

Solution

  • First of all, there's Undefined Behaviour because the lambda returns a value (return 0;) on one path but falls off the end on the others. This seems to be a copy-paste error of some sort, so I'll just remove the return 0;, which was meaningless anyway.

    The Real Problem

    The real problem is that you use the same socket for every accept. As soon as you call startAsyncAccept again, you reset the very socket on which you just started the first startAsyncRead. That is what produces the asio::error::operation_aborted error code.

    Simply don't use the same socket for every connection. Move it into a "session" object, for example, or, as you seem to have intended, put the accepted sockets into your sockets collection.
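
    For reference, the move-accepting overload of async_accept hands the completion handler a freshly opened socket for each connection, so there is no shared socket left to reset; the fixed versions further down rely on exactly that. A minimal sketch (handleNewConnection is a hypothetical helper, not something from your code):

    void startAsyncAccept() {
        acceptor.async_accept([this](asio::error_code ec, asio::ip::tcp::socket peer) {
            if (!ec) {
                // `peer` is a brand-new socket for this connection; move it somewhere
                // that outlives this handler (a session object, a container, ...)
                handleNewConnection(std::move(peer)); // hypothetical helper
            }
            startAsyncAccept(); // keep accepting
        });
    }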

    Fixing?

    Ironically, it is hard to know how you wanted to use sockets since it is literally unused in your current code (except for the loops that will never iterate, because sockets is always empty).

    In a way it's good that this part never ran, because otherwise you would get more UB: std::vector reallocates to grow its capacity, which invalidates any references to existing sockets.

    The usual way to deal with this is not to have a collection of sockets, but rather a collection of (weak) shared pointers to session objects. For simplicity, let me fix the reference stability here by changing std::vector to std::list and removing the problematic socket member.

    tcp::socket              socket;
    asio::streambuf          buffer;
    std::vector<tcp::socket> sockets;
    

    Becomes

    asio::streambuf        buffer;
    std::list<tcp::socket> sockets;
    

    There are big problems with async_write as well:

    • you use a local variable message as the buffer. This is UB again, because its lifetime ends before the write operations complete
    • you MUST avoid overlapping calls. Because you don't control when async_read_until completes (and you might have an arbitrary number of connected clients), you cannot be sure no writes overlap. The only solution is to have an "outbox" queue, typically per client, so each connection operates independently (see the sketch right after this list).
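
    To make the "outbox" idea concrete, here is the shape of the pattern; the Session class in the final version below implements exactly this, and the member names mirror that class:

    std::deque<std::string> outbox_; // pending messages for this client

    void send(std::string message) {
        outbox_.push_back(std::move(message));
        if (outbox_.size() == 1) // no write in flight yet, start the chain
            writeLoop();
    }

    void writeLoop() {
        // (the final version additionally captures shared_from_this() for lifetime safety)
        asio::async_write(socket_, asio::buffer(outbox_.front()),
            [this](error_code ec, size_t /*n*/) {
                outbox_.pop_front();
                if (!ec && !outbox_.empty())
                    writeLoop(); // continue with the next queued message
            });
    }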

    As a side note, it's unclear what startAsyncWrite was supposed to do (it is unused and doesn't help with the problem just described). The same goes for removeClient, which would never do anything even if you had filled sockets; and if you had, it would just cause UB due to the invalidated socket references. I dropped both for now.

    Finally, you use a streambuf where it seems you might just use a dynamic string buffer directly. In any case, put the consume() call right next to the spot where the data is copied out, so that you don't risk skipping it under some error condition.
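
    As an aside, the dynamic string buffer variant hinted at above could look roughly like this; a sketch only (the fixed versions below keep streambuf), with socket_ and readLoop as assumed names:

    std::string incoming_; // per-client line buffer

    void readLoop() {
        asio::async_read_until(socket_, asio::dynamic_buffer(incoming_), '\n',
            [this](error_code ec, size_t n) {
                if (ec)
                    return;
                std::string message = incoming_.substr(0, n); // copy the line out...
                incoming_.erase(0, n);                        // ...and consume it immediately
                // handle `message` here
                readLoop();
            });
    }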

    Simplified V1

    This simplified version removes the problems you asked about, as well as the lifetime issue with message:

    Live On Coliru

    class Server {
      public:
        Server(asio::io_context& io_context) //
            : acceptor(io_context, {{}, 2000}) {
            startAsyncAccept();
        }
    
      private:
        // Members
        tcp::acceptor          acceptor;
        asio::streambuf        buffer;
        std::list<tcp::socket> sockets;
    
        void startAsyncAccept() {
            acceptor.async_accept([&](error_code error, tcp::socket accepted) {
                if (!error) {
                    auto& client = sockets.emplace_back(std::move(accepted));
                    std::cout << "Client is connected\n";
                    startAsyncRead(client);
                } else {
                    std::cerr << "It is: " << error.message() << "\n";
                }
                startAsyncAccept();
            });
        }
    
        void startAsyncRead(tcp::socket& client) {
            asio::async_read_until(client, buffer, '\n', [&](error_code error, size_t n) {
                if (!error) {
                    auto f = asio::buffers_begin(buffer.data()), l = f + static_cast<ptrdiff_t>(n);
                    auto message = std::make_shared<std::string>(f, l);
                    buffer.consume(n);
                    std::cout << "Received from client: " << message;
    
                    for (auto& peer : sockets) {
                        // SEHE: TODO FIXME serializing writes per peer!
                        asio::async_write(
                            peer, asio::buffer(*message), [&, message](error_code error, size_t /*length*/) {
                                if (error) {
                                    std::cerr << "Failed to write to client: " << error.message() << "\n";
                                }
                            });
                    }
                    startAsyncRead(client);
                } else {
                    std::cout << "Client disconnected." << std::endl;
                    // SEHE: TODO ERASE
                }
            });
        }
    };
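
    As presented, V1 assumes a few surrounding declarations. To try it on its own, a preamble and main roughly like this should do (assuming the Boost flavour of Asio; with standalone Asio, swap the include and the error_code alias as in the #ifdef block of the final version):

    #include <boost/asio.hpp>
    #include <iostream>
    #include <list>
    #include <memory>
    namespace asio = boost::asio;
    using boost::system::error_code;
    using asio::ip::tcp;

    // ... class Server from above ...

    int main() {
        asio::io_context io;
        Server server(io);
        io.run();
    }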
    

    Fixed: Client Sessions

    Some more work to fix the other problems:

    [after breakfast]

    UPDATE You absolutely need a separate Session, because I missed earlier (before breakfast) that you were also incorrectly sharing the same streambuf instance across all connections... A Session type makes for a logical place to group the incoming buffer, the outgoing message queue and the socket.

    We use enable_shared_from_this together with shared_from_this() in the handlers to get automatic lifetime management when a connection goes bad or is closed.

    Note that it all comes down to correct Separation of Concerns: the Server manages listening, the ChatRoom manages the connected clients, and a Session manages a single client.

    Live On Coliru

    #include <chrono>
    #include <deque>
    #include <functional>
    #include <iostream>
    #include <list>
    #include <memory>
    #include <string>
    #include <string_view>
    #ifndef STANDALONE_ASIO
        #include <boost/asio.hpp>
        namespace asio = boost::asio;
        using boost::system::error_code;
    #else
        #include <asio.hpp>
        using asio::error_code;
    #endif
    using asio::ip::tcp;
    
    struct Session;
    using SessionPtr = std::shared_ptr<Session>;
    using Handle     = std::weak_ptr<Session>;
    
    struct ChatRoom {
        std::list<Handle> clients_;
    
        void garbage_collect();
        void broadcast(std::string_view message);
    
        // just for example:
        void direct_message(std::string recipient, std::string_view message);
    };
    
    struct Session : std::enable_shared_from_this<Session> {
        Session(tcp::socket s, ChatRoom& room) : socket_(std::move(s)), room_(room) {}
        void start() { readLoop(); }
    
        void send(std::string_view message){
            outbox_.emplace_back(message);
            if (outbox_.size() == 1)
                writeLoop();
        }
    
      private:
        tcp::socket             socket_;
        ChatRoom&               room_;
        asio::streambuf         incoming_;
        std::deque<std::string> outbox_;
    
        void readLoop() {
            asio::async_read_until(
                socket_, incoming_, '\n', [&, self = shared_from_this()](error_code ec, size_t n) {
                    if (ec) {
                        std::cout << "Client disconnect (" << ec.message() << ")" << std::endl;
                        return;
                    }
    
                    auto f = asio::buffers_begin(incoming_.data()), l = f + static_cast<ptrdiff_t>(n);
                    room_.broadcast(std::string(f, l));
                    incoming_.consume(n);
    
                    readLoop();
                });
        }
    
        void writeLoop() {
            if (outbox_.empty())
                return;
    
            asio::async_write( //
                socket_, asio::buffer(outbox_.front()),
                [this, self = shared_from_this()](error_code ec, size_t /*length*/) {
                    outbox_.pop_front();
                    if (ec)
                        std::cerr << "Failed to write to client: " << ec.message() << "\n";
                    else
                        writeLoop();
                });
        }
    };
    
    class Server {
      public:
        Server(asio::any_io_executor ex) : acceptor_(ex, {{}, 2000}) { acceptLoop(); }
    
      private:
        tcp::acceptor acceptor_;
        ChatRoom      room_;
    
        void acceptLoop() {
            room_.garbage_collect(); // optionally prevent dead connections piling up
    
            acceptor_.async_accept([&](error_code ec, tcp::socket accepted) {
                if (!ec) {
                    auto sess = std::make_shared<Session>(std::move(accepted), room_);
                    room_.clients_.push_back(sess);
                    std::cout << "Client is connected\n";
    
                    sess->start();
                } else {
                    std::cerr << "Accept error: " << ec.message() << "\n";
                }
                acceptLoop();
            });
        }
    };
    
    void ChatRoom::garbage_collect() {
        clients_.remove_if(std::mem_fn(&Handle::expired));
    }
    
    void ChatRoom::broadcast(std::string_view message) {
        for (auto& handle : clients_) {
            if (auto peer = handle.lock()) {
                peer->send(message);
            }
        }
    }
    
    using namespace std::chrono_literals;
    int main() {
        asio::io_context io;
        Server chat(io.get_executor());
    
        io.run_for(30s);
    }
    

    Testing with a number of clients:

    for a in {1..10}; do (
            sleep 1.$RANDOM
            echo "hello from $a"
            sleep 1.$RANDOM
            echo "bye from $a"
    ) | nc 127.0.0.1 2000 -w3 | (while read line; do echo "Client $a received '$line'"; done) & done
    

    Prints output like:

    (screenshot of the interleaved client output omitted)

    Also live on Coliru :)