
boost::shared_future and when_all with multiple continuations


I've got a DAG of tasks that I'm trying to execute using the boost::shared_future framework.

For concreteness, consider the data flow graph shown in the figure.

[Figure: data flow graph. A, B and C feed X; A and X feed Z; C and X feed Y.]

Here's an attempt to code this up:

#include <iostream>

#define BOOST_THREAD_PROVIDES_FUTURE
#define BOOST_THREAD_PROVIDES_FUTURE_CONTINUATION
#define BOOST_THREAD_PROVIDES_FUTURE_WHEN_ALL_WHEN_ANY
#include <boost/thread/future.hpp>

using namespace boost;

int main() {
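  shared_future<int> fa = async([]() { this_thread::sleep_for(chrono::seconds(1)); return 123; });
  shared_future<int> fb = async([]() { this_thread::sleep_for(chrono::seconds(2)); return 456; });
  shared_future<int> fc = async([]() { this_thread::sleep_for(chrono::seconds(5)); return 789; });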

  auto fabc = when_all(fa,fb,fc);
  auto fx = fabc.then([](decltype(fabc)) {
    std::cout << "A,B,C has completed, computing X\n";
    return 1;
  });
  auto fax = when_all(fa,std::move(fx));
  auto fz = fax.then([](decltype(fax)) {
    std::cout << "A,X has completed, computing Z\n";
    return 2;
  });
  auto fcx = when_all(fc,std::move(fx));  // <---- BAD
  auto fy = fcx.then([](decltype(fcx)) {
    std::cout << "C,X has completed, computing Y\n";
    return 3;
  });
  fy.get();
  fz.get();
}

However, this doesn't work (obviously, since I'm calling std::move twice on fx). I guess the question is: is there a way to get when_all() and then() to return "shared" types so that this executes sensibly? Or is task-DAG execution beyond the limits of what Boost can do?


Solution

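  • As T.C. said, you can share the future by calling its share() member function. then() returns a move-only boost::future; share() converts it into a copyable shared_future, so fx can be passed to when_all more than once without std::move: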

    #include <iostream>
    
    #define BOOST_THREAD_PROVIDES_FUTURE
    #define BOOST_THREAD_PROVIDES_FUTURE_CONTINUATION
    #define BOOST_THREAD_PROVIDES_FUTURE_WHEN_ALL_WHEN_ANY
    #include <boost/thread/future.hpp>
    
    using namespace boost;
    using boost::this_thread::sleep_for;
    using boost::chrono::milliseconds;
    
    int main() {
        shared_future<int> fa = async([]() { sleep_for(milliseconds(100)); return 123; });
        shared_future<int> fb = async([]() { sleep_for(milliseconds(200)); return 456; });
        shared_future<int> fc = async([]() { sleep_for(milliseconds(500)); return 789; });
    
        auto fabc = when_all(fa, fb, fc);
    
        auto fx   = fabc
            .then([](decltype(fabc)) { std::cout << "A,B,C has completed, computing X\n"; return 1; })
            .share();
        auto fax  = when_all(fa, fx);
    
        auto fz   = fax
            .then([](decltype(fax)) { std::cout << "A,X has completed, computing Z\n"; return 2; })
            .share();
        auto fcx  = when_all(fc, fx);
    
        auto fy   = fcx
            .then([](decltype(fcx)) { std::cout << "C,X has completed, computing Y\n"; return 3; })
            .share();
    
        fy.get();
        fz.get();
    }
    

    Prints (Y and Z do not depend on each other, so the order of the last two lines may vary):

    A,B,C has completed, computing X
    C,X has completed, computing Y
    A,X has completed, computing Z
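
The ownership difference that the fix relies on can be seen without Boost. The following is a minimal sketch using only the standard library, which has std::future and std::shared_future but no then() or when_all(), so it illustrates the move-versus-share behaviour rather than the Boost code above. The helper names (consume, consume_shared and the two test functions) are hypothetical and exist only for this illustration.

```cpp
#include <cassert>
#include <future>
#include <utility>

// Hypothetical helper: takes ownership of a move-only future, much like a
// call that receives std::move(fx).
int consume(std::future<int> f) { return f.get(); }

// Hypothetical helper: takes a shared_future by value, which is a cheap copy.
int consume_shared(std::shared_future<int> f) { return f.get(); }

// Returns true if a std::future has no shared state left after being moved.
bool moved_from_future_is_invalid() {
    std::future<int> fx = std::async(std::launch::deferred, [] { return 1; });
    int first = consume(std::move(fx));  // ownership is transferred here
    assert(first == 1);
    (void)first;
    // A second std::move(fx) would hand over an empty future.
    return !fx.valid();
}

// Two independent consumers read the same result through copies of one
// shared_future; the deferred task runs once, on the first get().
int two_consumers_of_shared_future() {
    std::shared_future<int> fx =
        std::async(std::launch::deferred, [] { return 1; }).share();
    int z = consume_shared(fx) + 2;  // stands in for "A,X -> Z"
    int y = consume_shared(fx) + 3;  // stands in for "C,X -> Y"
    assert(fx.valid());              // fx is still usable after both copies
    return z + y;
}
```

Calling moved_from_future_is_invalid() shows why the second std::move(fx) in the question cannot work, and two_consumers_of_shared_future() shows the same value feeding two downstream nodes once the future has been shared.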