async_read_some() operation not getting run even when socket has received data?



int main(int argc, char *argv[])
{   

    int acceptingCounter = 0;

    std::array<char, 2048> message_buffer;

    boost::asio::io_context io_context;

    std::vector<tcp::endpoint * > active_connections;
    std::vector< tcp::socket * > mySocks;

    //Create a tcp::acceptor object to listen for new connections.
    //Initialised to listen on port 1025.

    tcp::acceptor acceptor(io_context, tcp::endpoint(tcp::v4(), 1025));
    bool reading = false;

    std::cout << "Accepting..." << std::endl;

    while (true)
    {

        if (acceptingCounter < 1){ 

            tcp::socket * sock = new tcp::socket(io_context);
            tcp::endpoint * endpoint = new tcp::endpoint;

            // mySocks.push_back(sock);
            acceptor.async_accept(*sock,*endpoint,[endpoint,&active_connections, &acceptingCounter, &mySocks, sock](boost::system::error_code e)
            {
                active_connections.push_back(endpoint);
                mySocks.push_back(sock);
                
                std::cout << "Client newly accepted on: " << *endpoint << std::endl;
            });
            acceptingCounter++;
        }

        if (active_connections.size() > 0)
        {
            if (!reading){
                for (tcp::socket * s : mySocks){
                     reading = true;
                     s->async_read_some(boost::asio::buffer(message_buffer), [&message_buffer,s,&reading, mySocks](const boost::system::error_code &ec, std::size_t bytes_transferred)
                     {
                          //some completion code
                       reading = false;
                     });
                }
            }
        }
        

        io_context.poll();
        io_context.restart();

        //delete all endpoints and delete all sockets.
     
    }
    return 0;
}

I've got a server program here, and I can't figure out why the async_read_some completion handler never runs. My client successfully sends data to this socket, but neither io_context.poll() nor run() ever invokes the completion handler; as I understand it, the asynchronous operation is never seen as "completed". Why is that? The socket object doesn't go out of scope, and even a blocking read() on the socket never returns, so I can't see the received data. Note that my real completion lambda does print output (it isn't just a comment as shown here), but nothing is shown. The client definitely sends data: I get no error codes, and the completion handler for async_send() runs on that side. I also confirmed, by looking at the remote endpoint, that the server accepts and reads on the same socket, but it still doesn't work.


Solution

  • So, there are many issues with your code. The main one seems to be that it's working.

    I made it self-contained:

    Live On Coliru

    #include <array>
    #include <boost/asio.hpp>
    #include <iostream>
    #include <vector>
    using boost::asio::ip::tcp;
    using boost::system::error_code;
    
    int main() {
        int acceptingCounter = 0;
    
        std::array<char, 2048> message_buffer;
    
        boost::asio::io_context io_context;
    
        std::vector<tcp::endpoint*> active_connections;
        std::vector<tcp::socket*>   mySocks;
    
        // Create a tcp::acceptor object to listen for new connections.
        // Initialised to listen on port 1025.
    
        tcp::acceptor acceptor(io_context, tcp::endpoint(tcp::v4(), 1025));
        bool          reading = false;
    
        std::cout << "Accepting..." << std::endl;
    
        while (true) {
            std::cout << "Active connections: " << active_connections.size() << std::endl;
            std::cout << "Accepting counter: " << acceptingCounter << std::endl;
            if (acceptingCounter < 1) {
    
                tcp::socket*   sock     = new tcp::socket(io_context);
                tcp::endpoint* endpoint = new tcp::endpoint;
    
                // mySocks.push_back(sock);
                acceptor.async_accept( //
                    *sock, *endpoint,
                    [endpoint, &active_connections /*, &acceptingCounter*/, &mySocks, sock](error_code e) {
                        std::cout << "Client newly accepted on: " << *endpoint << " (" << e.message()
                                  << ")" << std::endl;
                        active_connections.push_back(endpoint);
                        mySocks.push_back(sock);
                    });
                acceptingCounter++;
            }
    
            if (active_connections.size() > 0) {
                if (!reading) {
                    for (tcp::socket* s : mySocks) {
                        reading = true;
                        s->async_read_some(
                            boost::asio::buffer(message_buffer),
                            [/*&message_buffer, s,*/ &reading, mySocks](error_code ec, size_t transferred) {
                                std::cout << "Received: " << transferred << " bytes (" << ec.message() << ")"
                                          << std::endl;
                                // some completion code
    
                                reading = false;
                            });
                    }
                }
            }
    
            io_context.run();
            io_context.restart();
            // delete all endpoints and delete all sockets.
        }
    }
    

    With the live demo on Coliru, or locally:

    Problems

    You have memory leaks, race conditions, unnecessary dynamic allocation, and raw new/delete, but also:

    • the accept loop never accepts more than one connection: acceptingCounter is incremented once and never reset, so async_accept is only ever issued once
    • the whole point of Asio is to abstract the event loop, but you're hardcoding it with a gigantic amount of state, as if you were writing C-style epoll/select code. This means that you cannot even have two reads on separate connections in parallel, which is antithetical to the purpose of asynchronous IO.

    Here's my suggestion to fix it:

    #include <array>
    #include <boost/asio.hpp>
    #include <functional>
    #include <iostream>
    #include <memory>
    namespace asio = boost::asio;
    using asio::ip::tcp;
    using boost::system::error_code;
    
    struct Session : std::enable_shared_from_this<Session> {
        tcp::socket            sock;
        tcp::endpoint          ep = sock.remote_endpoint();
        std::array<char, 2048> message_buffer;
    
        Session(tcp::socket sock) : sock(std::move(sock)) { std::cout << "New session: " << ep << std::endl; }
        ~Session() { std::cout << "Close session: " << ep << std::endl; }
    
        void start() { do_read(); }
    
        void do_read() {
            sock.async_read_some(asio::buffer(message_buffer),
                                 [this, self = shared_from_this()](error_code ec, size_t transferred) {
                                     std::cout << ep << ": Received: " << transferred << " bytes ("
                                               << ec.message() << ")" << std::endl;
                                     // some completion code
    
                                     if (!ec) // on error/EOF stop reading so the session can close
                                         do_read();
                                 });
        }
    };
    
    
    int main() {
        asio::io_context io_context;
        tcp::acceptor    acceptor(io_context, {{}, 1025});
    
        std::cout << "Accepting..." << std::endl;
    
        std::function<void()> accept_loop;
    
        accept_loop = [&acceptor, &accept_loop]() {
            acceptor.async_accept([&](error_code ec, tcp::socket s) {
                std::cout << "Accepted (" << ec.message() << "): " << (ec ? tcp::endpoint{} : s.remote_endpoint())
                          << std::endl;
    
                if (!ec) {
                    std::make_shared<Session>(std::move(s))->start();
                    accept_loop();
                }
            });
        };
    
        accept_loop();
    
        io_context.run();
    }
    

    This supports multiple concurrent connections without those problems. Note that there is a single run() call, and everything else happens indirectly from accept_loop.
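
    One detail that may not be obvious in the Session code is why the object survives after the temporary from std::make_shared goes away: the completion handler captures self = shared_from_this(), so the pending operation itself owns the session. The sketch below illustrates only that lifetime idiom, using the standard library alone. ToyLoop, ToySession, DemoResult and run_demo are hypothetical names invented for this illustration; ToyLoop is a plain handler queue standing in for io_context and is not part of Asio.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <functional>
#include <memory>
#include <utility>

// Hypothetical stand-in for io_context: a FIFO of pending handlers (not Asio API).
struct ToyLoop {
    std::deque<std::function<void()>> pending;

    void post(std::function<void()> h) {
        assert(h); // an empty handler would be a programming error
        pending.push_back(std::move(h));
    }

    // Roughly like run(): invoke handlers until no work is left.
    // Returns the number of handlers that were invoked.
    std::size_t run() {
        std::size_t n = 0;
        while (!pending.empty()) {
            auto h = std::move(pending.front());
            pending.pop_front();
            h();
            ++n;
        } // h is destroyed here, releasing the `self` it captured
        return n;
    }
};

// Hypothetical session that "reads" a fixed number of times.
struct ToySession : std::enable_shared_from_this<ToySession> {
    ToyLoop& loop;
    int      reads_left;
    int&     completed; // number of completed toy reads
    bool&    destroyed; // set by the destructor

    ToySession(ToyLoop& l, int reads, int& c, bool& d)
        : loop(l), reads_left(reads), completed(c), destroyed(d) {}
    ~ToySession() { destroyed = true; }

    void start() { do_read(); }

    void do_read() {
        // `self` keeps *this alive for as long as the handler is pending.
        loop.post([this, self = shared_from_this()] {
            ++completed;
            if (--reads_left > 0)
                do_read(); // chain the next operation from the handler
        });
    }
};

struct DemoResult {
    std::size_t handlers_run;
    int         completed;
    bool        destroyed_before_run;
    bool        destroyed_after_run;
};

DemoResult run_demo(int reads) {
    ToyLoop loop;
    int     completed = 0;
    bool    destroyed = false;

    // The temporary shared_ptr dies at the end of this statement,
    // but the queued handler still holds a copy.
    std::make_shared<ToySession>(loop, reads, completed, destroyed)->start();
    bool before = destroyed;

    std::size_t n = loop.run(); // a single run() drives the whole chain
    return {n, completed, before, destroyed};
}
```

    Calling run_demo(3) from a main function runs three chained handlers from one run() call. The session is still alive before run() and is destroyed once the last handler finishes without queuing another one, which is the same mechanism that lets a real Session close when its read chain stops.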