Search code examples
c++boost-asioc++20c++-coroutineboost-coroutine

Using co simultaneously in a C+20 environment_ await async_ Read and | | async_ Wait will cause serious errors, with test Dome attached


  1. Will generate exceptions that cannot be caught
  2.   boost::asio::detail::cancellation_handler_base::call[virtual] == 0xFFFFFFFFFFFFFFFF。
        /// Emits the signal and causes invocation of the slot's handler, if any.
        void emit(cancellation_type_t type)
        {
          if (handler_)
            handler_->call(type);
        }
    

I can't find where the problem is anymore, the exception cannot be caught

ServerDome:

#include <iostream>
#include <boost/asio.hpp>
#include <boost/asio/ssl.hpp>
#include <boost/asio/thread_pool.hpp>
#include <boost/asio/experimental/awaitable_operators.hpp>
#include <boost/asio/experimental/channel.hpp>
#include <boost/asio/experimental/as_tuple.hpp>
#include <boost/asio/co_spawn.hpp>
#include <boost/asio/detached.hpp>
#include <boost/asio/io_context.hpp>
#include <boost/asio/ip/tcp.hpp>
#include <boost/asio/signal_set.hpp>
#include <cstdio>

using boost::asio::ip::tcp;
using boost::asio::awaitable;
using boost::asio::co_spawn;
using boost::asio::detached;
using boost::asio::steady_timer;
using boost::asio::use_awaitable;
using boost::asio::thread_pool;
using std::chrono::steady_clock;
using boost::asio::as_tuple;
using boost::asio::buffer;
using boost::asio::experimental::channel;
using boost::asio::io_context;
using boost::asio::steady_timer;

namespace this_coro = boost::asio::this_coro;
using ssl_socket = boost::asio::ssl::stream<boost::asio::ip::tcp::socket>;
constexpr auto use_nothrow_awaitable = boost::asio::experimental::as_tuple(boost::asio::use_awaitable);
using namespace std::literals::chrono_literals;
using namespace boost::asio::experimental::awaitable_operators;

awaitable<void> echo(tcp::socket socket)
{
    steady_timer timer(co_await this_coro::executor);
    try
    {
        char data[2000];
        for (;;)
        {
            timer.expires_after(2ms);
            auto result1 = co_await(async_read(socket, buffer(data, 2000), use_nothrow_awaitable) || timer.async_wait(use_nothrow_awaitable));
            if (result1.index() == 1) {
                //std::cout << "time out." << std::endl;
            }
            else {
                auto [e,n] = std::get<0>(result1);
                if (!e)
                {
                    if (n)
                        co_await async_write(socket, boost::asio::buffer(data, n), use_awaitable);
                }
                else {
                    std::cout << e.message() << std::endl;
                }
            }

        }
    }
    catch (std::exception& e)
    {
        std::printf("echo Exception: %s\n", e.what());
    }
}

awaitable<void> listener()
{
    auto executor = co_await this_coro::executor;
    tcp::acceptor acceptor(executor, { tcp::v4(), 5555 });
    for (;;)
    {
        tcp::socket socket = co_await acceptor.async_accept(use_awaitable);
        co_spawn(executor, echo(std::move(socket)), detached);
    }
}

int main(int argc, char* argv[])
{
    try
    {
        thread_pool pol(8);
        boost::asio::signal_set signals(pol, SIGINT, SIGTERM);
        signals.async_wait([&](auto, auto) { pol.stop(); });

        co_spawn(pol, listener(), detached);

        pol.wait();
    }
    catch (std::exception& e)
    {
        std::printf("Exception: %s\n", e.what());
    }
}

ClientDome:

#include <boost/asio.hpp>
#include <boost/asio/ssl.hpp>
#include <boost/asio/thread_pool.hpp>
#include <boost/asio/experimental/awaitable_operators.hpp>
#include <boost/asio/experimental/channel.hpp>
#include <boost/asio/experimental/as_tuple.hpp>
#include <boost/asio/co_spawn.hpp>
#include <boost/asio/detached.hpp>
#include <boost/asio/io_context.hpp>
#include <boost/asio/ip/tcp.hpp>
#include <boost/asio/signal_set.hpp>
#include <boost/asio/write.hpp>
#include <boost/asio/experimental/as_tuple.hpp>
#include <cstdio>

