I am trying to implement an observer/observable pattern in Rx-cpp. There is a very interesting tutorial in Rx.Net on how someone can do this.
In this C# example, there are specific interfaces that we have to implement:
public interface IObserver<in T>
{
    void OnCompleted();
    void OnError(Exception error);
    void OnNext(T value);
}

public interface IObservable<out T>
{
    IDisposable Subscribe(IObserver<T> observer);
}
As far as I understand, Rx-cpp offers no such convenience. So, could someone provide a header example (myObservable.h/myObserver.h), similar to the interfaces above, that I can use as guidance to define the same communication pattern?
Any help is highly appreciated. Thank you!
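For comparison, the C# interfaces above could be mirrored in plain C++ roughly as follows. This is only a sketch under stated assumptions: IObserver, IObservable, Disposable, RecordingObserver and CountingObservable are hypothetical names chosen for illustration, none of them are RxCpp types, and a std::function<void()> stands in for IDisposable.

```cpp
#include <cassert>
#include <exception>
#include <functional>
#include <memory>
#include <vector>

// Hypothetical C++ counterpart of C# IObserver<T>; not an RxCpp type.
template <typename T>
class IObserver {
public:
    virtual ~IObserver() = default;
    virtual void OnCompleted() = 0;
    virtual void OnError(std::exception_ptr error) = 0;
    virtual void OnNext(const T& value) = 0;
};

// Stand-in for IDisposable: invoking it cancels the subscription.
using Disposable = std::function<void()>;

// Hypothetical C++ counterpart of C# IObservable<T>; not an RxCpp type.
template <typename T>
class IObservable {
public:
    virtual ~IObservable() = default;
    virtual Disposable Subscribe(std::shared_ptr<IObserver<T>> observer) = 0;
};

// Example observer that records everything it receives.
class RecordingObserver : public IObserver<int> {
public:
    std::vector<int> values;
    bool completed = false;
    bool failed = false;

    void OnCompleted() override { completed = true; }
    void OnError(std::exception_ptr) override { failed = true; }
    void OnNext(const int& value) override { values.push_back(value); }
};

// Example observable that emits 1..count synchronously inside Subscribe().
class CountingObservable : public IObservable<int> {
public:
    explicit CountingObservable(int count) : count_(count) {}

    Disposable Subscribe(std::shared_ptr<IObserver<int>> observer) override {
        for (int i = 1; i <= count_; ++i) observer->OnNext(i);
        observer->OnCompleted();
        return [] {};  // nothing left to cancel after a synchronous run
    }

private:
    int count_;
};
```

A caller would create a std::shared_ptr<RecordingObserver>, pass it to CountingObservable::Subscribe(), and keep the returned Disposable to cancel later. The two interface templates could live in myObserver.h and myObservable.h respectively.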
EDIT 1:
Thanks to @zentrunix, I am trying to make a class-oriented communication. So far I have the code below for the observable pattern. What I want is to define a list of observers that will be attached to the observable, so that when OnNext is called these observers are notified. However, some parts are missing:
1. How to call subscribe() on those observers (Rx::subscriber<int>) when a myObservable::Subscribe() function is called.
2. How to unsubscribe().
3. What o.subscribe(onNext, onEnd); would look like with multiple onNext observers.
Would it be possible to construct a corresponding myObserver class? (again inspired by here) Sorry for asking, but is such an organization meaningful? So far I have been working with the architecture provided in this tutorial, which is why I am obsessed with this task. I see it as a way to get involved with RxCpp. Any comments are highly appreciated. (Again, sorry for my ignorance.)
class myObservable {
private:
    std::shared_ptr<std::list<rxcpp::subscriber<int>>> observers;

public:
    myObservable() { observers = std::make_shared<std::list<Rx::subscriber<int>>>(); }

    Rx::observable<int> Attach(std::shared_ptr<rxcpp::subscriber<int>> out) {
        return Rx::observable<>::create<int>([&, out]() {
            auto it = observers->insert(observers->end(), *out);
            it->add([=]() {
                observers->erase(it);
            });
        });
    }

    void OnNext(int sendItem) {
        for (Rx::subscriber<int> observer : *observers) {
            (observer).on_next(sendItem);
        }
    }

    void Disposer(Rx::subscriber<int> out) {
        observers->erase(std::remove(observers->begin(), observers->end(), &out), observers->end());
    }
};
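The list-of-observers idea described above (attach, notify everyone on OnNext, detach) can be sketched without RxCpp at all. This is a minimal illustration under stated assumptions, not RxCpp API: IntBroadcaster, Handler, Token, Attach, Detach and Count are hypothetical names, and a numeric token plays the role of the disposer.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <map>
#include <utility>
#include <vector>

// Hypothetical std-only broadcaster; names are illustrative, not RxCpp API.
class IntBroadcaster {
public:
    using Handler = std::function<void(int)>;
    using Token = std::size_t;

    // Registers a handler and returns a token used later to detach it.
    Token Attach(Handler onNext) {
        Token id = nextId_++;
        handlers_.emplace(id, std::move(onNext));
        return id;
    }

    // Removes the handler; returns false if the token is unknown.
    bool Detach(Token id) { return handlers_.erase(id) > 0; }

    // Notifies every attached handler. Iterates over a copy so that a
    // handler may call Detach() from inside its own callback.
    void OnNext(int item) {
        auto snapshot = handlers_;
        for (auto& entry : snapshot) entry.second(item);
    }

    // Number of currently attached handlers.
    std::size_t Count() const { return handlers_.size(); }

private:
    Token nextId_ = 0;
    std::map<Token, Handler> handlers_;
};
```

Each call to Attach() returns a token, OnNext() fans the value out to all attached handlers, and Detach() with that token stops delivery to one of them. Within RxCpp itself, rxcpp::subjects::subject<T> may cover the same multicast role, so a hand-written list might not be needed.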
A very simple example in RxCpp is below. There's (at least) one caveat though: typical RxCpp code makes heavy use of lambdas, which I dislike very much.
I've also tried to find documentation and tutorials on the internet, but couldn't find any. I'm especially interested in explanations of the threading models.
If you're willing to wade through code and Doxygen documentation, there are lots of examples on the RxCpp GitHub site.
#include <iostream>
#include <exception>
#include "rxcpp/rx.hpp"

namespace rx = rxcpp;

// Observer callbacks written as plain functions instead of lambdas.
static void onNext(int n) { std::cout << "* " << n << "\n"; }
static void onEnd() { std::cout << "* end\n"; }

// Error handler; defined for completeness but not passed to subscribe() below.
static void onError(std::exception_ptr ep)
{
    try { std::rethrow_exception(ep); }
    catch (const std::exception& e) { std::cout << "* exception " << e.what() << '\n'; }
}

// Runs once per subscription: emits two values, then completes.
static void observableImpl(rx::subscriber<int> s)
{
    s.on_next(1);
    s.on_next(2);
    s.on_completed();
}

int main()
{
    auto o = rxcpp::observable<>::create<int>(observableImpl);
    std::cout << "*\n";
    o.subscribe(onNext, onEnd);
}