
RxJS reduce a ReplaySubject


I'm using ReactiveX/RxJS version.

Let's say I have an Rx.ReplaySubject that emits, every 2 seconds, an object containing an id and an array of values. I want to reduce this array of values and get the sum of them all.

The problem is that the ReplaySubject is a hot observable and never completes; at least, I don't want it to complete, because I want the sum of each object's values every 2 seconds. But the reduce operator only emits once the observable has completed. So, how should I proceed?

Example of non-working code:

var subject = new Rx.ReplaySubject();

subject.
  map(x => x.transactions).
  // Reduce never concludes because ReplaySubject instance is not completed
  reduce((v1, v2) => v1+v2, 0).
  subscribe(function (value) {
    console.log(value)
  });

setInterval(injectData, 2000);

function injectData () {
  subject.next({id: Date.now(), transactions: [
    {value: Math.round(Math.random() * 5000)},
    {value: Math.round(Math.random() * 5000)},
    {value: Math.round(Math.random() * 5000)},
    {value: Math.round(Math.random() * 5000)},
    {value: Math.round(Math.random() * 5000)}
  ]});
}

Solution

  • Consider using Observable.prototype.scan() (RxJS documentation)

    scan() basically aggregates an observable and emits each intermediate result as it goes, unlike reduce(), which emits only the final result upon completion. (see Rx explanation of scan and reduce)
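
    To make the difference concrete without involving RxJS at all, here is a minimal sketch using a plain array. The scan helper below is a hypothetical stand-in written for this illustration, not the RxJS operator; it returns every intermediate accumulator value, whereas Array.prototype.reduce returns only the last one.

```javascript
// Hypothetical helper for illustration only (not part of RxJS):
// returns every intermediate accumulator value, the way scan() emits them.
function scan(values, accumulate, seed) {
  const results = [];
  let acc = seed;
  for (const value of values) {
    acc = accumulate(acc, value);
    results.push(acc);
  }
  return results;
}

const add = (acc, value) => acc + value;
const values = [1, 2, 3, 4];

const emittedByScan = scan(values, add, 0);    // one result per input value
const emittedByReduce = values.reduce(add, 0); // a single result at the end

console.log(emittedByScan);   // [ 1, 3, 6, 10 ]
console.log(emittedByReduce); // 10
```

    On an observable that never completes, only the scan-style behaviour produces any output, which is why it fits the OP's case.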

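    Example using OP's code, written against the RxJS 4 API (onNext, selectMany, fromArray); in RxJS 5 and later the corresponding names are next, mergeMap and Observable.from (Here's the fiddle):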

    var subject = new Rx.ReplaySubject();
    
    subject
      // note: use "selectMany" to flatten observable of observables
      .selectMany(x => Rx.Observable.fromArray(x.transactions))
      // note: use "scan" to aggregate values
      .scan((agg, val) => agg+val.value, 0)
      .subscribe(function (value) {
        console.log(value)
      });
    
    setInterval(injectData, 2000);
    
    function injectData () {
      subject.onNext({id: Date.now(), transactions: [
        {value: Math.round(Math.random() * 5000)},
        {value: Math.round(Math.random() * 5000)},
        {value: Math.round(Math.random() * 5000)},
        {value: Math.round(Math.random() * 5000)},
        {value: Math.round(Math.random() * 5000)}
      ]});
    }
    

    Another example:

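    The above code emits a new aggregate for every transaction, because selectMany() flattens each array into individual values. If you only want it to emit once per injected object (every 2 seconds), apply reduce() to the inner observable, which completes after the array's last element, like so (Here's another fiddle):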

    subject
      // note: use "selectMany" to flatten observable of observables
      // note: using "reduce" inside here so that we only emit the aggregate
      .selectMany(x => 
        Rx.Observable
          .fromArray(x.transactions)
          .reduce((agg, val) => agg + val.value, 0)
      )
      // note: use "scan" to aggregate values
      .scan((agg, val) => agg+val, 0)
      .subscribe(function (value) {
        console.log(value)
      });
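
    As a rough check of what the two pipelines above emit, the sketch below replays both on fixed data, using plain arrays instead of observables. The batches data and the runningTotals helper are made up for this illustration; only the shape of the objects follows the OP's code.

```javascript
// Made-up, deterministic stand-in for two injectData() calls.
const batches = [
  { id: 1, transactions: [{ value: 1 }, { value: 2 }] },
  { id: 2, transactions: [{ value: 3 }, { value: 4 }] }
];

// Hypothetical helper: keeps each intermediate sum, the way scan() emits it.
function runningTotals(numbers) {
  let total = 0;
  return numbers.map(n => (total += n));
}

// First example: flatten every transaction, then aggregate.
const flattened = [];
for (const batch of batches) {
  for (const t of batch.transactions) flattened.push(t.value);
}
const perTransaction = runningTotals(flattened);

// Second example: sum each batch first, then aggregate the batch sums.
const batchSums = batches.map(b =>
  b.transactions.reduce((agg, val) => agg + val.value, 0)
);
const perBatch = runningTotals(batchSums);

console.log(perTransaction); // [ 1, 3, 6, 10 ]
console.log(perBatch);       // [ 3, 10 ]
```

    Both variants end at the same grand total; they differ only in how often an intermediate value is reported.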
    

    Additional note:

    Rx Subjects can complete; you just need to call onCompleted() when you're ready. Once the subject completes, reduce() emits its final value, so you can still use reduce() if completing the subject is acceptable. Compare this fiddle with the one above.
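
    Why reduce() stays silent until completion can be sketched with a toy subject. MiniSubject and subscribeReduced are hypothetical names invented for this illustration; they are not the RxJS classes, and a real ReplaySubject additionally replays buffered values to late subscribers. The method name next follows the OP's code, with complete() playing the role of onCompleted().

```javascript
// Hypothetical, heavily simplified subject for illustration only.
// It is NOT the RxJS implementation and has no replay buffer.
class MiniSubject {
  constructor() {
    this.observers = [];
  }
  subscribe(observer) {
    this.observers.push(observer);
  }
  next(value) {
    this.observers.forEach(o => o.next(value));
  }
  complete() {
    this.observers.forEach(o => o.complete());
  }
}

// reduce-like subscription: accumulates silently, reports only on completion.
function subscribeReduced(subject, accumulate, seed, onResult) {
  let acc = seed;
  subject.subscribe({
    next: value => { acc = accumulate(acc, value); },
    complete: () => onResult(acc)
  });
}

const subject = new MiniSubject();
const results = [];
subscribeReduced(subject, (agg, val) => agg + val, 0, total => results.push(total));

subject.next(5);
subject.next(7);
const beforeComplete = results.slice(); // snapshot: nothing reported yet
console.log(beforeComplete); // []

subject.complete();
console.log(results); // [ 12 ]
```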