javascript rxjs reactive-extensions-js

RxJs: access data before flatMapLatest after flatMapLatest finished


Scenario:

  1. The user sets filters, which are combined into a single stream
  2. When the filters change, a request is sent to the backend to get "cheap" data
  3. When the "cheap" data arrives, another request with the same parameters is sent to a different endpoint, which returns "expensive" data used to enrich the cheap data. This request should be delayed by 1 second and only sent if the user has not changed any of the filters in the meantime (otherwise the 1-second wait should start again)

I'm struggling to implement step 3 without intermediate variables.

let filterStream = Rx.Observable
.combineLatest(
  filterX,
  filterY,
  // Parentheses make the arrow function return an object literal
  (filterX, filterY) => ({
    x: filterX,
    y: filterY
  })
)
.map((filters) => ({
  limit: 100,
  s: filters.x.a,
  f: filters.x.b + filters.y.c,
}))
.distinctUntilChanged()


let cheapDataStream = filterStream
.flatMapLatest((filterQuery) =>
  Rx.Observable.fromPromise(cheapBackendApiCall(filterQuery)))

// render cheap results
cheapDataStream
.map(result => transformForDisplay(result))
.subscribe(result => {
  // render
  // How do I invoke expensiveApiCall() with the `filterQuery` data here,
  // with a delay, and only if filterQuery has not changed?
});

Solution

  • You can take advantage of the implicit conversion of promises to observables, so you don't have to call fromPromise explicitly everywhere. You can then use concat to emit the cheap data immediately, followed after a delay by the cheap data combined with the expensive data. Because this is nested inside a flatMapLatest, any pending expensive call is also cancelled when a new query arrives.

    var filters = Rx.Observable
    .combineLatest(
      filterX,
      filterY,
      (filterX, filterY) => ({
        x: filterX,
        y: filterY
      })
    )
    .map((filters) => ({
      limit: 100,
      s: filters.x.a,
      f: filters.x.b + filters.y.c,
    }))
    .distinctUntilChanged()
    .flatMapLatest(filters => {
      //This kicks off immediately
      var cheapPromise = cheapBackendApiCall(filters);
    
      //Passing a function was added in the latest version, 4.1; the function is only called once it is subscribed to.
      //If you are using an earlier version, you will need to wrap the call in a defer instead.
      var expensivePromiseFn = () => expensiveBackendApiCall(filters);

      //forkJoin implicitly calls `fromPromise`, so you can pass it the same
      //sort of arguments.
      var cheapAndExpensive = Rx.Observable.forkJoin(
                                cheapPromise, 
                                expensivePromiseFn, 
                                (cheap, expensive) => ({cheap, expensive}));
    
      //First return the cheap data, then wait 1000 ms (the 1 second asked for in the question) before subscribing,
      //which triggers the expensive operation and joins it with the result of the cheap one.
      //The parent `flatMapLatest` guarantees that this is cancelled if a new event comes in.
      return Rx.Observable.concat(cheapPromise, cheapAndExpensive.delaySubscription(1000));
    })
    .subscribe(x => { /* Render results */ });
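
To make the "cheap now, expensive later, cancel when superseded" behaviour easier to reason about, here is a rough dependency-free sketch of the same flow in plain JavaScript. It is not the RxJS code above and not how RxJS implements it. The names createEnricher, fetchCheap, fetchExpensive and render are hypothetical, the backends use callbacks instead of promises, and the timer functions are injectable so that the demo runs deterministically.

```javascript
// Hypothetical helper: approximates flatMapLatest + concat + delaySubscription by hand.
function createEnricher({ fetchCheap, fetchExpensive, render, delayMs = 1000,
                          setTimer = setTimeout, clearTimer = clearTimeout }) {
  let latest = 0;      // id of the most recent query; work tagged with an older id is stale
  let pending = null;  // handle of the timer that will start the expensive call

  return function onQuery(query) {
    const id = ++latest;
    if (pending !== null) {
      clearTimer(pending); // a new query cancels the wait started by the previous one
      pending = null;
    }
    fetchCheap(query, cheap => {
      if (id !== latest) return;   // superseded while the cheap call was in flight
      render({ query, cheap });    // show the cheap data right away
      pending = setTimer(() => {
        pending = null;
        fetchExpensive(query, expensive => {
          if (id !== latest) return; // superseded while the expensive call was in flight
          render({ query, cheap, expensive });
        });
      }, delayMs);
    });
  };
}

// Demo with synchronous fake backends and a manual timer queue (assumptions for illustration).
const rendered = [];
const timers = [];
const onQuery = createEnricher({
  fetchCheap: (q, cb) => cb(`cheap(${q.s})`),
  fetchExpensive: (q, cb) => cb(`expensive(${q.s})`),
  render: r => rendered.push(r),
  setTimer: fn => timers.push(fn) - 1,       // the array index doubles as the timer handle
  clearTimer: handle => { timers[handle] = null; },
});

onQuery({ s: 'a' });               // renders cheap(a) and arms a timer
onQuery({ s: 'b' });               // cancels a's timer, renders cheap(b), arms a new timer
timers.forEach(fn => fn && fn());  // "the delay elapses": only b's timer is still armed
console.log(rendered);
```

In this run only three renders happen: the cheap data for a, the cheap data for b, and then the enriched result for b. The expensive call for a is never made, which is the same effect the nested flatMapLatest gives you in the answer above.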