Search code examples
c++reactive-programmingrxcpp

RxCpp: observer's lifetime if using observe_on(rxcpp::observe_on_new_thread())


What is the proper way to wait until all the observers on_completed are called if the observers are using observe_on(rxcpp::observe_on_new_thread()):

For example:

{
    Foo foo;
    auto generator = [&](rxcpp::subscriber<int> s)
    {
        s.on_next(1);
        // ...
        s.on_completed();
    };
    auto values = rxcpp::observable<>::create<int>(generator).publish();
    auto s1 = values.observe_on(rxcpp::observe_on_new_thread())
                    .subscribe([&](int) { slow_function(foo); }));

    auto lifetime = rxcpp::composite_subscription();
    lifetime.add([&](){ wrapper.log("unsubscribe");  });
    auto s2 = values.ref_count().as_blocking().subscribe(lifetime);

    // hope to call something here to wait for the completion of
    // s1's on_completed function
}

// the program usually crashes here when foo goes out of scope because 
// the slow_function(foo) is still working on foo.  I also noticed that
// s1's on_completed never got called.

My question is how to wait until s1's on_completed is finished without having to set and poll some variables.

The motivation of using observe_on() is because there are usually multiple observers on values, and I would like each observer to run concurrently. Perhaps there are different ways to achieve the same goal, I am open to all your suggestions.


Solution

  • Merging the two will allow a single blocking subscribe to wait for both to finish.

    {
        Foo foo;
        auto generator = [&](rxcpp::subscriber<int> s)
        {
            s.on_next(1);
            s.on_next(2);
            // ...
            s.on_completed();
        };
    
        auto values = rxcpp::observable<>::create<int>(generator).publish();
    
        auto work = values.
            observe_on(rxcpp::observe_on_new_thread()).
            tap([&](int c) {
                slow_function(foo);
            }).
            finally([](){printf("s1 completed\n");}).
            as_dynamic();
    
        auto start = values.
            ref_count().
            finally([](){printf("s2 completed\n");}).
            as_dynamic();
    
        // wait for all to finish
        rxcpp::observable<>::from(work, start).
            merge(rxcpp::observe_on_new_thread()).
            as_blocking().subscribe();
    }
    

    A few points.

    the stream must return the same type for merge to work. if combining streams of different types, use combine_latest instead.

    the order of the observables in observable<>::from() is important, the start stream has ref_count, so it must be called last so that the following merge will have subscribed to the work before starting the generator.

    The merge has two threads calling it. This requires that a thread-safe coordination be used. rxcpp is pay-for-use. by default the operators assume that all the calls are from the same thread. any operator that gets calls from multiple threads needs to be given a thread-safe coordination which the operator uses to impose thread-safe state management and output calls.

    If desired the same coordinator instance could be used for both.