Search code examples
c++ boost-asio boost-coroutine

ASIO - How to stop simple coroutine based server?


I have the following simple coroutine-based server:

// TCP server built on Boost.Asio stackful coroutines.
//
// Owns one io_service, one acceptor and one socket. Open() prepares the
// acceptor, Run() starts the accept coroutine and drives the io_service,
// Stop() stops the io_service.
class Server
{
public:
  Server()
    : Acceptor(Service), Socket(Service)
  {}

  void Open(unsigned short PortNum);
  void Run();
  void Stop();

private:
  // Coroutine bodies; both suspend through the yield_context they receive.
  void Accept(boost::asio::yield_context Yield);
  void Write(boost::asio::yield_context Yield);

  // Keep Service first: Acceptor and Socket are constructed from it, and
  // members are initialised in declaration order.
  boost::asio::io_service        Service;
  boost::asio::ip::tcp::acceptor Acceptor;
  boost::asio::ip::tcp::socket   Socket;
};

// Accept loop; runs as the coroutine that Run() spawns.
//
// Each iteration closes Socket, waits for the next client on Acceptor and
// then serves that client by calling Write() in this same coroutine. The
// server has a single Socket member, so only one client can be served at a
// time: the next accept is not started until Write() returns.
//
// The loop ends on the first failed accept (for example once the acceptor
// has been closed) instead of starting a session on a socket that never
// connected.
void Server::Accept(boost::asio::yield_context Yield)
{
for (;;)
  {
  boost::system::error_code ec;

  // Drop the connection served by the previous iteration, if any.
  Socket.close();
  Acceptor.async_accept(Socket,Yield[ec]);
  if (ec)
    return;

  // Serve the client inline. A yield_context belongs to the coroutine it was
  // created for, so it must not be bound into a separately spawned coroutine;
  // and a spawned session would have the shared Socket closed underneath it
  // by the next loop iteration.
  Write(Yield);
  }
}

// Serves the client connected on Socket.
//
// For every chunk of input that arrives, replies with the raw bytes of one
// double (native representation, sizeof(double) bytes). The received bytes
// themselves are ignored. Returns on the first read or write error.
void Server::Write(boost::asio::yield_context Yield)
{
char InBuffer[1024]= {};
boost::system::error_code ec;
const double Data= 6.66;

for (;;)
  {
  // A completed read is only the trigger for a reply; its contents are unused.
  Socket.async_read_some(boost::asio::buffer(InBuffer),Yield[ec]);
  if (ec)
    break;

  // async_write keeps going until the whole buffer has been sent, whereas a
  // single async_write_some is allowed to send only part of it.
  boost::asio::async_write(Socket,boost::asio::buffer(&Data,sizeof(Data)),Yield[ec]);
  if (ec)
    break;
  }
}

void Server::Open(unsigned short PortNum)
{
Acceptor.open(boost::asio::ip::tcp::v4());
Acceptor.bind({{},PortNum});
Acceptor.listen();
}

void Server::Run()
{
spawn(Service,std::bind(&Server::Accept,this,std::placeholders::_1));
Service.run();
}

void Server::Stop()
{
Service.stop();
}

I want to run this server on a thread and stop it cleanly when the main program is about to finish:

