Search code examples
javascriptfunctional-programmingreactive-programmingrxjsreactive-extensions-js

Ensure order that subscribers get updated


Is there a way to make sure the order on how subscribers get updated is ensured?

I've got a hot observable and my first subscriber does some sync work to update a variable and my next subscriber then has to initialise a service (only once!), and only after that variable is ensured to be set!

it looks like this:

import App from './App'

var appSource = App.init() // gets the hot observable

// our second subscriber
appSource.take(1).subscribe(() => {
  // take 1 to only run this once
  nextService.init()
})

where App.init looks like this:

...
init() {
  var source = this.createObservable() // returns a hot interval observable that fetches a resource every few minutes

  // first subscriber, updates the `myVar` every few minutes
  source.subscribe((data) => this.myVar = data)

  return source
}
...

this currently works, but I am unsure if it will always follow the order 100%.

EDIT:

As I've heard, subscribers will be invoked FIFO. So the order is somewhat assured.


Solution

  • I don't know if RxJS ever explicitly guarantees that observers are called in order of subscription. But, as you say, it usually works.

    However, you might consider modelling your actual workflow instead of relying on implicit observer order.

    It sounds like you need to know when your app is initialized so you can take further action. Instead of relying on knowledge of the internal workings of App.init, App could expose an API for this:

    One (non-Rx way) is to let the caller supply a callback to init:

    //...
    init(callback) {
      var source = this.createObservable() // returns a hot interval observable that fetches a resource every few minutes
    
      // first subscriber, updates the `myVar` every few minutes
      source.subscribe((data) => {
        this.myVar = data;
        if (callback) {
            callback();
            callback = undefined;
        }
      })
    
      return source
    }
    
    // elsewhere
    App.init(() => nextService.init());
    

    Another option instead of a callback is to just have init return a Promise that your resolve (or an Rx.AsyncSubject that you signal) once initialization is complete.

    And yet another option, but requires a bit of a refactor, is to model this.myVar as the observable data that it is. i.e.:

    init() {
        this.myVar = this.createObservable().replay(1);
        this.myVar.connect();
        // returns an observable that signals when we are initialized
        return this.myVar.first();
    }
    
    // elsewhere, you end up with this pattern...
    const servicesToInit = [ App, service1, service2, service3 ];
    Observable
        .of(servicesToInit)
        .concatMap(s => Rx.Observable.defer(() => s.init()))
        .toArray()
        .subscribe(results => {
            // all initializations complete
            // results is an array containing the value returned by each service's init observable
        });
    

    Now, anything that wants to make use of myVar would always need to subscribe to it in someway to get the current and/or future values. They could never just synchronously ask for the current value.