Search code examples
c++asiostdasync

Tasks on asio::strand are running on a single thread


I modified an asio strand example using the standalone version of the library from 4a here

#include <iostream>
#include <asio.hpp>
#include <future>
#include <thread>
#include <mutex>
#include <chrono>
using namespace std::chrono_literals;

namespace util
{
static std::mutex s_mtx_print;

// Default argument value
// https://en.cppreference.com/w/cpp/language/default_arguments
template <typename... Args>
void sync_print(const bool log_thread_id, Args &&... args)
{
    std::lock_guard<std::mutex> print_lock(s_mtx_print);
    if (log_thread_id)
    {
        std::cout << "[" << std::this_thread::get_id() << "] ";
    }
    (std::cout << ... << args) << '\n';
}

}

void Worker(std::unique_ptr<asio::io_service> &ios)
{
    util::sync_print(true, " Started...");
    if(ios) {ios->run();}
    util::sync_print(true, " End");
}

void PrintNum(int n)
{
    std::cout << "[" << std::this_thread::get_id() << "] " << n << '\n';
    std::this_thread::sleep_for(300ms);
}

void OrderedInvocation(std::unique_ptr<asio::io_service::strand> &up_strand)
{
    if(up_strand)
    {
        up_strand->post(std::bind(&PrintNum, 1));
        up_strand->post(std::bind(&PrintNum, 2));
        up_strand->post(std::bind(&PrintNum, 3));
        up_strand->post(std::bind(&PrintNum, 4));
        up_strand->post(std::bind(&PrintNum, 5));
        up_strand->post(std::bind(&PrintNum, 6));
        up_strand->post(std::bind(&PrintNum, 7));
        up_strand->post(std::bind(&PrintNum, 8));
        up_strand->post(std::bind(&PrintNum, 9));
    }
    else{
        std::cerr << "Invalid strand" << '\n';
    }
}

int main()
{
    util::sync_print(true, "section 4 started ...");
    auto up_ios = std::make_unique<asio::io_service>();
    auto up_work = std::make_unique<asio::io_service::work>(*up_ios);
    auto up_strand = std::make_unique<asio::io_service::strand>(*up_ios);

    std::vector<std::future<void>> tasks;
    constexpr int NUM_TASK = 3;

    for(int i = 0; i< NUM_TASK; ++i)
    {
        tasks.push_back(std::async(std::launch::async, &Worker, std::ref(up_ios)));
    }
    std::cout << "Task size " << tasks.size() << '\n';
    std::this_thread::sleep_for(500ms);
    OrderedInvocation(up_strand);

    up_work.reset();

    for(auto &t: tasks){ t.get(); }
    return 0;
}

The problem is: when I run the code, it appears that the function PrintNum only runs on a single thread

as the console output is

[140180645058368] section 4 started ...
Task size 3
[140180610144000]  Started...
[140180626929408]  Started...
[140180618536704]  Started...
[140180610144000] 1
[140180610144000] 2
[140180610144000] 3
[140180610144000] 4
[140180610144000] 5
[140180610144000] 6
[140180610144000] 7
[140180610144000] 8
[140180610144000] 9
[140180610144000]  End
[140180626929408]  End
[140180618536704]  End

My question is, do I need to configure the strand to let the tasks spread to all threads? Or maybe I missed something here?

[Edit] Ideally, the output should be something like

[00154F88] The program will exit when all work has finished.
[001532B0] Thread Start
[00154FB0] Thread Start
[001532B0] x: 1
[00154FB0] x: 2
[001532B0] x: 3
[00154FB0] x: 4
[001532B0] x: 5
[00154FB0] Thread Finish
[001532B0] Thread Finish
Press any key to continue . . .

In the expected output, both thread 00154FB0 and 001532B0 executed the PrintNum(), but in the modified version, only one thread executed the PrintNum().

If the strand is not been used, the output is:

[140565152012096] section 4 started ...
[140565133883136]  Started...
Task size 3
[140565117097728]  Started...
[140565125490432]  Started...
[[140565133883136] [140565117097728]] 12

3
[140565133883136] [4
[140565117097728140565125490432] 6
] 5
[140565133883136] 7
[140565125490432] 8
[140565117097728] 9
[140565125490432]  End
[140565117097728]  End
[140565133883136]  End

Thanks

Here is the cpu info from the machine I am using

$lscpu
Thread(s) per core:  1
Core(s) per socket:  4
Socket(s):           1

The OS is Ubuntu 18.04

Rong


Solution

  • That's the purpose of a strand:

    A strand is defined as a strictly sequential invocation of event handlers (i.e. no concurrent invocation). Use of strands allows execution of code in a multithreaded program without the need for explicit locking (e.g. using mutexes).

    If you want parallel invocation, you will need to remove the strand, post() directly to io_service and invoke io_service::run from a number of threads (you're doing that already).

    An unrelated note: there is no point in passing unique pointers around; make your life easier and just pass raw pointers or references.