c++, boost, boost-asio, coroutine, c++-coroutine

C++ Client for external server & Server for external clients simultaneously with boost::asio coroutine


Is it possible to create a single-threaded process with a client AND a server running in parallel and communicating with each other?

I have a scenario where I need to be both client and server, and data can be sent/received in any direction:

  • Server (PORT A) -> APP (client -> process message and send to -> server (PORT B)) -> ClientA
  • Server (PORT A) <- APP (client <- process message and send to <- server (PORT B)) <- ClientA
  • Server (PORT A) -> APP (client -> process message and send back to Server (PORT A))
  • ClientA -> APP (server (PORT B) -> process message and send back to ClientA)

I've been trying to modify the boost::asio coroutine examples to add a client on the same io_context (or on a separate io_context), but I am struggling with it.

I also tried having the client and the server in separate threads, but then I run into concurrency problems as well... Any idea or snippet would be much appreciated.


Solution

  • I have a scenario where I need to be both client and server, and data can be sent/received in any direction.

    This is pretty standard for a networked application.

    The chat server seems a good example to start with (since it does send/receive in all directions): https://www.boost.org/doc/libs/1_75_0/doc/html/boost_asio/examples/cpp11_examples.html#boost_asio.examples.cpp11_examples.chat
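
    To answer the headline question directly: yes. The "client" and the "server" are simply two coroutines spawned on the same io_context, and a single thread calling run() drives both cooperatively. A minimal sketch of that shape (the ports and endpoints are placeholders, and it assumes the same ba/tcp aliases and headers as the full listing further down):

    ba::io_context io;

    spawn(io, [&io](ba::yield_context yc) { // server role (your PORT B)
        tcp::acceptor acc(io, {{}, 6869});  // placeholder port
        while (true) {
            tcp::socket s(io);
            acc.async_accept(s, yc);
            // spawn a session coroutine owning s here
        }
    });

    spawn(io, [&io](ba::yield_context yc) { // client role (towards your PORT A)
        tcp::resolver r(io);
        tcp::socket s(io);
        async_connect(s, r.async_resolve("127.0.0.1", "6868", yc), yc); // placeholder endpoint
        // talk to the external server here
    });

    io.run(); // one thread cooperatively runs both coroutines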

    Now, as a barebones example, here is a server that also initiates outgoing connections:

    Live On Wandbox

    Let's create a server that spawns an async session for each accepted connection. Each session is a "reverse echo" service.

    // echo server, multi-client
    spawn(io, [&io](ba::yield_context yc) {
        tcp::acceptor acc(io, {{}, 6868});
        acc.set_option(tcp::acceptor::reuse_address(true));
    
        auto num_clients = 0;
        while (true) {
            tcp::socket s(io);
            acc.async_accept(s, yc);
            spawn(yc, [s = std::move(s), log=logger("session #" + std::to_string(num_clients++))]
                (ba::yield_context yc) mutable {
                log("Connection from ", s.remote_endpoint());
                std::string msg;
                while (auto n = async_read_until(s, ba::dynamic_buffer(msg), "\n", yc)) {
                    std::string_view vw(msg.data(), n);
                    vw.remove_suffix(1); // exclude the trailing '\n' (the reverse below leaves it in place)
                    log("Responding to ", std::quoted(vw));
    
                    std::reverse(msg.data(), msg.data() + vw.size());
    
                    async_write(s, ba::buffer(msg, n), yc);
                    msg = msg.substr(n);
                }
            });
        }
    });
    

    Simultaneously, let's run 5 clients. For want of an external server, we'll just make them connect to our own server.

    That gives us a self-contained demo, and it shows that running everything on a single thread does not lead to any blocking.

    // a random client, let's make it connect to our own server, just for this demo
    for (auto client_id = 0; client_id<5; ++client_id) {
        spawn(io,
            [&io, delay, client_id, log=logger("client #" + std::to_string(client_id))]
            (ba::yield_context yc) {
                tcp::resolver r(io);
                tcp::socket s(io);
                async_connect(s, r.async_resolve("127.0.0.1", "6868", yc), yc);
    
                while (true) {
                    delay(yc);
                    ba::streambuf buf;
                    std::ostream(&buf)
                        << "Hello from client #" << client_id << "\n";
    
                    async_write(s, buf, yc);
    
                    std::string response;
                    async_read_until(s, ba::dynamic_buffer(response), "\n", yc);
                    if (!response.empty())
                        response.pop_back();
    
                    log("Received response ", std::quoted(response));
                }
            });
    }
    

    Where delay is a random delay (500..1500ms):

    auto delay = [&io](ba::yield_context yc) {
        ba::steady_timer(io, 500ms + (prng() % 1000) * 1ms)
            .async_wait(yc);
    };
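
    In your real scenario the clients would of course connect to the external server rather than back to our own one; only the resolve/connect line changes. For example (host and service below are placeholders, not taken from the question):

    // hypothetical external endpoint instead of 127.0.0.1:6868
    async_connect(s, r.async_resolve("external.host.example", "12345", yc), yc);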
    

    We run the whole program for 3 seconds, and quit:

    io.run_for(3s);
    logger("main")("Bye");
    

    Prints

    at     0ms  session #0  Connection from 127.0.0.1:51024
    at     1ms  session #1  Connection from 127.0.0.1:51026
    at     1ms  session #2  Connection from 127.0.0.1:51028
    at     1ms  session #3  Connection from 127.0.0.1:51030
    at     1ms  session #4  Connection from 127.0.0.1:51032
    at   831ms  session #3  Responding to "Hello from client #3"
    at   831ms  client #3   Received response "3# tneilc morf olleH"
    at  1148ms  session #4  Responding to "Hello from client #4"
    at  1148ms  client #4   Received response "4# tneilc morf olleH"
    at  1196ms  session #1  Responding to "Hello from client #1"
    at  1196ms  client #1   Received response "1# tneilc morf olleH"
    at  1327ms  session #0  Responding to "Hello from client #0"
    at  1327ms  client #0   Received response "0# tneilc morf olleH"
    at  1401ms  session #2  Responding to "Hello from client #2"
    at  1401ms  client #2   Received response "2# tneilc morf olleH"
    at  1446ms  session #3  Responding to "Hello from client #3"
    at  1446ms  client #3   Received response "3# tneilc morf olleH"
    at  1836ms  session #4  Responding to "Hello from client #4"
    at  1836ms  client #4   Received response "4# tneilc morf olleH"
    at  2163ms  session #0  Responding to "Hello from client #0"
    at  2163ms  client #0   Received response "0# tneilc morf olleH"
    at  2382ms  session #2  Responding to "Hello from client #2"
    at  2383ms  client #2   Received response "2# tneilc morf olleH"
    at  2426ms  session #3  Responding to "Hello from client #3"
    at  2426ms  client #3   Received response "3# tneilc morf olleH"
    at  2444ms  session #4  Responding to "Hello from client #4"
    at  2444ms  client #4   Received response "4# tneilc morf olleH"
    at  2579ms  session #1  Responding to "Hello from client #1"
    at  2580ms  client #1   Received response "1# tneilc morf olleH"
    at  3002ms  main    Bye
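
    For a real application you would typically not use run_for; you would let the io_context run until told to stop. A minimal sketch, assuming you want to shut down on SIGINT/SIGTERM (replacing the io.run_for(3s) call):

    ba::signal_set signals(io, SIGINT, SIGTERM); // include <csignal> for the signal constants
    signals.async_wait([&io](boost::system::error_code const&, int) { io.stop(); });
    io.run(); // still a single thread driving every coroutine
    logger("main")("Bye");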
    

    Full Listing

    Live On Wandbox

    #include <boost/system/system_error.hpp>
    #define BOOST_BIND_NO_PLACEHOLDERS
    #include <boost/asio.hpp>
    #include <boost/asio/spawn.hpp>
    #include <algorithm>   // std::reverse
    #include <iostream>
    #include <iomanip>
    #include <string_view> // std::string_view
    #include <chrono>
    #include <random>
    
    namespace ba = boost::asio;
    using ba::ip::tcp;
    
    using namespace std::literals;
    
    static auto const now = &std::chrono::steady_clock::now;
    static auto const start = now();
    
    auto logger(std::string name) {
        return [name](auto const&... args) {
            ((std::cout << "at" << std::setw(6) << (now() - start)/1ms << "ms\t"
                        << name << "\t") 
                << ... << args) << std::endl;
        };
    }
    
    int main() {
        ba::io_context io;
    
        static std::mt19937 prng { std::random_device{}() };
    
        // insert random async delays
        auto delay = [&io](auto yc) {
            ba::steady_timer(io, 500ms + (prng() % 1000) * 1ms)
                .async_wait(yc);
        };
    
        // echo server, multi-client
        spawn(io, [&io, log=logger("accept")](ba::yield_context yc) {
            tcp::acceptor acc(io, {{}, 6868});
            acc.set_option(tcp::acceptor::reuse_address(true));
    
            auto num_clients = 0;
            while (true) {
                tcp::socket s(io);
                acc.async_accept(s, yc);
                spawn(yc, [s = std::move(s), log=logger("session #" + std::to_string(num_clients++))]
                    (ba::yield_context yc) mutable {
                    log("Connection from ", s.remote_endpoint());
                    std::string msg;
                    while (auto n = async_read_until(s, ba::dynamic_buffer(msg), "\n", yc)) {
                        std::string_view vw(msg.data(), n);
                        vw.remove_suffix(1); // exclude the trailing '\n' (the reverse below leaves it in place)
                        log("Responding to ", std::quoted(vw));
    
                        std::reverse(msg.data(), msg.data() + vw.size());
    
                        async_write(s, ba::buffer(msg, n), yc);
                        msg = msg.substr(n);
                    }
                });
            }
        });
    
        // a random client, let's make it connect to our own server, just for this demo
        for (auto client_id = 0; client_id<5; ++client_id) {
            spawn(io,
                [&io, delay, client_id, log=logger("client #" + std::to_string(client_id))]
                (ba::yield_context yc) {
                    tcp::resolver r(io);
                    tcp::socket s(io);
                    async_connect(s, r.async_resolve("127.0.0.1", "6868", yc), yc);
    
                    while (true) {
                        delay(yc);
                        ba::streambuf buf;
                        std::ostream(&buf)
                            << "Hello from client #" << client_id << "\n";
    
                        async_write(s, buf, yc);
    
                        std::string response;
                        async_read_until(s, ba::dynamic_buffer(response), "\n", yc);
                        if (!response.empty())
                            response.pop_back();
    
                        log("Received response ", std::quoted(response));
                    }
                });
        }
    
        io.run_for(3s);
        logger("main")("Bye");
    }
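
    Note that the stackful spawn/yield_context used here is built on Boost.Coroutine, so the program has to be linked against it; with g++ that is typically something along the lines of g++ -std=c++17 main.cpp -lboost_coroutine -lboost_context -pthread (the exact set of Boost libraries needed varies with version and platform).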