I know this is a known behavior, as described here.
The basic problem is described in the link above, but here is the relevant code (taken from the link):
// This is going to behave strangely
const source$ = Observable.interval(1000).share();
const mapped$ = source$.map(x => {
  if (x === 1) {
    throw new Error('oops');
  }
  return x;
});
source$.subscribe(x => console.log('A', x));
mapped$.subscribe(x => console.log('B', x));
source$.subscribe(x => console.log('C', x));
// "A" 0
// "B" 0
// "C" 0
// "A" 1
// Uncaught Error: "oops"
An error in one subscription will terminate the whole source stream, so the other subscribers stop receiving values too.
The solution for an Observable is to use .observeOn(Rx.Scheduler.asap);
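To make the failure mode concrete, here is a minimal, dependency-free sketch of synchronous delivery. `TinySubject` and `demo` are hypothetical names made up for this illustration; this is not how RxJS is implemented, only a model of why a throwing subscriber cuts off the ones registered after it.

```typescript
// Hypothetical stand-in for a shared source; NOT the RxJS implementation.
type Listener<T> = (value: T) => void;

class TinySubject<T> {
  private listeners: Listener<T>[] = [];

  subscribe(listener: Listener<T>): void {
    this.listeners.push(listener);
  }

  // Synchronous delivery: if one listener throws, the exception unwinds
  // through next() and the remaining listeners never see the value.
  next(value: T): void {
    for (const listener of this.listeners) {
      listener(value);
    }
  }
}

function demo(): { seen: string[]; error: string | null } {
  const source = new TinySubject<number>();
  const seen: string[] = [];
  let error: string | null = null;

  source.subscribe(x => { seen.push(`A ${x}`); });
  source.subscribe(x => {
    if (x === 1) {
      throw new Error('oops');
    }
    seen.push(`B ${x}`);
  });
  source.subscribe(x => { seen.push(`C ${x}`); });

  source.next(0);
  try {
    source.next(1); // B throws, so C is skipped for this value
  } catch (err) {
    error = (err as Error).message;
  }
  return { seen, error };
}

const result = demo();
console.log(result.seen);  // [ 'A 0', 'B 0', 'C 0', 'A 1' ]
console.log(result.error); // oops
```

The sequence matches the log above: "C" never receives 1. Handing each notification to a scheduler, which is what observeOn does, takes the subscriber's code out of the producer's call stack, so a throw there can no longer unwind through the producer.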
I'm fairly new to reactive programming and I struggle to apply this solution to my Subject, because Subject doesn't seem to support observeOn.
But I need a Subject because I need to push new values to the stream.
How can I work around this problem or use observeOn with a Subject?
observeOn returns an Observable, and I don't see how to combine that with my Subject.
How can I use observeOn and still be able to push values to my Subject?
Here's the current code (simplified):
import { Subject } from 'rxjs';

export class MyClass {
  private messages: Subject<Message> = new Subject<Message>();

  dispatchMessage(message: Message) {
    this.messages.next(message);
  }
}
Ideas?
P.S.
For anybody using Angular (like me): observeOn might have some undesired side effects, see https://github.com/angular/angular/issues/14316
This is just additional information for anybody who comes across this question.
In that case you just need to keep a separate reference to the Subject, as well as a reference to the chain after you append the observeOn operator:
const subject$ = new Subject();
const obs$ = subject$.observeOn(...);
obs$.subscribe(...);
subject$.next(...);
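Applied to the class from the question, this could look roughly like the sketch below. It assumes RxJS 6 or later, where the chained `.observeOn(Rx.Scheduler.asap)` is written as `pipe(observeOn(asapScheduler))`. The `Message` shape and the `messages$` property name are placeholders, since neither appears in the question.

```typescript
import { Observable, Subject, asapScheduler } from 'rxjs';
import { observeOn } from 'rxjs/operators';

// Placeholder shape; the real Message type is not shown in the question.
interface Message {
  text: string;
}

export class MyClass {
  // Write side: stays private, so only this class pushes values.
  private readonly messages = new Subject<Message>();

  // Read side: the same stream, with every notification re-emitted
  // on the asap scheduler. Consumers subscribe to this, not to the Subject.
  readonly messages$: Observable<Message> =
    this.messages.pipe(observeOn(asapScheduler));

  dispatchMessage(message: Message): void {
    this.messages.next(message);
  }
}

const bus = new MyClass();
const received: string[] = [];
bus.messages$.subscribe(m => { received.push(m.text); });
bus.dispatchMessage({ text: 'hello' });
// received is still empty at this point: delivery is deferred
// to the asap scheduler and happens after the current call stack.
```

Keeping the Subject private and exposing only the observeOn chain means no subscriber can accidentally attach to the synchronous side.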