Search code examples
c#c++system.reactiveobserver-patternrxcpp

Construct Observer/Observable pattern using RxCpp


I am trying to implement an observer/observable pattern in Rx-cpp. These is a very interesting tutorial in Rx.Net on how someone can to this.

In this C# example, there are specific interfaces that we have to override:

public interface IObserver<in T>
{
    void OnCompleted();
    void OnError(Exception error);
    void OnNext(T value);
}


public interface IObservable<out T>
{
    IDisposable Subscribe(IObserver<T> observer);
}

As far as I understand, in Rx-cpp there is not such a convenience. So, is it possible to provide me with some header example (myObservable.h/myObserver.h), similar to the interfaces above, that I can use as a guidance to define the same communication pattern?

Any help is highly appreciated, Thank you!

EDIT 1: Thanks to @zentrunix, I am trying to make a class oriented communication. So far I have the code bellow for the observable pattern. What I want is to define a list of observers that will me attached into the observable and when an OnNext is called these observers should be notified. However, there are missing part.

  1. How can I subscribe() on those observers (Rx::subscribers<int>) when a myObservable::Subscribe() functions is called.
  2. Also how can I unsubscribe().
  3. Finally, how a corresponding o.subscribe(onNext, onEnd); in multiple onNext observers will that be? Would it be possible to construct a corresponding myObserver class? (again inspired by here)
  4. Sorry for asking but is it meaningful such an organization? So far I was working with the architecture provided in this tutorial and this is the reason I am obsessed with this task. I found it as a way to get involved with the RxCpp. Any comments are highly appreciated.(Again sorry for my ignorance.)

    class myObservable {
    
    private:
    
    std::shared_ptr<std::list<rxcpp::subscriber<int>>> observers;
    
    public:
    
    myObservable() { observers = std::make_shared<std::list<Rx::subscriber<int>>>(); };
    
    Rx::observable<int> Attach(std::shared_ptr<rxcpp::subscriber<int>> out) {
    
        return Rx::observable<>::create<int>([&, out]() {
            auto it = observers->insert(observers->end(), *out);
            it->add([=]() {
                observers->erase(it);
            });
        });
    
    };
    
    void OnNext(int sendItem) {
    
        for (Rx::subscriber<int> observer : *observers) {
            (observer).on_next(sendItem);
        }
    }
    
    void Disposer(Rx::subscriber<int> out) {
    
        observers->erase(std::remove(observers->begin(), observers->end(), &out), observers->end());
    };
    };
    

Solution

  • A very simple example in RxCpp below. There's (at least) one caveat though: typical RxCpp code makes heavy use of lambdas, which I dislike very much.

    I've also trying to find documentation and tutorials on the internet, but couldn't find any. I'm especially interested in explanations about the threading models.

    If you're willing to wade through code and Doxygen documentation, there are lots of examples in the RxCpp GitHub site.

    #include <iostream>
    #include <exception>
    
    #include "rxcpp/rx.hpp"
    namespace rx = rxcpp;
    
    static void onNext(int n) { std::cout << "* " << n << "\n"; }
    static void onEnd() { std::cout << "* end\n"; }
    
    static void onError(std::exception_ptr ep)
    {
      try { std::rethrow_exception(ep); }
      catch (std::exception& e) { std::cout << "* exception " << e.what() << '\n'; }
    }
    
    static void observableImpl(rx::subscriber<int> s)
    {
      s.on_next(1);
      s.on_next(2);
      s.on_completed();
    }
    
    int main()
    {
      auto o = rxcpp::observable<>::create<int>(observableImpl);
      std::cout << "*\n";
      o.subscribe(onNext, onEnd);
    }