c++ multithreading pthreads scheduling

Taking a snapshot of state asynchronously


I have the following scenario:

  • Thread 1 does some computation
  • Thread 2 may want to take a snapshot of the latest result in thread 1
  • Thread 2 does not have any insight into what is happening in thread 1. Thus, a ping-pong buffer may not be possible here.

I tried the following implementation:

  • Thread 1 will call the computation function repeatedly, while holding a mutex:
void do_run()
{
    while(!m_should_stop.load())
    {
        std::lock_guard lock{m_task_mtx};
        if(m_task.step() == task_step_result::task_is_completed)
        { return; } 
    }
}
  • Thread 2 will try to acquire the mutex so it can safely fetch the state:
state get_state() const
{
    std::lock_guard lock{m_task_mtx};
    return m_task.current_state();
}

The problem is that get_state never gets a chance to acquire the mutex unless there is an artificial sleep before do_run takes the lock. Could this problem be solved in some other way, such as with a priority scheme? If get_state is waiting on the resource, it should have priority over step.
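
One way the priority idea could look is sketched below. This is only an illustration under assumptions, not the approach from the solution: the class prioritized_task, its counter state, and the helper take_snapshots are hypothetical stand-ins for the task in the question. The reader announces itself through an atomic counter before contending for the mutex, and the worker waits on a condition variable (which releases the mutex) while any reader is pending.

```cpp
#include <atomic>
#include <condition_variable>
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical stand-in for the task in the question: its state is a step counter.
class prioritized_task
{
public:
    // Worker side: performs one step, but first gives way to any waiting reader.
    void step()
    {
        std::unique_lock lock{m_mtx};
        // wait() releases the mutex while readers are pending, so they can get in.
        m_cv.wait(lock, [this]{ return m_pending_readers.load() == 0; });
        ++m_state;
    }

    // Reader side: announces itself before contending for the mutex.
    long get_state()
    {
        ++m_pending_readers;
        long snapshot;
        {
            std::lock_guard lock{m_mtx};
            snapshot = m_state;
            --m_pending_readers;  // Decremented under the lock to avoid a lost wakeup
        }
        m_cv.notify_one();  // Only the worker ever waits on m_cv
        return snapshot;
    }

private:
    std::mutex m_mtx;
    std::condition_variable m_cv;
    std::atomic<int> m_pending_readers{0};
    long m_state{0};
};

// Runs a worker in a tight loop and takes `count` snapshots from the calling thread.
std::vector<long> take_snapshots(int count)
{
    prioritized_task task;
    std::atomic<bool> should_stop{false};
    std::thread worker{[&]{
        while(!should_stop.load()) { task.step(); }
    }};

    std::vector<long> snapshots;
    for(int k = 0; k != count; ++k)
    { snapshots.push_back(task.get_state()); }

    should_stop.store(true);
    worker.join();
    return snapshots;
}
```

Because the worker re-checks the counter every time it takes the lock, a pending reader is served before the next step even when the worker never sleeps. The cost is one atomic load per step.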


Solution

  • I solved the issue by injecting the request into Thread 1. Basically, it becomes like this:

    Thread 1:

    1. Check the mailbox. If it is not empty:

      1. Execute the task
      2. Clear the mailbox
      3. Notify Thread 2 that the result is ready (via a condition_variable)
    2. Continue with the real work

    Thread 2:

    1. Store data about what Thread 1 should execute (a function pointer, a pointer to the closure, and a pointer to the storage for the return value). The return value is stored in a std::optional.
    2. Use the condition_variable and wait for the data to become available

    The communication channel:

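    #include <condition_variable>
    #include <mutex>
    #include <optional>
    #include <type_traits>
    #include <utility>

    class sync_message_bus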
    {
    public:
        void operator()()  // Called from Thread 1
        {
            std::lock_guard lock{m_mtx};
            if(m_closure != nullptr) [[unlikely]]
            {
                m_callback(m_closure, m_result);
                m_closure = nullptr;
                m_cv.notify_one();
            }
        }
        template<class Callable>
        auto process(Callable&& f, bool invoke_now)  // Called from Thread 2
        {
            if(invoke_now)  // In case Thread 1 is not running
            { return std::forward<Callable>(f)(); }
            using type = std::invoke_result_t<Callable>;
            std::optional<type> retval;
            {
                std::unique_lock lock{m_mtx};
                m_closure = &f;
                m_callback = [](void* closure, void* result) {
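                    // remove_reference_t: Callable is an lvalue reference when f is passed as an lvalue
                    auto& f = *static_cast<std::remove_reference_t<Callable>*>(closure);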
                    auto& retval = *static_cast<std::optional<type>*>(result);
                    retval = f();
                };
                m_result = &retval;
                m_cv.wait(lock, [&retval](){
                    return retval.has_value();
                });
            }
            return std::move(*retval);
        }
    
    private:
        std::mutex m_mtx;
        void* m_closure{nullptr};
        void (*m_callback)(void* value, void* result){nullptr};
        void* m_result{nullptr};
        std::condition_variable m_cv;
    };
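
A usage sketch for the mailbox idea follows. To stay self-contained it uses a condensed stand-in built on std::function rather than the class above. The names mailbox, poll, request, and run_worker_with_snapshot are hypothetical, the short sleep stands in for the real work, and a single requester thread is assumed.

```cpp
#include <atomic>
#include <chrono>
#include <condition_variable>
#include <functional>
#include <mutex>
#include <optional>
#include <thread>
#include <type_traits>
#include <utility>

// Condensed, hypothetical stand-in for the message bus (single requester assumed).
class mailbox
{
public:
    // Thread 1: execute a pending request, if any, then wake the requester.
    void poll()
    {
        std::lock_guard lock{m_mtx};
        if(m_request)
        {
            m_request();
            m_request = nullptr;
            m_cv.notify_one();
        }
    }

    // Thread 2: post f and block until Thread 1 has executed it.
    template<class Callable>
    auto request(Callable&& f)
    {
        using type = std::invoke_result_t<Callable&>;
        std::optional<type> retval;
        std::unique_lock lock{m_mtx};
        m_request = [&retval, &f]{ retval = f(); };
        m_cv.wait(lock, [&retval]{ return retval.has_value(); });
        return std::move(*retval);
    }

private:
    std::mutex m_mtx;
    std::function<void()> m_request;
    std::condition_variable m_cv;
};

// Starts a worker, takes one snapshot of its counter, and stops it.
// Returns {snapshot, final counter value}.
std::pair<long, long> run_worker_with_snapshot()
{
    mailbox bus;
    std::atomic<bool> should_stop{false};
    long counter = 0;  // Only touched on the worker thread until it is joined

    std::thread worker{[&]{
        while(!should_stop.load())
        {
            bus.poll();  // 1. Check the mailbox
            ++counter;   // 2. Continue with the real work (simulated)
            std::this_thread::sleep_for(std::chrono::microseconds{100});
        }
    }};

    // Thread 2: the lambda runs on the worker thread, so reading counter is safe.
    long snapshot = bus.request([&counter]{ return counter; });

    should_stop.store(true);
    worker.join();
    return {snapshot, counter};
}
```

The mailbox mutex is held only while the mailbox is checked, not during the real work, so the requester contends with a brief window instead of the whole computation step.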