Tags: rxjs, rxjs5

Maintaining Subject emission order when invoking next within subscription


I'm running into a bug, and I've determined that it's caused by Subjects delivering their events synchronously when next() is called.

The following code produces the output shown below:

const mySubject = new Subject();

mySubject.subscribe(score => {
  if (score === 2) {
    mySubject.next(score + 10);
  }
});
mySubject.subscribe(score => console.log(score));

mySubject.next(1);
mySubject.next(2);

Output:
1
12
2

The only way I'm able to get the proper output (1, 2, 12) is to wrap my next() call in a setTimeout to make it async. Is there a proper way to deal with this issue that I'm missing?


Solution

  • If you're using RxJS 5.5, I'd personally use setTimeout as well. There's a subscribeOn operator that you could use with the async scheduler (import { async } from 'rxjs/scheduler/async') to run every emission asynchronously, but it's not available in RxJS 5.5 right now.

    So probably the easiest way is to use delay(0), which adds no real delay but passes every emission on asynchronously, just as you did with setTimeout():

    import { Subject } from 'rxjs/Subject';
    import { delay } from 'rxjs/operators';
    
    const mySubject = new Subject();
    // delay(0) re-emits each value asynchronously instead of inside next().
    const source = mySubject.pipe(delay(0));

    source.subscribe(score => {
      if (score === 2) {
        mySubject.next(score + 10);
      }
    });
    source.subscribe(score => console.log(score));

    mySubject.next(1);
    mySubject.next(2);
    // Logs 1, 2, 12
    

    See live demo (open console): https://stackblitz.com/edit/typescript-fiwgrk?file=index.ts
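
To see why the synchronous version logs 1, 12, 2 and why deferring the nested call restores 1, 2, 12, here is a rough, dependency-free model. MiniSubject, run and parked are hypothetical names made up for this sketch. MiniSubject is not how RxJS implements Subject; it only assumes that values go to each subscriber synchronously, in subscription order. The parked array is a hand-flushed stand-in for what setTimeout or delay(0) get from the event loop.

```typescript
// Hypothetical stand-in for a Subject: delivers each value to every
// subscriber synchronously, in subscription order. Not the RxJS code.
class MiniSubject<T> {
  private observers: Array<(value: T) => void> = [];

  subscribe(observer: (value: T) => void): void {
    this.observers.push(observer);
  }

  next(value: T): void {
    // Iterate over a copy so the list cannot change mid-delivery.
    for (const observer of this.observers.slice()) {
      observer(value);
    }
  }
}

// Replays the scenario from the question. `schedule` decides when the
// nested next() call actually runs.
function run(schedule: (task: () => void) => void): number[] {
  const seen: number[] = [];
  const subject = new MiniSubject<number>();

  subject.subscribe(score => {
    if (score === 2) {
      schedule(() => subject.next(score + 10));
    }
  });
  subject.subscribe(score => {
    seen.push(score);
  });

  subject.next(1);
  subject.next(2);
  return seen;
}

// Nested next() runs immediately, while 2 is still being delivered,
// so the second subscriber sees 12 before it sees 2.
const syncOrder = run(task => task());

// Nested next() is parked and runs only after the outer next(2) has
// finished delivering to every subscriber.
const parked: Array<() => void> = [];
const deferredOrder = run(task => {
  parked.push(task);
});
parked.forEach(task => task());

console.log(syncOrder);     // [1, 12, 2]
console.log(deferredOrder); // [1, 2, 12]
```

The only difference between the two runs is when the nested next() executes. That is the effect the setTimeout wrapper and delay(0) have in the real code.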