Search code examples
typescriptrxjssubject

rxJs Subject, error in subscriptions kills the whole stream


I know this is a known behavior as desribed here

basic problem is described in the link above, but here is the relevant code (taken from link)

// This is going to behave strangely
const source$ = Observable.interval(1000).share();
const mapped$ = source$.map(x => {
  if (x === 1) {
    throw new Error('oops');
  }
  return x;
});
source$.subscribe(x => console.log('A', x));
mapped$.subscribe(x => console.log('B', x));
source$.subscribe(x => console.log('C', x));
// "A" 0
// "B" 0
// "C" 0
// "A" 1
// Uncaught Error: "oops"

an error in a subscription will terminate the whole source stream.

the solution for an Observable is to use .observeOn(Rx.Scheduler.asap);

I'm fairly new to the whole reactive programming and I struggle to apply this solution to my Subject because subject doesn't support observeOn.

But I need a Subject because I need to push new values to the steam.

How can I workaround this problem or use observeOn with a Subject?

observeOn returns an Observable. But I struggle how to combine observeOn with my Subject. how to use observeOn and still be able to push values to my subject?

here's the current code (simplified)

export class MyClass{

    private messages: Subject<Message> = new Subject<Message>();

    dispatchMessage(message: Message) {

     this.messages.next(message);
}

Ideas?

P.S.

for anybody using angular (like me), observeOn might have some undesired side effects. https://github.com/angular/angular/issues/14316

just as additional information for anybody who comes to this question.


Solution

  • In that case you just need to a separate reference to the Subject as well as a reference to the chain after you append the observeOn operator:

    const subject$ = new Subject();
    const obs$ = subject$.observeOn(...);
    
    obs$.subscribe(...);
    subject$.next(...);