Lets say you have a coroutine returning a task (using winrt to illustrate)
winrt::Windows::foundation::IAsyncOperation<int> my_coroutine(int i)
{
co_await winrt::resume_background();
Sleep(std::max(4 - i, 0));
co_return i;
}
and you want to in RxCpp like this:
range(0, 4)
| map(&my_coroutine)
| observe_on(observe_on_event_loop())
| subscribe<string>(println(cout))
;
// Output: 3 2 1 0
So the obvious problem is that we don't want map
to pass an IAsyncOperation<int>
to observe_on::on_next
but rather we would have the coroutine call observe_on::on_next
passing an int
, when it returns its value.
Perhaps we could convert IAsyncOperation<int>
to rxcpp::Observable<int>
and use flat_map
although it seams a bit wasteful to use a range-like monad when we only have a future monad.
Is there a easy way to do this with RxCpp? If it's rather complicated, I'm happy to know that so I can move on.
P.S. I'm actually co_awaiting winrt::resume_on_signal
, so maybe RxCpp provides some similar functionality with event HANDLE
s.
Edit: I suppose the obvious thing to do is just poll the futures on the event loop, pushing them (i.e. calling on_next
) only when they have completed.
I have decided to give up on RxCpp, and am instead pursuing writing my pipeline with templates to take advantage of type deduction from lambda arguments (static polymorphism for refactoring rather than reuse).
I think the correct thing to do is just to not have the future at all
int my_function(int i)
{
Sleep(std::max(4 - i, 0));
return i;
}
and run this function on a thread pool
range(0, 4)
| observe_on(observe_on_thread_pool()) // doesn't actually exist
| map(&my_function)
| observe_on(observe_on_event_loop())
| subscribe<string>(println(cout))
;
// Output: 3 2 1 0
making sure that the observer it calls next on is thread safe.
And in rxcppv3 it shouldn't be too hard to write a custom operator in place of observe_on(observe_on_thread_pool()) | map
:
const auto map_on_thread_pool = [](auto func){
return make_lifter([=](auto r){
return make_observer(r, [=](auto& r, auto v){
auto f = [=](){ r.next(func(v)); };
//On the windows thread pool
TrySubmitThreadpoolCallback(..); // what ever hacks are needed to run f
});
});
};