c++ multithreading boost asio stdasync

Boost.Asio and std::async in multi-threading


I need to call a method that sends a request to a remote server. After that I want to wait for the answer, and the wait must not be blocked by other asynchronous functions or objects (timers, for example).

The method got_response(...) tells the user that an answer has arrived from the remote server; it also receives the data that came with the answer. My solution is below, but sometimes the timers can end up running on a single thread, which leads to got_response() hanging.

How can I guarantee that the timer simulating the answer runs on another thread? Is there any other solution to my problem?

#include <atomic>
#include <chrono>
#include <iostream>
#include <boost/asio.hpp>
#include <future>
#include <thread>
#include <vector>
using namespace std;

namespace io = boost::asio;

struct Reply
{
    atomic<bool> ready;
    atomic<int> result;

    future<void> future_result;

    Reply()
    {
        ready = false;
        result = 0;
    }

    void call()
    {
        cout << "retry called!" << endl;
        future_result = async([&]()
                              {
                                  while (!ready)
                                  {
                                      this_thread::yield();
                                  }
                              });
    }

    int get()
    {
        future_result.wait();
        return result.load();
    }

    void got_response(int res)
    {
        result = res;
        ready = true;
    }
};

int main()
{
    Reply reply;
    reply.call();

    io::io_context context(4);

    io::steady_timer timer1(context, std::chrono::seconds(2));
    timer1.async_wait([&](const boost::system::error_code &ec)
                      { cout << "timer 1, thread name: " << this_thread::get_id() << endl; });

    io::steady_timer timer2(context, std::chrono::seconds(3));
    timer2.async_wait([&](const boost::system::error_code &ec)
                      {
                          cout << "timer 2, thread name: " << this_thread::get_id() << endl;
                          cout << reply.get() << endl;
                      });

    io::steady_timer timer3(context, std::chrono::seconds(10));
    timer3.async_wait([&](const boost::system::error_code &ec)
                      {
                          cout << "timer 3, thread name: " << this_thread::get_id() << endl;
                          reply.got_response(1337);
                      });

    vector<thread> threads;
    auto count = 2;

    for (int n = 0; n < count; ++n)
    {
        threads.emplace_back([&]
                             { context.run(); });
    }

    for (auto &th : threads)
    {
        th.join();
    }
}

Result:

retry called!
timer 1, thread name: 140712511198784
timer 2, thread name: 140712519591488
timer 3, thread name: 140712511198784
1337

Solution

  • Wow. This is overcomplicated on several levels.

    • futures can have typed return values (that's actually the whole point of a future over synchronization primitives)

    • the future can signal readiness with a value, so there is no need to duplicate the readiness into a bool and then copy the result somewhere else

    • this has me confused:

       int get()
       {
           _fut.wait();
           return _result.load();
       }
      

      It waits on the future and then returns _result separately. Since the future already signals completion, it could carry the value itself, and neither _ready nor _result would be needed.

    • Do you realize that std::async is not part of Boost ASIO? In fact, it doesn't work well with it because, as you correctly notice, it introduces an unspecified number of threads. In general my advice is not to use std::async (it's hard to use correctly), and certainly never when using ASIO.

    • When you see variables named var1, var2, var3, it's time to refactor your code (into functions, or into classes if data members are involved):

       std::deque<io::steady_timer> timers;
      
       for (int i = 1; i <= 3; ++i) {
           auto& timer = timers.emplace_back(context, std::chrono::seconds(1+i));
           timer.async_wait([i](error_code ec) {
               std::cout << "timer " << i
                         << ", thread name: " << std::this_thread::get_id()
                         << std::endl;
           });
       }
      
    • instead of a vector of threads, consider boost::thread_group or indeed boost::asio::thread_pool.

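    • If you manually run the IO threads, remember to handle exceptions (see "Should the exception thrown by boost::asio::io_service::run() be caught?"). Leaving exception handling aside for brevity, the thread setup with boost::thread_group becomes: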

       boost::thread_group threads;
       for (int n = 0; n < 2; ++n) {
           threads.create_thread([&] { context.run(); });
       }
      
       threads.join_all();
      

      Or indeed

       io::thread_pool context(2);
       context.join();
      
    • This is very inefficient:

       while (!_ready) {
           std::this_thread::yield();
       }
      

      Instead, set the future's value (through its std::promise) to signal that it is ready.

    • using namespace std is generally not a good idea (Why is "using namespace std;" considered bad practice?)

    Demo

    Here's my expanded but simplified take on the question code:


    #include <boost/asio.hpp>
    #include <deque>
    #include <future>
    #include <iostream>
    #include <thread>
    namespace io = boost::asio;
    using namespace std::chrono_literals;
    using boost::system::error_code;
    
    // not very useful in practice, but for debug output in main
    std::ostream& debug(error_code);
    template <typename Fut> bool is_ready(Fut const& fut) {
        return fut.wait_for(0s) == std::future_status::ready;
    }
    
    int main() {
        std::promise<int>  reply;
        std::shared_future got_value = reply.get_future();
    
        io::thread_pool              context(2);
        std::deque<io::steady_timer> timers;
    
        for (int i = 1; i <= 10; ++i) {
            timers //
                .emplace_back(context, i * 1s)
                .async_wait([&got_value](error_code ec) {
                    if (is_ready(got_value))
                        debug(ec) << " Reply:" << got_value.get() << std::endl;
                    else
                        debug(ec) << " (reply not ready)" << std::endl;
                });
        }
    
        timers //
            .emplace_back(context, 4'500ms)
            .async_wait([&reply](error_code ec) {
                debug(ec) << " setting value" << std::endl;
                reply.set_value(1337);
            });
    
        context.join();
    }
        
    int friendly_thread_id() {
        return std::hash<std::thread::id>{}(std::this_thread::get_id()) % 256;
    }
    
    #include <iomanip>
    std::ostream& debug(error_code ec) {
        auto        now           = std::chrono::system_clock::now;
        static auto program_start = now();
        return std::cout //
            << ((now() - program_start) / 1ms) << "ms\t"
            << "thread:" << std::hex << std::setfill('0') << std::showbase
            << std::setw(2) << friendly_thread_id() << std::dec << " ";
    }
    
    
    

    Prints

    0ms     thread:0x5f  (reply not ready)
    999ms   thread:0xf3  (reply not ready)
    1999ms  thread:0x5f  (reply not ready)
    2999ms  thread:0x5f  (reply not ready)
    3499ms  thread:0xf3  setting value
    3999ms  thread:0x5f  Reply:1337
    4999ms  thread:0xf3  Reply:1337
    5999ms  thread:0xf3  Reply:1337
    6999ms  thread:0xf3  Reply:1337
    7999ms  thread:0xf3  Reply:1337
    8999ms  thread:0xf3  Reply:1337