Search code examples
javascriptrxjsobservablerxjs5reactive

RxJs: How to only maintain the latest value until inner observable complete


I'm new to RxJs and having trouble to achieve this in "RxJs way":

An infinite stream a$ emits a value a once a while.

async() takes the a and performs an async operation.

If a$ emits values while async is pending, only keep the latest one al.

After the previous async completes, if there is an al, run async(al).

And so on.

a$:----a1----------a2----a3-----------------------a4-----------
       async(a1):------------end                  async(a4):---
                             async(a3):-----end

Here is what I came up with, a bit nasty:

var asyncIdle$ = new Rx.BehaviorSubject()
var asyncRunning$ = new Rx.Subject()
var async$ = asyncIdle$

function async (val) {
  async$ = asyncRunning$
  // do something with val
  console.log(val + ' handling')
  setTimeout(() => {
    console.log(val + ' complete')
    async$.next()
    async$ = asyncIdle$
  }, 2000)
}

// simulate a$
var a$ = Rx.Observable.fromEvent(document, 'click')
.mapTo(1)
.scan((acc, curr) => acc + curr)
.do(val => console.log('got ' + val))


a$.debounce(() => async$)
.subscribe(val => {
  async(val)
})

Solution

  • You can use the audit operator to solve the problem, like this (the comments should explain how it works):

    // Simulate the source.
    
    const source = Rx.Observable.merge(
      Rx.Observable.of(1).delay(0),
      Rx.Observable.of(2).delay(10),
      Rx.Observable.of(3).delay(20),
      Rx.Observable.of(4).delay(150),
      Rx.Observable.of(5).delay(300)
    ).do(value => console.log("source", value));
    
    // Simulate the async task.
    
    function asyncTask(value) {
      return Rx.Observable
        .of(value)
        .do(value => console.log(" before async", value))
        .delay(100)
        .do(value => console.log(" after async", value));
    }
    
    // Compose an observable that's based on the source.
    // Use audit to ensure a value is not emitted until
    // the async task has been performed.
    // Use share so that the signal does not effect a
    // second subscription to the source.
    
    let signal;
    
    const audited = source
      .audit(() => signal)
      .mergeMap(value => asyncTask(value))
      .share();
    
    // Compose a signal from the audited observable to
    // which the async task is applied.
    // Use startWith so that the first emitted value
    // passes the audit.
    
    signal = audited
      .mapTo(true)
      .startWith(true);
    
    audited.subscribe(value => console.log("output", value));
    .as-console-wrapper { max-height: 100% !important; top: 0; }
    <script src="https://unpkg.com/rxjs@5/bundles/Rx.min.js"></script>