c# c++ system.reactive rxcpp

Create an Observable you can unsubscribe from in RxCpp


I'm porting some code from C# that relies heavily on Rx, and I'm having difficulty finding C++ equivalents for some of the most-used C# methods.

In particular, I want to create an observable from the subscription/unsubscription logic. In C#, I use the Observable.Create<TSource> Method (Func<IObserver<TSource>, Action>) overload to create an observable. For instance:

var observable = Observable.Create<int>(observer =>
{
    observers.Add(observer);
    return () =>
    {
        observers.Remove(observer);
    };
});

Is it possible to do the same with RxCpp? I think the answer lies in the rx::observable<>::create(OnSubscribe os) method, but I can't figure out how to use it to "register" an unsubscription lambda.


Solution

  • In RxCpp and RxJava, .subscribe() takes a subscriber. A subscriber is a subscription and an observer bound together, so the unsubscription logic is registered on the subscriber itself.

    In RxCpp your example might look like this:

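    #include <list>
    #include <memory>
    #include <rxcpp/rx.hpp>

    // Shared list of the currently subscribed subscribers.
    auto observers = std::make_shared<std::list<rxcpp::subscriber<int>>>();

    auto observable = rxcpp::observable<>::create<int>([=](rxcpp::subscriber<int> out){
        // Store the subscriber; std::list iterators stay valid until erased.
        auto it = observers->insert(observers->end(), out);
        // Registered on the subscription; runs when it is unsubscribed.
        it->add([=](){
            observers->erase(it);
        });
    });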
    

    NOTE: rxcpp::subscriber<int> is a type-forgetter (a type-erased wrapper) that hides the concrete type of the observer. This allows it to be stored in a collection, but it means calls to on_next, on_error and on_completed go through virtual functions.
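
To make the "subscription and observer bound together" idea concrete, here is a rough, standard-library-only sketch of the same pattern. Everything in it (toy_subscription, toy_subscriber, subscribe, run_demo) is a hypothetical stand-in invented for illustration; it is not how RxCpp is implemented, and it only models on_next. It assumes single-threaded use.

```cpp
#include <cassert>
#include <functional>
#include <list>
#include <memory>
#include <utility>
#include <vector>

// Toy stand-in for a subscription: shared state holding teardown callbacks.
// Hypothetical type for illustration only, NOT RxCpp's implementation.
class toy_subscription {
    struct state {
        bool active = true;
        std::vector<std::function<void()>> teardowns;
    };
    std::shared_ptr<state> s_ = std::make_shared<state>();
public:
    bool is_subscribed() const { return s_->active; }
    // Register a callback that runs once, when unsubscribe() is called.
    void add(std::function<void()> f) const {
        if (s_->active) s_->teardowns.push_back(std::move(f));
        else f();  // already unsubscribed: run the teardown immediately
    }
    void unsubscribe() const {
        if (!s_->active) return;
        s_->active = false;
        // Move the callbacks out first so they may safely touch this object.
        auto pending = std::move(s_->teardowns);
        s_->teardowns.clear();
        for (auto& f : pending) f();
    }
};

// Toy subscriber: a subscription plus a type-erased on_next callback.
template <class T>
class toy_subscriber {
    toy_subscription lifetime_;
    std::function<void(T)> on_next_;  // std::function plays the "type-forgetter" role
public:
    toy_subscriber(toy_subscription l, std::function<void(T)> n)
        : lifetime_(std::move(l)), on_next_(std::move(n)) {}
    void on_next(T v) const { if (lifetime_.is_subscribed()) on_next_(v); }
    void add(std::function<void()> f) const { lifetime_.add(std::move(f)); }
};

using observer_list = std::list<toy_subscriber<int>>;

// Mirrors the on-subscribe lambda from the answer: store the subscriber,
// then register a teardown that removes it from the list again.
toy_subscription subscribe(const std::shared_ptr<observer_list>& observers,
                           std::function<void(int)> on_next) {
    toy_subscription lifetime;
    toy_subscriber<int> out(lifetime, std::move(on_next));
    auto it = observers->insert(observers->end(), out);
    it->add([observers, it]() { observers->erase(it); });
    return lifetime;
}

// Returns the values seen by the first subscriber.
std::vector<int> run_demo() {
    auto observers = std::make_shared<observer_list>();
    std::vector<int> received;
    auto first = subscribe(observers, [&received](int v) { received.push_back(v); });
    auto second = subscribe(observers, [](int) {});
    for (auto& o : *observers) o.on_next(1);  // both subscribers are registered
    first.unsubscribe();                      // teardown erases the first entry
    assert(observers->size() == 1);
    for (auto& o : *observers) o.on_next(2);  // only the second subscriber remains
    second.unsubscribe();
    assert(observers->empty());
    return received;
}
```

Calling run_demo() shows the unsubscription lambda at work: the first subscriber only receives the value emitted before it unsubscribed. Note that in this sketch, as in the snippet above, the teardown lambda holds the shared list, so an entry stays alive until its subscription is unsubscribed.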