Search code examples
c++synchronizationboost-asioc++20asio

Using asio::io_context::strand to serialize posted tasks


I'm trying to synchronized the completion handlers using strand but I'm not getting the expected output. When I'm using asio::post without wrapping the completion handler in strand I get the correct output but it is not synchronized. When I wrap the completion handler in strand, I don't receive any output.

Here is the minimal reproducible example:

#include <asio.hpp>
#include <thread>
#include <iostream>
#include <vector>
#include <random>

struct Task
{
    Task(int id, int wait_time) 
        : id_{id}
        , wait_time_{wait_time}
    {}

    void operator()()
    {
        std::cout << "Tast-" << id_ << " started. [" << std::this_thread::get_id() << "]" << std::endl;
        std::this_thread::sleep_for(std::chrono::milliseconds(wait_time_));
        std::cout << "Task-" << id_ << " finished after (" << wait_time_
                  << ") milliseconds. [" << std::this_thread::get_id() << "]" << std::endl;
    }
    int id_;
    int wait_time_;
};

int main()
{
    std::random_device rd;
    std::mt19937 engine(rd());
    std::uniform_int_distribution d(500, 2'000);

    asio::io_context ctx;
    asio::io_context::strand strand(ctx);

    std::vector<std::jthread> threads;
    auto count = 4;
    for (int i = 0; i < count; ++i)
    {
        threads.emplace_back([&]{ ctx.run(); });
    }

    for (int i = 0; i < count * 2; ++i)
    {
        asio::post(Task(i + 1, d(engine)));
        // asio::post(strand.wrap(Task(i + 1, d(engine))));  /* THIS DOESN'T PRODUCE ANY OUTPUT */
    }
    ctx.run();
}

Output when using asio::post(Task(i + 1, d(engine)));

Tast-1 started. [17652]
Tast-7 started. [26096]
Tast-3 started. [56484]
Tast-8 started. [32000]
Tast-5 started. [Tast-6 started. [79448]
61340]Tast-2Tast-4 started. [55696]
 started. [84880]

Task-6 finished after (784) milliseconds. [79448]
Task-2 finished after (835) milliseconds. [84880]
Task-1 finished after (923) milliseconds. [17652]
Task-4 finished after (1281) milliseconds. [55696]
Task-3 finished after (1668) milliseconds. [56484]
Task-7 finished after (1763) milliseconds. [26096]
Task-8 finished after (1888) milliseconds. [32000]
Task-5 finished after (1982) milliseconds. [61340]

How to use strand to synchronize these completion handlers? I'm using asio standalone on Windows 10 and MSVC compiler.


Solution

  • The other answer identified your race condition.

    Instead of manually managing the threads, only to then require a work-guard as well, I'd use the asio::thread_pool facility instead.

    Also, don't use the deprecated strand nested typedef. Instead use a strand<> executor adaptor:

    Live On Coliru

    #include <boost/asio.hpp>
    #include <iostream>
    #include <random>
    namespace asio = boost::asio;
    using namespace std::chrono_literals;
    
    using Duration = std::chrono::steady_clock::duration;
    
    struct Task {
        int      id_;
        Duration delay_;
    
        void operator()() const {
            static int tid_gen = 0;
            thread_local std::string const tid = "[" + std::to_string(++tid_gen) + "] ";
    
            std::cout << tid << "Task-" << id_ << " started" << std::endl;
            std::this_thread::sleep_for(delay_);
            std::cout << tid << "Task-" << id_ << " finished after " << delay_ / 1.ms << "ms" << std::endl;
        }
    };
    
    int main() {
        auto d = bind(std::uniform_int_distribution(500, 2'000), std::mt19937(std::random_device{}()));
    
        asio::thread_pool ctx(4);
        auto strand = make_strand(ctx);
    
        for (int i = 1; i <= 8; ++i)
            post(bind_executor(strand, Task{i, 1ms * d()}));
    
        ctx.join();
    }
    

    Prints e.g.

    [1] Task-1 started
    [1] Task-1 finished after 839ms
    [1] Task-2 started
    [1] Task-2 finished after 1674ms
    [1] Task-3 started
    [1] Task-3 finished after 1355ms
    [1] Task-4 started
    [1] Task-4 finished after 1368ms
    [1] Task-5 started
    [1] Task-5 finished after 1141ms
    [1] Task-6 started
    [1] Task-6 finished after 771ms
    [1] Task-7 started
    [1] Task-7 finished after 1445ms
    [1] Task-8 started
    [1] Task-8 finished after 881ms
    

    Mixing in some non-strand tasks (#9-12):

    for (int i = 1; i <= 8; ++i)
        post(bind_executor(strand, Task{i, 1ms * d()}));
    
    for (int i = 9; i <= 12; ++i)
        post(ctx, Task{i, 1ms * d()});
    

    Prints e.g.

    [1] Task-9 started
    [2] Task-1 started
    [3] Task-10 started
    [4] Task-11 started
    [3] Task-10 finished after 984ms
    [3] Task-12 started
    [1] Task-9 finished after 1007ms
    [2] Task-1 finished after 1417ms
    [2] Task-2 started
    [4] Task-11 finished after 1958ms
    [2] Task-2 finished after 597ms
    [2] Task-3 started
    [3] Task-12 finished after 1659ms
    [2] Task-3 finished after 1532ms
    [2] Task-4 started
    [2] Task-4 finished after 639ms
    [2] Task-5 started
    [2] Task-5 finished after 1576ms
    [2] Task-6 started
    [2] Task-6 finished after 1617ms
    [2] Task-7 started
    [2] Task-7 finished after 889ms
    [2] Task-8 started
    [2] Task-8 finished after 1451ms