
RxCpp - Observable from a subject does not work in a separate thread with observe_on_event_loop


I'm new to RxCpp. I found that a subscriber created with make_subscriber and subscribed to an observable built with observable::create can observe/subscribe on a new thread, but this does not work when the observable is obtained from a subject.

I want the behavior subject to work in the primary thread and the simple subject in a background thread. The demo code is as follows:

#include <rxcpp/rx.hpp>

#include <unistd.h>

int main()
{
  std::cout << "Main thread id : "
                << std::this_thread::get_id() << "\n";

  auto eventLoopCoordination =
      rxcpp::observe_on_event_loop(); // place emission to the queue and process
                                      // them in the thread pool under the hood
  auto immediateCoordination =
      rxcpp::identity_immediate(); // emit the item immediately with no
                                   // scheduler at all

  rxcpp::subjects::subject<int> subject;
  rxcpp::subjects::behavior<int> behsubject(0);

  auto simpleObservable = subject.get_observable();
  simpleObservable.subscribe_on(eventLoopCoordination).subscribe([](int i)
  {
    std::cout << "SIMPLE OBSERVABLE Subs thread id : "
              << std::this_thread::get_id() << " and number: " << i << "\n";
  });

  auto lastBehValue = behsubject.get_value();
  auto behObservable = behsubject.get_observable();
  behObservable
    .filter([&lastBehValue](int i)
    {
        std::cout << "BEH OBVERVABLE Filter thread id : "
                << std::this_thread::get_id() << " and number: " << i << "\n";
        std::cout << "comparison "
                << ((i != lastBehValue) ? "different" : "identical")
                << std::endl;
        return i != lastBehValue;
    })
    .subscribe_on(immediateCoordination)
    .subscribe([](int i)
    {
        std::cout << "BEH OBVERVABLE Subs thread id : "
                << std::this_thread::get_id() << " and number: " << i << "\n";
    });

  auto simpleSubscriber = subject.get_subscriber();
  auto behSubscriber = behsubject.get_subscriber();

  auto value = 1;
  while (true)
  {
    lastBehValue = behsubject.get_value();
    simpleSubscriber.on_next(value);
    behSubscriber.on_next(value);
    value++;
    sleep(1);
  }

  return 0;
}

I compile it with g++ sample.cpp -o sample -Irxcpp -lpthread

This is the output; everything runs on the same thread, 139949595146048:

Main thread id : 139949595146048 
BEH OBVERVABLE Filter thread id : 139949595146048 and number: 0
comparison identical
SIMPLE OBSERVABLE Subs thread id : 139949595146048 and number: 1
BEH OBVERVABLE Filter thread id : 139949595146048 and number: 1
comparison different
BEH OBVERVABLE Subs thread id : 139949595146048 and number: 1
SIMPLE OBSERVABLE Subs thread id : 139949595146048 and number: 2
BEH OBVERVABLE Filter thread id : 139949595146048 and number: 2
comparison different
BEH OBVERVABLE Subs thread id : 139949595146048 and number: 2
SIMPLE OBSERVABLE Subs thread id : 139949595146048 and number: 3
BEH OBVERVABLE Filter thread id : 139949595146048 and number: 3
comparison different
BEH OBVERVABLE Subs thread id : 139949595146048 and number: 3

What I expect is something like the following (thread ids are only illustrative), with BEH on the primary thread and SIMPLE on a background thread:

Main thread id : 1
BEH OBVERVABLE Filter thread id : 1 and number: 0
comparison identical
SIMPLE OBSERVABLE Subs thread id : 2 and number: 1
BEH OBVERVABLE Filter thread id : 1 and number: 1
comparison different
BEH OBVERVABLE Subs thread id : 1 and number: 1
SIMPLE OBSERVABLE Subs thread id : 2 and number: 2
BEH OBVERVABLE Filter thread id : 1 and number: 2
comparison different
BEH OBVERVABLE Subs thread id : 1 and number: 2
SIMPLE OBSERVABLE Subs thread id : 2 and number: 3
BEH OBVERVABLE Filter thread id : 1 and number: 3
comparison different
BEH OBVERVABLE Subs thread id : 1 and number: 3

I've tried changing the coordination passed to subscribe_on for simpleObservable from observe_on_event_loop to observe_on_new_thread or synchronize_new_thread, but nothing changes.

Could anyone explain these results and suggest a way to make this work as expected? I also tried subscribing multiple simple observables with the same `eventLoopCoordination` object, but it does not help. Why?


Solution

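  • This is a misunderstanding of how subscribe_on works. It runs the subscription (the call to subscribe) on the selected scheduler, not the emissions. A subject emits on whichever thread calls on_next, which here is the main thread. So, under the hood,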

    simpleObservable.subscribe_on(eventLoopCoordination).subscribe([](int i)
      {
        std::cout << "SIMPLE OBSERVABLE Subs thread id : "
                  << std::this_thread::get_id() << " and number: " << i << "\n";
      });
    

    is something like

    // Conceptual sketch only: schedule() stands in for RxCpp's scheduler machinery
    rxcpp::observable<>::create<int>([=](const auto& sub) {
      // only the subscribe call is moved to the event loop
      eventLoopCoordination.schedule([=](...) {
          simpleObservable.subscribe(sub);
      });
    }).subscribe([](int i)
      {
        std::cout << "SIMPLE OBSERVABLE Subs thread id : "
                  << std::this_thread::get_id() << " and number: " << i << "\n";
      });
    
    

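    What you are expecting is observe_on: it schedules each emission on the selected scheduler. Replacing subscribe_on with observe_on in your chain, i.e. simpleObservable.observe_on(eventLoopCoordination).subscribe(...), should move the SIMPLE callback off the main thread. Under the hood it is something like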

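    // Conceptual sketch only: schedule() stands in for RxCpp's scheduler machinery
    rxcpp::observable<>::create<int>([=](const auto& sub) {
       // subscribe right away; values still arrive on the emitting thread
       simpleObservable.subscribe([=](int v)
       {
          // hand every value over to the event loop
          eventLoopCoordination.schedule([=](...)
          {
             sub.on_next(v);
          });
       });
    }).subscribe([](int i)
      {
        std::cout << "SIMPLE OBSERVABLE Subs thread id : "
                  << std::this_thread::get_id() << " and number: " << i << "\n";
      });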
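    To make the difference concrete without depending on RxCpp internals, here is a small library-free model using only the standard library. Worker, ToySubject, Result and run_demo are hypothetical names invented for this sketch; they are not RxCpp types and only imitate the behaviour described above: a subject calls its handlers on the thread that calls on_next, a "subscribe_on"-style subscription moves only the subscribe call to the worker, and an "observe_on"-style subscription forwards every value to the worker.

```cpp
#include <cassert>
#include <condition_variable>
#include <functional>
#include <future>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>

// Hypothetical stand-in for a scheduler: one thread draining a task queue.
class Worker {
public:
    Worker() : thread_([this] { run(); }) {}
    ~Worker() {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            done_ = true;
        }
        cv_.notify_one();
        thread_.join();  // remaining tasks are drained before the thread exits
    }
    void schedule(std::function<void()> task) {
        assert(task);  // an empty std::function cannot be run
        {
            std::lock_guard<std::mutex> lock(mutex_);
            tasks_.push(std::move(task));
        }
        cv_.notify_one();
    }
private:
    void run() {
        std::unique_lock<std::mutex> lock(mutex_);
        for (;;) {
            cv_.wait(lock, [this] { return done_ || !tasks_.empty(); });
            if (tasks_.empty()) return;  // shutting down and nothing left to do
            auto task = std::move(tasks_.front());
            tasks_.pop();
            lock.unlock();
            task();
            lock.lock();
        }
    }
    std::mutex mutex_;
    std::condition_variable cv_;
    std::queue<std::function<void()>> tasks_;
    bool done_ = false;
    std::thread thread_;  // declared last so the other members exist when it starts
};

// Hypothetical stand-in for a subject: handlers run on the thread calling on_next.
class ToySubject {
public:
    void subscribe(std::function<void(int)> handler) {
        std::lock_guard<std::mutex> lock(mutex_);
        handlers_.push_back(std::move(handler));
    }
    void on_next(int value) {
        std::lock_guard<std::mutex> lock(mutex_);
        for (auto& handler : handlers_) handler(value);
    }
private:
    std::mutex mutex_;
    std::vector<std::function<void(int)>> handlers_;
};

struct Result {
    std::thread::id emitter, subscribe_on_handler, observe_on_handler;
};

Result run_demo() {
    Result r;
    r.emitter = std::this_thread::get_id();
    ToySubject subject;
    {
        Worker worker;
        // "subscribe_on" model: only the subscribe call runs on the worker.
        std::promise<void> subscribed;
        worker.schedule([&] {
            subject.subscribe([&r](int) {
                r.subscribe_on_handler = std::this_thread::get_id();
            });
            subscribed.set_value();
        });
        subscribed.get_future().wait();
        // "observe_on" model: subscribe here, forward each value to the worker.
        subject.subscribe([&r, &worker](int) {
            worker.schedule([&r] {
                r.observe_on_handler = std::this_thread::get_id();
            });
        });
        subject.on_next(1);  // emitted from the calling thread
    }  // the Worker destructor drains the queue and joins its thread
    return r;
}
```

    Calling run_demo() from main and comparing the three ids shows the same pattern as in the question: the handler attached in "subscribe_on" style still runs on the emitting thread, while the one attached in "observe_on" style runs on the worker thread.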