using boost::asio::ip::tcp;
using boost::asio::awaitable;
using boost::asio::co_spawn;
using boost::asio::detached;
using boost::asio::steady_timer;
using boost::asio::use_awaitable;
using std::chrono::steady_clock;
using boost::asio::thread_pool;
using boost::asio::as_tuple;
using boost::asio::buffer;
using boost::asio::experimental::channel;
using boost::asio::io_context;

namespace this_coro = boost::asio::this_coro;
constexpr auto use_nothrow_awaitable = boost::asio::experimental::as_tuple(boost::asio::use_awaitable);
using namespace boost::asio::experimental::awaitable_operators;
using namespace std::literals::chrono_literals;
thread_pool io_context(3);

awaitable<void> timeout(steady_clock::duration duration)
{
    steady_timer timer(co_await this_coro::executor);
    timer.expires_after(duration);
    co_await timer.async_wait(use_nothrow_awaitable);
}

awaitable<void> echo3(tcp::socket socket)
{
    try
    {
        char data[4024];
        for (;;)
        {
            co_await timeout(std::chrono::milliseconds(2));
            co_await async_write(socket, boost::asio::buffer(data, 2000), use_awaitable);
        }
    }
    catch (const std::exception& e)
    {
        std::printf("echo2 Exception: %s\n", e.what());
    }
}

awaitable<void> listener(tcp::socket socket)
{
    try
    {
        auto executor = co_await this_coro::executor;

        auto listen_endpoint =
            *tcp::resolver(socket.get_executor()).resolve("127.0.0.1", std::to_string(5555),
                tcp::resolver::passive).begin();
        co_await socket.async_connect(listen_endpoint, use_awaitable);
        co_spawn(executor, echo3(std::move(socket)), detached);
    }
    catch (const std::exception& e)
    {
        std::printf("connect Exception: %s\n", e.what());
    }
}

int main()
{
    try
    {
        
        boost::asio::signal_set signals(io_context, SIGINT, SIGTERM);
        signals.async_wait([&](auto, auto) { io_context.stop(); });

        for (size_t i = 0; i < 3; i++)
        {
            co_spawn(io_context, listener(std::move(tcp::socket(io_context))), detached);
        }

        io_context.wait();
    }
    catch (std::exception& e)
    {
        std::printf("main Exception: %s\n", e.what());
    }
}

I expect the program to run without errors and exceptions


