I've got a DAG of tasks that I'm trying to execute using the boost::shared_future
framework.
For example concreteness, consider the data flow graph shown in the figure.
Here's an attempt to code this up:
#include <iostream>
#define BOOST_THREAD_PROVIDES_FUTURE
#define BOOST_THREAD_PROVIDES_FUTURE_CONTINUATION
#define BOOST_THREAD_PROVIDES_FUTURE_WHEN_ALL_WHEN_ANY
#include <boost/thread/future.hpp>
using namespace boost;
int main() {
shared_future<int> fa = async([]() { sleep(1); return 123; });
shared_future<int> fb = async([]() { sleep(2); return 456; });
shared_future<int> fc = async([]() { sleep(5); return 789; });
auto fabc = when_all(fa,fb,fc);
auto fx = fabc.then([](decltype(fabc)) {
std::cout << "A,B,C has completed, computing X\n";
return 1;
});
auto fax = when_all(fa,std::move(fx));
auto fz = fax.then([](decltype(fax)) {
std::cout << "A,X has completed, computing Z\n";
return 2;
});
auto fcx = when_all(fc,std::move(fx)); // <---- BAD
auto fy = fcx.then([](decltype(fcx)) {
std::cout << "C,X has completed, computing Y\n";
return 3;
});
fy.get();
fz.get();
}
However, this doesn't work (obviously, since I'm calling std::move
twice on fx
). I guess the question is- is there a way to get when_all
and then
to return "shared" types so that this executes sensibly? Or is task-DAG execution beyond the limits of what boost can do?
Like T.C. said, you can share your future by calling the share()
member function. That way you don't need to move twice:
#include <iostream>
#define BOOST_THREAD_PROVIDES_FUTURE
#define BOOST_THREAD_PROVIDES_FUTURE_CONTINUATION
#define BOOST_THREAD_PROVIDES_FUTURE_WHEN_ALL_WHEN_ANY
#include <boost/thread/future.hpp>
using namespace boost;
using boost::this_thread::sleep_for;
using boost::chrono::milliseconds;
int main() {
shared_future<int> fa = async([]() { sleep_for(milliseconds(100)); return 123; });
shared_future<int> fb = async([]() { sleep_for(milliseconds(200)); return 456; });
shared_future<int> fc = async([]() { sleep_for(milliseconds(500)); return 789; });
auto fabc = when_all(fa, fb, fc);
auto fx = fabc
.then([](decltype(fabc)) { std::cout << "A,B,C has completed, computing X\n"; return 1; })
.share();
auto fax = when_all(fa, fx);
auto fz = fax
.then([](decltype(fax)) { std::cout << "A,X has completed, computing Z\n"; return 2; })
.share();
auto fcx = when_all(fc, fx);
auto fy = fcx
.then([](decltype(fcx)) { std::cout << "C,X has completed, computing Y\n"; return 3; })
.share();
fy.get();
fz.get();
}
Prints
A,B,C has completed, computing X
C,X has completed, computing Y
A,X has completed, computing Z