javascript rxjs rxjs5

Make potentially asynchronous RxJS observable certainly asynchronous


There is a potentially asynchronous observable, i.e. something like:

const potentiallyButNotNecessarilyAsyncObservable = [
  Observable.of('sync'),
  Observable.of('async').delay(100)
][Math.round(Math.random())];

potentiallyButNotNecessarilyAsyncObservable.subscribe(console.log);

It should be made into an asynchronous observable. It's desirable not to delay it any further if it's already asynchronous, so I can't use potentiallyButNotNecessarilyAsyncObservable.delay(0).

How can this be done?


Solution

  • You can control how work is scheduled and executed by using Schedulers. For example, you can write .observeOn(Rx.Scheduler.async), and work will be scheduled using setTimeout or setInterval instead of synchronous recursive calls. This is an advanced topic; if you would like to understand it better, I recommend reading this great documentation on Schedulers.

    Here are two examples. The first is executed within a single tick, and the second spans several:

    const { of, asapScheduler } = rxjs; // = require("rxjs")
    const { observeOn } = rxjs.operators; // = require("rxjs/operators")
    
    const sync$ = of('sync');
    
    // by default RxJS will pick a default scheduler by using the principle of least concurrency
    // in this case it is a null or undefined Scheduler, which means notifications are delivered synchronously and recursively
    console.log('--------- default(null) Scheduler ---------------');
    console.log('just before subscribe');
    sync$.subscribe({
      next: x => console.log('got value ' + x),
      error: err => console.error('something wrong occurred: ' + err),
      complete: () => console.log('done'),
    });
    console.log('just after subscribe');
    
    
    const async$ = sync$.pipe(
      observeOn(asapScheduler)
    );
    
    // "got value sync" is printed after "just after subscribe"
    console.log('--------- asapScheduler ----------');
    console.log('just before subscribe');
    async$.subscribe({
      next: x => console.log('got value ' + x),
      error: err => console.error('something wrong occurred: ' + err),
      complete: () => console.log('done'),
    });
    console.log('just after subscribe');
    <script src="https://unpkg.com/[email protected]/bundles/rxjs.umd.min.js"></script>
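
To make the difference between these delivery modes concrete without depending on RxJS, here is a minimal plain-JavaScript sketch. It assumes, as the RxJS documentation describes, that asapScheduler delivers on the microtask queue (the one promises use) and that the async scheduler delivers from a timer. The function name recordSchedulingOrder is hypothetical and is not part of RxJS.

```javascript
// Hypothetical helper (not an RxJS API): records the order in which
// callbacks run under three delivery modes.
function recordSchedulingOrder() {
  const order = [];
  return new Promise(resolve => {
    order.push('before');

    // Synchronous delivery, comparable to having no scheduler:
    // the callback runs before the next statement.
    (() => order.push('sync'))();

    // Microtask delivery, assumed comparable to asapScheduler:
    // runs after the current synchronous code, before any timer.
    Promise.resolve().then(() => order.push('microtask'));

    // Timer delivery, assumed comparable to the async scheduler:
    // runs in a later macrotask.
    setTimeout(() => {
      order.push('timer');
      resolve(order);
    }, 0);

    order.push('after');
  });
}

recordSchedulingOrder().then(order => console.log(order.join(' -> ')));
// prints: before -> sync -> after -> microtask -> timer
```

Both the microtask and the timer callback run after 'after', which is the property the question asks for. The microtask variant adds the smaller delay of the two.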