Cache Http requests using only RxJS operators


I'm trying to achieve what is described here: https://www.prestonlamb.com/blog/rxjs-cache-and-refresh-in-angular

In other words, I want to cache an observable for a given time (let's say 1 minute). When a subscription is made after that time has elapsed, the data should be retrieved again and cached for another minute.

Example of the expected result:

T 00:00: Request (1) => RETRIEVE data
T 00:10: Request (2) => data from cache
T 00:35: Request (3) => data from cache
T 00:50: Request (4) => data from cache
T 01:10: Request (5) => RETRIEVE data
T 01:15: Request (6) => data from cache
T 01:30: Request (7) => data from cache
T 02:30: Request (8) => RETRIEVE data
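
To pin down the behavior being asked for, here is a minimal dependency-free TypeScript sketch (not RxJS, and not a solution in itself) of a cache that expires on request. `ExpiringCache`, `fetchValue` and the explicit `now` argument (in seconds) are hypothetical names introduced only for illustration.

```typescript
// Hypothetical model: time is passed in explicitly (in seconds) so the
// timeline can be replayed without waiting.
type Outcome = "RETRIEVE data" | "data from cache";

class ExpiringCache<T> {
  private value: T | undefined;
  private expiresAt = -Infinity; // nothing cached yet
  private ttl: number;
  private fetchValue: () => T;

  constructor(ttl: number, fetchValue: () => T) {
    this.ttl = ttl;
    this.fetchValue = fetchValue;
  }

  // Returns the cached value while it is fresh; otherwise fetches once and
  // restarts the expiry window from the moment of that fetch.
  get(now: number): { value: T; outcome: Outcome } {
    if (now < this.expiresAt) {
      return { value: this.value as T, outcome: "data from cache" };
    }
    this.value = this.fetchValue();
    this.expiresAt = now + this.ttl;
    return { value: this.value, outcome: "RETRIEVE data" };
  }
}

let fetchCount = 0;
const demoCache = new ExpiringCache<number>(60, () => ++fetchCount);

// Request times from the timeline above, in seconds.
const requestTimes = [0, 10, 35, 50, 70, 75, 90, 150];
const outcomes = requestTimes.map((t) => demoCache.get(t).outcome);
```

Here `outcomes` follows the expected timeline: a fetch at 00:00, 01:10 and 02:30, and cached data for every other request.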

The shareReplay operator works fine for caching data for a given time, but I can't get it to fetch the data again once that time has elapsed.

Example using the shareReplay(1, 1000) operator:

T 00:00: Request (1) => RETRIEVE data
T 00:10: Request (2) => data from cache
T 00:35: Request (3) => data from cache
T 00:50: Request (4) => data from cache
T 01:10: Request (5) => no response
T 01:15: Request (6) => no response
T 01:30: Request (7) => no response
T 02:30: Request (8) => no response

The article linked above tries to change that behavior by using the first operator and catching null results. Unfortunately, it doesn't work properly, as data is no longer cached once the first cache window has expired.

Here's what I get using the code from the article linked above (the following picture shows the code used):

[Image: code details]

Result I've got:

T 00:00: Request (1) => RETRIEVE data
T 00:10: Request (2) => data from cache
T 00:35: Request (3) => data from cache
T 00:50: Request (4) => data from cache
T 01:10: Request (5) => RETRIEVE data
T 01:15: Request (6) => RETRIEVE data
T 01:30: Request (7) => RETRIEVE data
T 02:30: Request (8) => RETRIEVE data

I've also seen some examples using the timer operator, but in those cases data is retrieved every minute, even if there is no subscription to it. I don't want to refresh the data every minute; I want the cache to expire after a minute. Unfortunately, I've lost the code that used the timer operator, but the result was something like this:

Result with timer operator:

T 00:00: Request (1) => RETRIEVE data
T 00:10: Request (2) => data from cache
T 00:35: Request (3) => data from cache
T 00:50: Request (4) => data from cache
T 01:00: NO REQUEST => RETRIEVE data
T 01:10: Request (5) => data from cache
T 01:15: Request (6) => data from cache
T 01:30: Request (7) => data from cache
T 02:00: NO REQUEST => RETRIEVE data
T 02:30: Request (8) => data from cache
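
Since the original timer-based code is lost, the following is only a dependency-free TypeScript model of that polling behavior, not a reconstruction of it. `pollingFetchTimes` and `requestsAt` are hypothetical names, and times are in seconds. It shows which fetches happen without any request asking for them.

```typescript
// Times at which a fixed-interval poller fetches, up to and including
// `until`. The requests play no role: polling runs regardless of them.
function pollingFetchTimes(period: number, until: number): number[] {
  const times: number[] = [];
  for (let t = 0; t <= until; t += period) {
    times.push(t);
  }
  return times;
}

// Request times from the timeline above, in seconds.
const requestsAt = [0, 10, 35, 50, 70, 75, 90, 150];

// Fetches performed by polling every 60 seconds until 02:30.
const polledAt = pollingFetchTimes(60, 150); // [0, 60, 120]

// Fetches that no request triggered (the "NO REQUEST" lines above).
const unrequestedAt = polledAt.filter((t) => requestsAt.indexOf(t) === -1); // [60, 120]
```

The fetches at 01:00 and 02:00 are exactly the ones that should not happen: the cache should only be repopulated when a request finds it expired.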

Does anyone have a "pure" RxJS solution for this?


Solution

  • I think the solution you linked to has a small bug, as I tried to highlight in this StackBlitz (or I might have misunderstood the idea).

    You could try this:

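    import { Subject, concat, merge, timer } from 'rxjs';
    import { buffer, filter, map, share, shareReplay, switchMap, take, tap } from 'rxjs/operators';

    // `service` and `src$` come from the surrounding application code.
    const refetchSbj = new Subject();
    // Each `refetchSbj.next()` triggers a new fetch; `share()` lets the `merge`
    // and the `buffer` notifier below share a single underlying request.
    const refetchData$ = refetchSbj.pipe(
      switchMap(() => service.fetchData()),
      share()
    );

    merge(
      src$,
      refetchData$
    ).pipe(
      // Replay the latest value for 1000 ms (60000 would give a one-minute cache)
      shareReplay(1, 1000),
      // Collect whatever the replay emits synchronously on subscription
      buffer(concat(timer(0), refetchData$)),
      // An empty buffer means the cached value has expired: ask for a refetch
      tap(values => !values.length && refetchSbj.next()),
      filter(values => values.length !== 0),
      // In case there is only one value
      map(([v]) => v),
      // Might want to add this, because each subscriber will receive the value emitted by the `shareReplay`
      take(1)
    )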
    

    shareReplay internally uses a ReplaySubject, which emits all the cached values synchronously to a new subscriber. timer(0) is similar to setTimeout(fn, 0), but the important aspect here is that it's asynchronous, which allows buffer to collect the values emitted by the ReplaySubject.

    buffer(concat(timer(0), refetchData$)) - we want to make sure that the inner observable provided to buffer does not complete, otherwise the entire stream would complete. refetchData$ will emit the newly fetched data whenever a refetch happens (we'll see when a bit later).

    tap(values => !values.length && refetchSbj.next()) - if no values were emitted, it means that the ReplaySubject in use does not hold any values, which in turn means that the cache window has elapsed. If that's the case, with the help of refetchSbj, we can repopulate the cache.


    So this is how we could visualize the flow:

    T 00:00: Request (1) => RETRIEVE data
    1) `refetchSbj.next()`
    2) shareReplay will send the value resulted from `service.fetchData()` to the subscriber
    3) the newly fetched value will be added to the `buffer`, and then the `refetchData$` from `concat(timer(0), refetchData$)` will emit (this is why we've used `share()`), meaning that `values` will not be an empty array
    4) take(1) is reached, the value will be sent to the subscriber and then it will complete, so the `ReplaySubject` from `shareReplay()` will have no subscribers.
    
    T 00:10: Request (2) => data from cache
    `values` will not be empty, so `refetchSbj` won't emit and `take(1)` will be reached
    
    T 00:35: Request (3) => data from cache
    T 00:50: Request (4) => data from cache
    T 01:10: Request (5) => RETRIEVE data
    Same as `Request (1)`
    T 01:15: Request (6) => data from cache
    T 01:30: Request (7) => data from cache
    T 02:30: Request (8) => RETRIEVE data
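
The idea behind the `tap` step, where an empty replay means the cache has expired, can also be modeled without RxJS. The sketch below is only a simplified model under assumptions: `WindowedReplayBuffer` and `requestValue` are hypothetical names, time is passed in explicitly in milliseconds, and it does not reproduce how `ReplaySubject` is actually implemented.

```typescript
// Hypothetical model of a replay buffer with a size limit and a time window.
class WindowedReplayBuffer<T> {
  private entries: { at: number; value: T }[] = [];
  private bufferSize: number;
  private windowTime: number;

  constructor(bufferSize: number, windowTime: number) {
    this.bufferSize = bufferSize;
    this.windowTime = windowTime;
  }

  // Store a value with the time it was produced, keeping only the
  // `bufferSize` most recent entries.
  push(value: T, now: number): void {
    this.entries.push({ at: now, value });
    this.entries = this.entries.slice(-this.bufferSize);
  }

  // Values a subscriber arriving at `now` would get: only unexpired ones.
  replay(now: number): T[] {
    return this.entries
      .filter((e) => now - e.at < this.windowTime)
      .map((e) => e.value);
  }
}

// Mirrors the tap/filter pair: an empty replay triggers a refetch,
// otherwise the most recent replayed value is served.
function requestValue<T>(
  buf: WindowedReplayBuffer<T>,
  now: number,
  refetch: () => T,
): { value: T; refetched: boolean } {
  const values = buf.replay(now);
  if (values.length === 0) {
    const fresh = refetch();
    buf.push(fresh, now);
    return { value: fresh, refetched: true };
  }
  return { value: values[values.length - 1], refetched: false };
}

let modelFetches = 0;
const modelBuffer = new WindowedReplayBuffer<string>(1, 1000);
const modelFetch = () => `data #${++modelFetches}`;

const atStart = requestValue(modelBuffer, 0, modelFetch);        // empty replay, so it fetches
const withinWindow = requestValue(modelBuffer, 500, modelFetch); // served from the buffer
const afterWindow = requestValue(modelBuffer, 1200, modelFetch); // expired, so it fetches again
```

In this model the second request reuses `data #1`, while the third one finds the buffer empty and triggers a second fetch, which is the same decision the `tap` and `filter` steps make.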