Solution

  • There are a two main things:

    Strands

    I noticed sparsely occuring SEGV in deadline_timer_service. I noticed that you have a multi-threaded server context,but didn't guard the connections (echo sessions with strand). To be completely honest, I didn't intuitively think you would require one, but I'm thinking that the parallel_group that implements the || awaitable operator uses a shared cancellation mechanism that may not be threadsafe. In that case, a strand might be required:

    asio::awaitable<void> listener() {
        for (tcp::acceptor acc(co_await this_coro::executor, {{}, 5555});;) {
            auto s = make_strand(acc.get_executor());
            co_spawn(                                              //
                s,                                                 //
                echo(co_await acc.async_accept(s, use_awaitable)), //
                asio::detached);
        }
    }
    

    With that the SEGV goes away.

    Deadlock

    I noticed client and server stop progressing after a while. After counting lines I noticed it would freeze up after the same amount of messages transferred each time: ~7800x2000 bytes. That's about 5GiB per socket.

    I figured that the kernel might just block writes when the read buffer is at maximum size, because the client never reads the echoed packets in the client.

    So, I added that. The neat thing to do would be to co_spawn a full-duplex coroutine, but since we know that all packets are echoed 1:1, I did the simple thing and put the read inline with the write loop.

    Effectively we replace the sleep(2ms) with a "read indefinitely" (here it's important that the data buffer has a higher capacity than the maximum expected reply) limited to 2ms:

    co_await (async_read(socket, asio::buffer(data), use_awaitable) || timeout(2ms));
    

    Side note: now that we used parallel groups in the client code as well, I made sure to upgrade to strand executors for the coro as well.

    Combined Test Program

    With that change, all is well, and the server/client kept running without failures, tripping UBSan/ASan or deadlocking.

    This program (potentially) combines client and server, time limited to 30s only for COLIRU. It logs frequent messages once-in-a-thousand times:

    Live On Coliru ²

    #include <boost/asio.hpp>
    #include <boost/asio/experimental/awaitable_operators.hpp>
    #include <iostream>
    #include <set>
    
    namespace asio = boost::asio;
    using asio::use_awaitable;
    using asio::ip::tcp;
    
    namespace this_coro          = asio::this_coro;
    constexpr auto nothrow_await = asio::as_tuple(use_awaitable);
    using namespace asio::experimental::awaitable_operators;
    using namespace std::literals::chrono_literals;
    
    void once_in(size_t n, auto&& action) { // helper for reduced frequency logging
        static std::atomic_size_t counter_ = 0;
        if ((++counter_ % n) == 0) {
            if constexpr (std::is_invocable_v<decltype(action), size_t>)
                std::move(action)(counter_);
            else
                std::move(action)();
        }
    }
    
    asio::awaitable<void> timeout(auto duration) {
        asio::steady_timer timer(co_await this_coro::executor);
        timer.expires_after(duration);
        co_await timer.async_wait(nothrow_await);
    }
    
    asio::awaitable<void> server_session(tcp::socket socket) {
        try {
            for (std::array<char, 2000> data;;) {
                if (auto r = co_await (async_read(socket, asio::buffer(data), nothrow_await) || timeout(2ms));
                    r.index() == 1) {
                    once_in(1000, [&] { std::cout << "server_session time out." << std::endl; });
                } else {
                    auto [e, n] = std::get<0>(r);
                    if (!e) {
                        once_in(1000, [&, n = n] {
                            std::cout << "server_session writing " << n << " bytes to "
                                      << socket.remote_endpoint() << std::endl;
                        });
                        if (n)
                            co_await async_write(socket, asio::buffer(data, n), use_awaitable);
                    } else {
                        std::cout << e.message() << std::endl;
                    }
                }
            }
        } catch (boost::system::system_error const& se) {
            std::cout << "server_session Exception: " << se.code().message() << std::endl;
        } catch (std::exception const& e) {
            std::cout << "server_session Exception: " << e.what() << std::endl;
        }
    
        std::cout << "server_session closed" << std::endl;
    }
    
    asio::awaitable<void> listener(uint16_t port) {
        for (tcp::acceptor acc(co_await this_coro::executor, {{}, port});;) {
            auto s = make_strand(acc.get_executor());
            co_spawn(                                                        //
                s,                                                           //
                server_session(co_await acc.async_accept(s, use_awaitable)), //
                asio::detached);
        }
    }
    
    asio::awaitable<void> client_session(uint16_t port) {
        try {
            tcp::socket socket(co_await this_coro::executor);
            co_await socket.async_connect({{}, port}, use_awaitable);
    
            for (std::array<char, 4024> data{0};;) {
                co_await (async_read(socket, asio::buffer(data), use_awaitable) || timeout(2ms));
                auto w = co_await async_write(socket, asio::buffer(data, 2000 /*SEHE?!*/), use_awaitable);
    
                once_in(1000, [&](size_t counter) {
                    std::cout << "#" << counter << " wrote " << w << " bytes from " << socket.local_endpoint()
                              << std::endl;
                });
            }
        } catch (boost::system::system_error const& se) {
            std::cout << "client_session Exception: " << se.code().message() << std::endl;
        } catch (std::exception const& e) {
            std::cout << "client_session Exception: " << e.what() << std::endl;
        }
    
        std::cout << "client_session closed" << std::endl;
    }
    
    int main(int argc, char** argv) {
        auto flags = std::set<std::string_view>(argv + 1, argv + argc);
        bool server = flags.contains("server");
        bool client = flags.contains("client");
    
        asio::thread_pool pool(server ? 8 : 3);
        try {
            asio::signal_set signals(pool, SIGINT, SIGTERM);
            signals.async_wait([&](auto, auto) { pool.stop(); });
    
            if (server) {
                co_spawn(pool, listener(5555), asio::detached);
            }
    
            if (client) {
                co_spawn(make_strand(pool), client_session(5555), asio::detached);
                co_spawn(make_strand(pool), client_session(5555), asio::detached);
                co_spawn(make_strand(pool), client_session(5555), asio::detached);
            }
    
            std::this_thread::sleep_for(30s); // time limited for COLIRU
        } catch (std::exception const& e) {
            std::cout << "main Exception: " << e.what() << std::endl;
        }
    }
    

    Local demo for clarity:

    enter image description here

    ² also using experimental::as_tuple for COLIRU