Search code examples
javascriptrxjs

Can I return a cached Observable from switchMap without using state external to the RxJS pipeline?


I would like to create an RxJS pipeline that only switches to a new Observable (firing an API call) when the pipeline parameter has changed. However, I still want the most recent value re-emitted whenever a new parameter comes through.

So far, I've constructed the following pipeline that does more or less what I want:

const { fromEvent, scan, map, startWith, pairwise, switchMap, tap, of, shareReplay, delay } = rxjs;

const events = [1, 1, 1, 1, 1, 2, 2, 3, 4, 4, 4, 4];

let cache$;

fromEvent(document, 'click')
  .pipe(
    scan((count) => count + 1, -1),
    map((count) => events[count % events.length]),
    startWith(undefined),
    pairwise(),
    switchMap(([prev, curr]) => {
      if (prev === curr) {
        return cache$;
      }

      cache$ = of(curr).pipe(
        tap((val) => console.log(`New subscription for ${val}`)),
        delay(1000), // See edit to original question
        shareReplay(1)
      );

      return cache$;
    })
  )
  .subscribe((val) => console.log(`Received ${val}`));
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/7.8.1/rxjs.umd.min.js"></script>
<div>Click anywhere and watch the console output</div>

Note that a new inner pipeline is only created when a distinct/changed value is observed in the parent pipeline, but every value in the parent pipeline results in some output arriving at the subscribe.

This code works decently, but I'm disappointed in the use of "external" state via the cache$ variable. I would much prefer a solution that keeps all the logic entirely within the pipeline, for readability and "purer" code.

Is it possible to achieve the result demonstrated above, without any external state tracking?

Edit: Long-Running Inner Pipeline

@akotech provided a great answer using switchScan that answered the question almost perfectly. However, my original example above did not explicitly simulate the effect of a long-running inner pipeline (where the response from "the API" is delayed). When adding a delay, the switchMap above still produces the right response once it is received (no matter how many clicks occur), and caches it for any subsequent returns. Using switchScan, however, runs the risk of losing the subscription to the long-running observable before it returns a value, leaving the cache in a stale or indeterminate state.

Is there a solution that produces consistent results, even with delayed inner-observable return values?


Solution

  • You could a scan operator to cache the observable and later switch on it using switchAll.

     ...,
     scan(
       (cached, [prev, curr]) =>
         prev === curr
           ? cached
           : of(`${curr}":Result`).pipe(
               tap((val) => console.log(`New subscription for ${val}`)),
               delay(1000),
               shareReplay(1)
             ),
       EMPTY
     ),
     switchAll()
     ...
    

    EDIT: originally proposed switchScan but that option would only work for sync internal observables.