I would like to create an RxJS pipeline that only switches to a new Observable (firing an API call) when the pipeline parameter has changed. However, I still want the most recent value re-emitted whenever a new parameter comes through.
So far, I've constructed the following pipeline that does more or less what I want:
const { fromEvent, scan, map, startWith, pairwise, switchMap, tap, of, shareReplay, delay } = rxjs;
const events = [1, 1, 1, 1, 1, 2, 2, 3, 4, 4, 4, 4];
let cache$;
fromEvent(document, 'click')
  .pipe(
    scan((count) => count + 1, -1),
    map((count) => events[count % events.length]),
    startWith(undefined),
    pairwise(),
    switchMap(([prev, curr]) => {
      if (prev === curr) {
        return cache$;
      }
      cache$ = of(curr).pipe(
        tap((val) => console.log(`New subscription for ${val}`)),
        delay(1000), // See edit to original question
        shareReplay(1)
      );
      return cache$;
    })
  )
  .subscribe((val) => console.log(`Received ${val}`));
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/7.8.1/rxjs.umd.min.js"></script>
<div>Click anywhere and watch the console output</div>
Note that a new inner pipeline is only created when a distinct/changed value is observed in the parent pipeline, but every value in the parent pipeline results in some output arriving at the subscribe callback.
This code works decently, but I'm disappointed in the use of "external" state via the cache$ variable. I would much prefer a solution that keeps all the logic entirely within the pipeline, for readability and "purer" code.
Is it possible to achieve the result demonstrated above, without any external state tracking?
@akotech provided a great answer using switchScan that answered the question almost perfectly. However, my original example above did not explicitly simulate the effect of a long-running inner pipeline (where the response from "the API" is delayed). When adding a delay, the switchMap above still produces the right response once it is received (no matter how many clicks occur), and caches it for any subsequent returns. Using switchScan, however, runs the risk of losing the subscription to the long-running observable before it returns a value, leaving the cache in a stale or indeterminate state.
Is there a solution that produces consistent results, even with delayed inner-observable return values?
You could use a scan operator to cache the observable and then switch to it using switchAll.
// Also requires EMPTY and switchAll from rxjs.
...,
scan(
  (cached, [prev, curr]) =>
    prev === curr
      ? cached // same parameter: reuse the cached observable
      : of(`${curr}:Result`).pipe(
          tap((val) => console.log(`New subscription for ${val}`)),
          delay(1000),
          shareReplay(1)
        ),
  EMPTY
),
switchAll()
...
EDIT: I originally proposed switchScan, but that option only works for synchronous inner observables.
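To check the caching logic in isolation, here is a rough plain-JavaScript sketch of the scan accumulator above, run over the question's events array without RxJS. The names createRequest, accumulate and pairwise are hypothetical helpers made up for this illustration. createRequest stands in for the of(...).pipe(tap, delay, shareReplay) chain, so this only models when a new inner observable gets created, not the timing of delayed responses.

```javascript
// Hypothetical stand-in for of(curr).pipe(tap(...), delay(1000), shareReplay(1)).
// Each call returns a fresh object, so object identity marks one "API call".
function createRequest(param) {
  return { param };
}

// Same shape as the scan callback: reuse the cached request
// unless the parameter changed.
function accumulate(cached, [prev, curr]) {
  return prev === curr ? cached : createRequest(curr);
}

// Array equivalent of startWith(undefined) followed by pairwise().
function pairwise(values) {
  const withSeed = [undefined, ...values];
  return withSeed.slice(1).map((curr, i) => [withSeed[i], curr]);
}

const events = [1, 1, 1, 1, 1, 2, 2, 3, 4, 4, 4, 4];

// Array equivalent of scan(): record every intermediate accumulator value.
const emitted = [];
pairwise(events).reduce((cached, pair) => {
  const next = accumulate(cached, pair);
  emitted.push(next);
  return next;
}, null);

// One emission per click, but only one request object per parameter change.
console.log(emitted.map((r) => r.param).join(','));
// prints "1,1,1,1,1,2,2,3,4,4,4,4"
console.log(`distinct requests: ${new Set(emitted).size}`);
// prints "distinct requests: 4"
```

In the real pipeline, each distinct object would correspond to one shareReplay(1) observable, and switchAll would resubscribe to the same cached observable on repeated parameters, which is what lets a delayed response still arrive after further clicks.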