int main()
{
Server s;

s.Open(1024);

std::thread Thread(&Server::Run,&s);

Sleep(10'000);
s.Stop();
Thread.join();
}

Unfortunately, if there is a connected socket, when I call Stop an exception boost::coroutines::detail::forced_unwind is thrown.

I have also tried creating an explicit strand and dispatching a Socket.close() before stopping with the same result.

Is there something wrong with this approach?


Solution

  • I’m having trouble trying to stop gracefully a similar server ( stackoverflow.com/questions/50833730/…). – metalfox 4 hours ago

    Here's a minimal change that shows how to handle

    • an Exit command that closes a session
    • a Shutdown command that closes the server (so it stops accepting connections and terminates after the last session exits)

    Live On Coliru

    #include <boost/asio.hpp>
    #include <iostream>
    
    using boost::asio::ip::tcp;
    using boost::system::error_code;
    using boost::asio::streambuf;
    
    // Line-based TCP server on port 6767.
    //
    // Every received line is answered with "Ack". "Exit" ends the session that
    // sent it; "Shutdown" additionally closes the acceptor so that no further
    // connections are accepted. Sessions are kept alive by the shared_ptr copies
    // captured in their pending handlers and are destroyed when the last handler
    // lets go of them.
    int main() {
        boost::asio::io_service svc;

        tcp::acceptor a(svc);
        a.open(tcp::v4());
        a.set_option(tcp::acceptor::reuse_address(true));
        a.bind({{}, 6767}); // default-constructed address: port 6767 on every IPv4 interface, not only localhost
        a.listen(5);

        using session = std::shared_ptr<tcp::socket>;

        std::function<void()>        do_accept;
        std::function<void(session)> do_session;

        // One round trip for a session: read up to a newline, answer every line
        // found in the buffer with a single write, and start the next read only
        // once that write has completed. This guarantees that at most one
        // async_write is ever outstanding on a socket.
        do_session = [&](session s) {
            auto buf = std::make_shared<boost::asio::streambuf>();
            async_read_until(*s, *buf, "\n", [&,s,buf](error_code ec, size_t /*bytes*/) {
                if (ec) {
                    std::cerr << "read failed: " << ec.message() << "\n";
                    return;
                }

                std::istream is(buf.get());
                std::string line;
                auto reply = std::make_shared<std::string>(); // must outlive the async_write below
                bool keep_open = true;

                while (keep_open && getline(is, line)) // FIXME being sloppy with partially read lines
                {
                    *reply += "Ack\n";
                    if (line == "Exit") {
                        std::cout << "Exit received\n";
                        keep_open = false;
                    } else if (line == "Shutdown") {
                        std::cout << "Server shutdown requested\n";
                        a.close();
                        keep_open = false;
                    }
                }

                async_write(*s, boost::asio::buffer(*reply), [&,s,reply,keep_open](error_code ec, size_t) {
                    if (ec)
                        std::cerr << "write failed: " << ec.message() << "\n";
                    else if (keep_open)
                        do_session(s); // next read; otherwise the session ends here
                });
            });
        };

        // Accepts one connection, starts its session and re-arms itself. A failed
        // accept (e.g. after the acceptor was closed) is logged and not retried.
        do_accept = [&] {
            auto s = std::make_shared<session::element_type>(svc);

            a.async_accept(*s, [&,s](error_code ec) {
                if (ec)
                    std::cerr << "accept failed: " << ec.message() << "\n";
                else {
                    do_session(s);
                    do_accept(); // accept the next
                }
            });
        };

        do_accept(); // kick-off
        svc.run();   // returns once the acceptor is closed and the last session has ended
    }
    

    Note the sample sessions

    echo -en "hello world\nExit\n"     | netcat 127.0.0.1 6767
    echo -en "hello world\nShutdown\n" | netcat 127.0.0.1 6767
    

    Printing

    Ack
    Ack
    Ack
    Ack
    Exit received
    Server shutdown requested
    accept failed: Operation canceled
    

    A Terminate Command

    If you want a "Terminate" command that actively closes all open sessions and shuts down the server, you'll have to

    • keep a list of sessions
    • or use a signal

    You can see code for both approaches here: Boost ASIO: Send message to all connected clients

    The simplest way to integrate with the current sample:

    Live On Coliru

    #include <boost/asio.hpp>
    #include <iostream>
    #include <list>
    
    using boost::asio::ip::tcp;
    using boost::system::error_code;
    using boost::asio::streambuf;
    
    int main() {
        boost::asio::io_service svc;
    
        tcp::acceptor a(svc);
        a.open(tcp::v4());
        a.set_option(tcp::acceptor::reuse_address(true));
        a.bind({{}, 6767}); // bind to port 6767 on localhost
        a.listen(5);
    
        using session = std::shared_ptr<tcp::socket>;
        using sessref = std::weak_ptr<tcp::socket>;
    
        std::function<void()>        do_accept;
        std::function<void(session)> do_session;
        std::list<sessref> session_list;
    
        auto garbage_collect_sessions = [&session_list] {
            session_list.remove_if(std::mem_fn(&sessref::expired));
        };
    
        do_session = [&](session s) {
            // do a read
            auto buf = std::make_shared<boost::asio::streambuf>();
            async_read_until(*s, *buf, "\n", [&,s,buf](error_code ec, size_t /*bytes*/) {
                if (ec)
                    std::cerr << "read failed: " << ec.message() << "\n";
                else {
                    std::istream is(buf.get());
                    std::string line;
                    while (getline(is, line)) // FIXME being sloppy with partially read lines
                    {
                        async_write(*s, boost::asio::buffer("Ack\n", 4), [&,s,buf](error_code ec, size_t) {
                            if (ec) std::cerr << "write failed: " << ec.message() << "\n";
                        });
                        if (line == "Exit") {
                            std::cout << "Exit received\n";
                            return;
                        }
                        if (line == "Shutdown") {
                            std::cout << "Server shutdown requested\n";
                            a.close();
                            return;
                        }
                        if (line == "Terminate") {
                            std::cout << "Server termination requested\n";
                            a.close();
                            for (auto wp : session_list) {
                                if (auto session = wp.lock())
                                    session->close();
                            }
                            return;
                        }
                    }
    
                    do_session(s); // full duplex, can read while writing, using a second buffer
                }
    
            });
        };
    
        do_accept = [&] {
            auto s = std::make_shared<session::element_type>(svc);
    
            a.async_accept(*s, [&,s](error_code ec) {
                if (ec)
                    std::cerr << "accept failed: " << ec.message() << "\n";
                else {
                    garbage_collect_sessions();
    
                    session_list.push_back(s);
                    do_session(s);
                    do_accept(); // accept the next
                }
            });
        };
    
        do_accept(); // kick-off
        svc.run();   // wait for shutdown (Ctrl-C or failure)
    }
    

    Which obviously uses a session_list to implement the "Terminate" command:

    // Excerpt from the read handler above: handling of the "Terminate" command.
    if (line == "Terminate") {
        std::cout << "Server termination requested\n";
        a.close(); // close the acceptor
        // Close every session that is still alive; entries whose session is
        // already gone fail to lock and are skipped.
        for (auto wp : session_list) {
            if (auto session = wp.lock())
                session->close();
        }
        return; // leave the handler without starting another read for this session
    }