
How can I feed new values to long-running subtasks?


I want a main thread that hands values to a number of subtasks. The subtasks take a long time to execute, and I do not want to stop them and restart them with new values. I tried std::promise and std::future, but this route gets complicated because promises and futures are not reusable: I cannot call future.get() twice in a row.

Here is a code example. It does not work, because the two future.get() calls in the print_int task run directly after each other on the same future.

#include <iostream>       // std::cout
#include <functional>     // std::ref
#include <thread>         // std::thread
#include <future>         // std::promise, std::future

void print_int(std::future<int>& input_future, std::promise<void>& done_promise)
{
    std::cout << "Print thread starts!" << std::endl;

    int x = input_future.get(); 
    while (x > 0)
    {
        std::cout << "Received value: " << x;
        done_promise.set_value();
        std::cout << " done is set" << std::endl;

        int x = input_future.get();
    }
}


int main()
{
    std::cout << "Main thread starts!" << std::endl;

    std::promise<int> input_prom;                     
    std::promise<void> done_promise;
    std::future<void> done_future = done_promise.get_future();
    std::future<int> input_fut = input_prom.get_future();    
    std::thread thred(print_int, std::ref(input_fut), std::ref(done_promise)); 

    int counter = 10;

    while (counter > 1)
    {

        input_prom.set_value(counter);                         // fulfill promise

        // the print_int thread executes and prints

        done_future.get(); // wait until its printed
        counter--;
    }

    // (synchronizes with getting the future)
    thred.join();
    return 0;

}


Solution

  • This is a typical producer-consumer problem. Create a new promise/future pair for every task you need to execute. Send the value together with its promise to the worker through a thread-safe queue, and have the worker return its result by fulfilling that promise, as follows.

    #include <iostream>       // std::cout
    #include <functional>     // std::ref
    #include <thread>         // std::thread
    #include <future>         // std::promise, std::future
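    #include <queue>
    #include <mutex>
    #include <condition_variable> // std::condition_variable
    #include <optional>
    #include <string>             // std::string
    #include <utility>            // std::pair, std::move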
    
    class SPSCQueue
    {
    public:
        using OptionalPromisePair = std::optional<std::pair<int, std::promise<std::string>>>;
        void put(OptionalPromisePair&& p)
        {
            {
                std::lock_guard<std::mutex> lk(m_mut);        
                m_queue.push(std::move(p));
            }
            m_cv.notify_one();
        }
        OptionalPromisePair get()
        {
            // Inspect the queue only while holding the lock; the predicate
            // also protects against spurious wakeups.
            std::unique_lock<std::mutex> lk(m_mut);
            m_cv.wait(lk, [this]{ return !m_queue.empty(); });
            OptionalPromisePair ret = std::move(m_queue.front());
            m_queue.pop();
            return ret;
        }
    private:
        std::queue<OptionalPromisePair> m_queue;
        std::mutex m_mut;
        std::condition_variable m_cv;
    };
    
    void print_int(SPSCQueue& queue)
    {
        std::cout << "Print thread starts!" << std::endl;
    
        while (true)
        {
            SPSCQueue::OptionalPromisePair p = queue.get();
            if (!p)
            {
                return;
            }
            std::cout << "Received value: " << (*p).first;
            (*p).second.set_value("val");
            std::cout << " future is set" << std::endl;
        }
    }
    
    
    int main()
    {
        std::cout << "Main thread starts!" << std::endl;
    
        SPSCQueue queue;
        std::thread thred(print_int, std::ref(queue)); 
    
        int counter = 10;
    
        while (counter > 1)
        {
            std::promise<std::string> promise;
            std::future<std::string> done_future = promise.get_future();
    
            queue.put({{counter, std::move(promise)}});
            // the print_int thread executes and prints
    
            std::string returned_val = done_future.get(); // wait until it is printed
            std::cout << "received " << returned_val << " in main thread\n";
            counter--;
        }
        queue.put({});  // signal thread termination
        thred.join();
        return 0;
    }
    

    Other implementations exist, such as thread-safe multiple-consumer queues or non-blocking SPSC queues; this bare-bones implementation is just for illustration.

    Also, if you do not intend to return a string, use a smaller type than std::string, for example an int-backed enum. For exception safety, you could wrap the thread and the queue in an RAII object that signals termination and joins the thread in its destructor.
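
The RAII wrapper and the smaller return type suggested above could look roughly like the following. This is only a sketch under assumptions: the names Worker, Status, submit and count_done are made up for illustration and are not part of the code above, and the "work" is a placeholder that replies Done for positive values and Rejected otherwise.

```cpp
#include <condition_variable>
#include <future>
#include <mutex>
#include <optional>
#include <queue>
#include <thread>
#include <utility>

// Small reply type instead of std::string (hypothetical enum for this sketch).
enum class Status : int { Done, Rejected };

// Hypothetical RAII wrapper: owns both the queue and the worker thread.
class Worker
{
    // A value plus the promise used to answer it; an empty optional means "stop".
    using Task = std::optional<std::pair<int, std::promise<Status>>>;

public:
    Worker() : m_thread([this] { run(); }) {}
    Worker(const Worker&) = delete;
    Worker& operator=(const Worker&) = delete;

    // Signal termination and join, even if the owner leaves scope via an exception.
    ~Worker()
    {
        push(std::nullopt);
        m_thread.join();
    }

    // Queue one value; the returned future becomes ready once it is processed.
    std::future<Status> submit(int value)
    {
        std::promise<Status> promise;
        std::future<Status> reply = promise.get_future();
        push(Task(std::in_place, value, std::move(promise)));
        return reply;
    }

private:
    void push(Task task)
    {
        {
            std::lock_guard<std::mutex> lk(m_mut);
            m_queue.push(std::move(task));
        }
        m_cv.notify_one();
    }

    Task pop()
    {
        std::unique_lock<std::mutex> lk(m_mut);
        m_cv.wait(lk, [this] { return !m_queue.empty(); });
        Task task = std::move(m_queue.front());
        m_queue.pop();
        return task;
    }

    void run()
    {
        while (Task task = pop())   // the loop ends on the empty "stop" task
        {
            // Placeholder for the long-running work on task->first.
            task->second.set_value(task->first > 0 ? Status::Done : Status::Rejected);
        }
    }

    std::queue<Task> m_queue;
    std::mutex m_mut;
    std::condition_variable m_cv;
    std::thread m_thread;   // declared last, so it starts after the members above exist
};

// Feeds the values 10 down to 2, like main() above, and counts the Done replies.
int count_done()
{
    Worker worker;
    int done = 0;
    for (int counter = 10; counter > 1; --counter)
    {
        if (worker.submit(counter).get() == Status::Done)
            ++done;
    }
    return done;
}   // worker's destructor signals termination and joins the thread here
```

Calling count_done() from main() replaces the manual queue.put({}) and thred.join() pair: the caller can no longer forget either step, and both still happen if something between construction and scope exit throws.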