Search code examples
reactivexrxcpp

How to "map" a function returning a Future


Lets say you have a coroutine returning a task (using winrt to illustrate)

winrt::Windows::foundation::IAsyncOperation<int> my_coroutine(int i)
{
  co_await winrt::resume_background();
  Sleep(std::max(4 - i, 0));
  co_return i;
}

and you want to in RxCpp like this:

range(0, 4)
  | map(&my_coroutine)
  | observe_on(observe_on_event_loop())
  | subscribe<string>(println(cout))
  ; 
// Output: 3 2 1 0

So the obvious problem is that we don't want map to pass an IAsyncOperation<int> to observe_on::on_next but rather we would have the coroutine call observe_on::on_next passing an int, when it returns its value.

Perhaps we could convert IAsyncOperation<int> to rxcpp::Observable<int> and use flat_map although it seams a bit wasteful to use a range-like monad when we only have a future monad.

Is there a easy way to do this with RxCpp? If it's rather complicated, I'm happy to know that so I can move on.


P.S. I'm actually co_awaiting winrt::resume_on_signal, so maybe RxCpp provides some similar functionality with event HANDLEs.

Edit: I suppose the obvious thing to do is just poll the futures on the event loop, pushing them (i.e. calling on_next) only when they have completed.

I have decided to give up on RxCpp, and am instead pursuing writing my pipeline with templates to take advantage of type deduction from lambda arguments (static polymorphism for refactoring rather than reuse).


Solution

  • I think the correct thing to do is just to not have the future at all

    int my_function(int i)
    {
      Sleep(std::max(4 - i, 0));
      return i;
    }
    

    and run this function on a thread pool

    range(0, 4)
      | observe_on(observe_on_thread_pool()) // doesn't actually exist
      | map(&my_function)
      | observe_on(observe_on_event_loop())
      | subscribe<string>(println(cout))
      ; 
    // Output: 3 2 1 0
    

    making sure that the observer it calls next on is thread safe.

    And in rxcppv3 it shouldn't be too hard to write a custom operator in place of observe_on(observe_on_thread_pool()) | map:

    const auto map_on_thread_pool = [](auto func){
      return make_lifter([=](auto r){
        return make_observer(r, [=](auto& r, auto v){
            auto f = [=](){ r.next(func(v)); };
            //On the windows thread pool
            TrySubmitThreadpoolCallback(..); // what ever hacks are needed to run f
        });
      });
    };