I'm porting some code from C# that heavily relies on Rx, and I have difficulties finding C++ equivalents to some of the most used C# methods.
In particular, I want to create an observable from subscription/unsubscription logic. In C#, I use the Observable.Create<TSource> Method (Func<IObserver<TSource>, Action>) overload to create an observable. For instance:
var observable = Observable.Create<int>(observer =>
{
    observers.Add(observer);
    return () =>
    {
        observers.Remove(observer);
    };
});
Is it possible to do the same with RxCpp? I think the answer lies in the rx::observable<>::create(OnSubscribe os) method, but I can't figure out how to use it to "register" an unsubscription lambda.
In RxCpp and RxJava .subscribe() takes a subscriber. A subscriber is a subscription and an observer bound together.
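To make that pairing concrete, here is a toy model using only the standard library. ToySubscription, ToySubscriber, ToyResult and run_toy_demo are hypothetical names invented for this sketch; they are not the RxCpp classes and only mimic the idea that teardown callbacks are attached to the subscriber's lifetime instead of being returned from the create function.

```cpp
#include <cassert>
#include <functional>
#include <memory>
#include <utility>
#include <vector>

// Hypothetical toy type, NOT rxcpp::subscription.
// Copies share one state, so any copy can end the lifetime.
class ToySubscription {
public:
    ToySubscription() : state_(std::make_shared<State>()) {}

    // Register a callback to run once, when unsubscribe() is called.
    // If the lifetime has already ended, run it immediately.
    void add(std::function<void()> teardown) const {
        if (!state_->active) { teardown(); return; }
        state_->teardowns.push_back(std::move(teardown));
    }

    // End the lifetime and run every registered teardown exactly once.
    void unsubscribe() const {
        if (!state_->active) return;
        state_->active = false;
        auto pending = std::move(state_->teardowns);
        state_->teardowns.clear();
        for (auto& teardown : pending) teardown();
    }

    bool is_subscribed() const { return state_->active; }

private:
    struct State {
        bool active = true;
        std::vector<std::function<void()>> teardowns;
    };
    std::shared_ptr<State> state_;
};

// Hypothetical toy type, NOT rxcpp::subscriber.
// An observer callback bound to the lifetime that controls it.
template <class T>
class ToySubscriber {
public:
    ToySubscriber(ToySubscription lifetime, std::function<void(T)> on_next)
        : lifetime_(std::move(lifetime)), on_next_(std::move(on_next)) {}

    // Deliver a value only while the lifetime is still active.
    void on_next(T value) const {
        if (lifetime_.is_subscribed()) on_next_(std::move(value));
    }
    void add(std::function<void()> teardown) const { lifetime_.add(std::move(teardown)); }
    void unsubscribe() const { lifetime_.unsubscribe(); }

private:
    ToySubscription lifetime_;
    std::function<void(T)> on_next_;
};

struct ToyResult { int received; int teardown_calls; };

// Pushes two values, unsubscribes, then checks that later values are dropped.
ToyResult run_toy_demo() {
    ToyResult result{0, 0};
    ToySubscription lifetime;
    ToySubscriber<int> out(lifetime, [&result](int v) { result.received += v; });
    out.add([&result] { ++result.teardown_calls; });  // the "unsubscription lambda"
    out.on_next(1);
    out.on_next(2);
    lifetime.unsubscribe();   // runs the teardown
    assert(!lifetime.is_subscribed());
    out.on_next(100);         // ignored: the lifetime has ended
    lifetime.unsubscribe();   // no-op: teardown does not run twice
    return result;
}
```

Calling run_toy_demo() sums the two delivered values and counts a single teardown call. The point of the design is that cleanup is something you add to the subscriber you were handed, which is why there is nothing to return from the create lambda.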
In RxCpp your example might look like this:
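// Shared so the list stays alive and reachable from every copy of the lambdas.
auto observers = std::make_shared<std::list<rxcpp::subscriber<int>>>();
auto observable = rxcpp::observable<>::create<int>([=](rxcpp::subscriber<int> out){
    // Store the subscriber; std::list iterators stay valid when other elements are erased.
    auto it = observers->insert(observers->end(), out);
    // Runs on unsubscribe: remove this subscriber from the list.
    it->add([=](){
        observers->erase(it);
    });
});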
NOTE: rxcpp::subscriber<int> is a type-forgetter that hides the type of the observer. This allows it to be stored in a collection, but introduces virtual functions for on_next, on_error and on_completed.
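The trade-off in that note can be sketched with the standard library alone. Here std::function stands in for the type-forgetting wrapper, and collect_from_erased_observers is a hypothetical helper made up for this illustration; RxCpp's own mechanism differs in detail, so treat this as an analogy and not as a description of its internals.

```cpp
#include <functional>
#include <list>
#include <string>

// Hypothetical helper: stores two differently typed callables in one list
// by erasing their types, then calls each of them with the same value.
std::string collect_from_erased_observers(int value) {
    std::string log;

    // Each lambda has its own unnamed closure type, so without a wrapper
    // there is no single element type a std::list could hold for both.
    auto first = [&log](int v) { log += "a" + std::to_string(v); };
    auto second = [&log](int v) { log += "b" + std::to_string(v * 2); };

    // std::function<void(int)> forgets the concrete closure type.
    // The price is an indirect call on every invocation.
    std::list<std::function<void(int)>> observers;
    observers.push_back(first);
    observers.push_back(second);

    for (auto& observer : observers) observer(value);
    return log;
}
```

For example, collect_from_erased_observers(3) returns "a3b6". If the observers never need to live in a shared container, keeping their concrete types avoids that indirection.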