I'm a newbie to RxCpp.
I found that a subscriber created by make_subscriber and subscribed to an observable built with observable::create can observe on a new thread, but the same does not work when the observable comes from a subject.
I want the simple subject's subscriber to run in a background thread and the behavior subject's subscriber to run in the primary thread. The demo code is as follows:
    #include <rxcpp/rx.hpp>
    #include <iostream>
    #include <thread>
    #include <unistd.h>

    int main()
    {
        std::cout << "Main thread id : "
                  << std::this_thread::get_id() << "\n";

        // place emissions in a queue and process them in the thread pool
        // under the hood
        auto eventLoopCoordination = rxcpp::observe_on_event_loop();
        // emit the item immediately with no scheduler at all
        auto immediateCoordination = rxcpp::identity_immediate();

        rxcpp::subjects::subject<int> subject;
        rxcpp::subjects::behavior<int> behsubject(0);

        auto simpleObservable = subject.get_observable();
        simpleObservable.subscribe_on(eventLoopCoordination).subscribe([](int i)
        {
            std::cout << "SIMPLE OBSERVABLE Subs thread id : "
                      << std::this_thread::get_id() << " and number: " << i << "\n";
        });

        auto lastBehValue = behsubject.get_value();
        auto behObservable = behsubject.get_observable();
        behObservable
            .filter([&lastBehValue](int i)
            {
                std::cout << "BEH OBVERVABLE Filter thread id : "
                          << std::this_thread::get_id() << " and number: " << i << "\n";
                std::cout << "comparison "
                          << ((i != lastBehValue) ? "different" : "identical")
                          << std::endl;
                return i != lastBehValue;
            })
            .subscribe_on(immediateCoordination)
            .subscribe([](int i)
            {
                std::cout << "BEH OBVERVABLE Subs thread id : "
                          << std::this_thread::get_id() << " and number: " << i << "\n";
            });

        auto simpleSubscriber = subject.get_subscriber();
        auto behSubscriber = behsubject.get_subscriber();

        auto value = 1;
        while (true)
        {
            lastBehValue = behsubject.get_value();
            simpleSubscriber.on_next(value);
            behSubscriber.on_next(value);
            value++;
            sleep(1);
        }
        return 0;
    }
I compile it with g++ sample.cpp -o sample -Irxcpp -lpthread
This is the output. Every callback runs in the same thread, 139949595146048, which is the main thread:
Main thread id : 139949595146048
BEH OBVERVABLE Filter thread id : 139949595146048 and number: 0
comparison identical
SIMPLE OBSERVABLE Subs thread id : 139949595146048 and number: 1
BEH OBVERVABLE Filter thread id : 139949595146048 and number: 1
comparison different
BEH OBVERVABLE Subs thread id : 139949595146048 and number: 1
SIMPLE OBSERVABLE Subs thread id : 139949595146048 and number: 2
BEH OBVERVABLE Filter thread id : 139949595146048 and number: 2
comparison different
BEH OBVERVABLE Subs thread id : 139949595146048 and number: 2
SIMPLE OBSERVABLE Subs thread id : 139949595146048 and number: 3
BEH OBVERVABLE Filter thread id : 139949595146048 and number: 3
comparison different
BEH OBVERVABLE Subs thread id : 139949595146048 and number: 3
What I expect (with made-up thread ids, just as an example) is BEH in the primary thread and SIMPLE in a background thread:
Main thread id : 1
BEH OBVERVABLE Filter thread id : 1 and number: 0
comparison identical
SIMPLE OBSERVABLE Subs thread id : 2 and number: 1
BEH OBVERVABLE Filter thread id : 1 and number: 1
comparison different
BEH OBVERVABLE Subs thread id : 1 and number: 1
SIMPLE OBSERVABLE Subs thread id : 2 and number: 2
BEH OBVERVABLE Filter thread id : 1 and number: 2
comparison different
BEH OBVERVABLE Subs thread id : 1 and number: 2
SIMPLE OBSERVABLE Subs thread id : 2 and number: 3
BEH OBVERVABLE Filter thread id : 1 and number: 3
comparison different
BEH OBVERVABLE Subs thread id : 1 and number: 3
I've tried changing the coordination passed to simpleObservable's subscribe_on from observe_on_event_loop to observe_on_new_thread or synchronize_new_thread, but nothing changes.
Could anyone explain these results and suggest a way to make this work as expected? I also tried subscribing multiple simple observables on the same `eventLoopCoordination` object, but that does not help either. Why?
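This is a misunderstanding of how subscribe_on works. It performs the subscription on the selected scheduler; it does not move the emissions. A subject emits on whichever thread calls on_next, which here is the main thread. So, under the hood,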
    simpleObservable.subscribe_on(eventLoopCoordination).subscribe([](int i)
    {
        std::cout << "SIMPLE OBSERVABLE Subs thread id : "
                  << std::this_thread::get_id() << " and number: " << i << "\n";
    });
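is roughly equivalent to the following (pseudocode: schedule() stands in for running work on the coordination's worker and is not a literal RxCpp call)

    rxcpp::observable<>::create<int>([=](const auto& sub) {
        // only the act of subscribing is moved to the scheduler
        eventLoopCoordination.schedule([=](...) {
            simpleObservable.subscribe(sub);
        });
    }).subscribe([](int i)
    {
        std::cout << "SIMPLE OBSERVABLE Subs thread id : "
                  << std::this_thread::get_id() << " and number: " << i << "\n";
    });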
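To see why moving only the subscription does not move the callbacks, here is a small stand-alone sketch that does not use RxCpp at all. Worker and ToySubject are hypothetical helpers made up for this illustration; they are a toy model of the idea, not how RxCpp is implemented.

```cpp
#include <condition_variable>
#include <functional>
#include <future>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>

// Hypothetical single-thread worker: runs queued tasks on its own thread.
class Worker {
public:
    Worker() : thread_([this] { run(); }) {}
    ~Worker() {
        { std::lock_guard<std::mutex> lock(mutex_); done_ = true; }
        ready_.notify_one();
        thread_.join();  // drains remaining tasks, then stops
    }
    void schedule(std::function<void()> task) {
        { std::lock_guard<std::mutex> lock(mutex_); tasks_.push(std::move(task)); }
        ready_.notify_one();
    }
private:
    void run() {
        for (;;) {
            std::function<void()> task;
            {
                std::unique_lock<std::mutex> lock(mutex_);
                ready_.wait(lock, [this] { return done_ || !tasks_.empty(); });
                if (tasks_.empty()) return;  // done_ is set and the queue is drained
                task = std::move(tasks_.front());
                tasks_.pop();
            }
            task();
        }
    }
    std::mutex mutex_;
    std::condition_variable ready_;
    std::queue<std::function<void()>> tasks_;
    bool done_ = false;
    std::thread thread_;  // declared last so the members above exist before it starts
};

// Hypothetical subject: calls every observer on the thread that calls on_next.
class ToySubject {
public:
    void subscribe(std::function<void(int)> observer) {
        std::lock_guard<std::mutex> lock(mutex_);
        observers_.push_back(std::move(observer));
    }
    void on_next(int value) {
        std::vector<std::function<void(int)>> snapshot;
        { std::lock_guard<std::mutex> lock(mutex_); snapshot = observers_; }
        for (auto& observer : snapshot) observer(value);
    }
private:
    std::mutex mutex_;
    std::vector<std::function<void(int)>> observers_;
};

// Models subscribe_on: the subscription happens on the worker thread,
// but the value is still emitted (and observed) on the calling thread.
// Returns true if the callback ran on the calling thread.
bool subscribe_on_keeps_emitter_thread() {
    ToySubject subject;
    std::thread::id callback_thread;
    std::promise<void> subscribed;
    Worker worker;  // declared last so it is joined before the locals above die

    worker.schedule([&] {
        subject.subscribe([&](int) { callback_thread = std::this_thread::get_id(); });
        subscribed.set_value();
    });
    subscribed.get_future().wait();  // subscription is now in place

    subject.on_next(1);  // emitted from the calling thread
    return callback_thread == std::this_thread::get_id();
}
```

Calling subscribe_on_keeps_emitter_thread() returns true: the observer was registered from the worker thread, yet it runs on the thread that called on_next, which matches the output in the question.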
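What you are expecting is observe_on: it schedules each emission on the selected scheduler. In the question's code, that means replacing `.subscribe_on(eventLoopCoordination)` with `.observe_on(eventLoopCoordination)`. Conceptually it does something like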
    // pseudocode: schedule() stands in for running work on the coordination's worker
    rxcpp::observable<>::create<int>([=](const auto& sub) {
        simpleObservable.subscribe([=](int v)
        {
            // every emission is forwarded through the scheduler
            eventLoopCoordination.schedule([=](...)
            {
                sub.on_next(v);
            });
        });
    }).subscribe([](int i)
    {
        std::cout << "SIMPLE OBSERVABLE Subs thread id : "
                  << std::this_thread::get_id() << " and number: " << i << "\n";
    });