I'm trying to understand how to use rxcpp, my impression was that when an observable emits a value, all observers who are subscribed will get notified by having their their on_next() methods called, passing them the emitted value.
This is not the case with the following example:
auto eventloop = rxcpp::observe_on_event_loop();
printf("Start task\n");
auto values = rxcpp::observable<>::interval(std::chrono::seconds(2)).map(
[](int i){
printf("Observable sending: %d\n", i);
return i;
}
);
values.
subscribe_on(eventloop).
take(2).
as_blocking().
subscribe(
[](int v){printf("#1 onNext: %d\n", v);},
[](){printf("#1 onCompleted\n");});
values.
subscribe_on(eventloop).
take(2).
as_blocking().
subscribe(
[](int v){printf("#2 onNext: %d\n", v);},
[](){printf("#2 onCompleted\n");});
printf("Finish task\n");
I expected the output to be something like:
Start task
Observable sending: 1
#1 onNext: 1
#2 onNext: 1
Observable sending: 2
#1 onNext: 2
#1 onCompleted
#2 onNext: 2
#2 onCompleted
Finish task
i.e. on_next being called on all subscribed observers when the new value comes through.
Instead, the output is actually:
Start task
Observable sending: 1
#1 onNext: 1
Observable sending: 2
#1 onNext: 2
#1 onCompleted
Observable sending: 1
#2 onNext: 1
Observable sending: 2
#2 onNext: 2
#2 onCompleted
Finish task
This is the classic hot vs. cold behavior.
A hot observable will do as you expect. Interval is a cold observable so each subscribe produces an independent set of values.
The publish operator will take a single cold observable and share it as a hot observable.
In this case it would be.
auto sharedvalues = values.publish().ref_count();
Then use sharedvalues
instead of values
in the subscribe expressions.
A search for hot vs cold observables will find extensive discussion of this topic.