Search code examples
rxjsrxjs6rxjs-observables

Read values from one observable but switch to another once it (the other one) emits


My desired behaviour:

  1. Run HTTP request
  2. Immediately look up data in async cache
  3. If cache has the value before HTTP emits - use cache value.
  4. Use HTTP value after it's finally here.
  5. If HTTP responds faster than cache - ignore cache.

So basically I would like to kick off two async processes, one of which is supposed to provide a value quickly but if it doesn't - I want to only use the value from a slower observable which takes precedence anyway.


Solution

  • To expand from my comments: the question is to trigger two observables in parallel and utilize the first emission even if the other observable hasn't emitted yet.

    Normally you could use the merge function for it.

    However you have a condition ("If HTTP responds faster than cache - ignore cache.") that is not natively fulfilled by the merge function nor by any standard RxJS operators.

    But it is easy to write custom operators in RxJS from existing operators. For your case you could customize the filter operator to suit your needs. See here for a brief intro on how to write a custom operator.

    export const filterLateCache = () => {
      let serverEmitted = false;
      return <T>(source: Observable<T>) => {
        return source.pipe(
          filter((data: any) => {
            if (!!data.server) {
              serverEmitted = true;
              return true;
            } else if (!!data.cache) {
              if (serverEmitted) {
                return false;
              } else {
                return true;
              }
            } else {
              return false;
            }
          })
        );
      };
    };
    

    As you can see the boolean flags server and cache in the incoming notification are checked to decide whether the value must be emitted. So you'd need to append the values from the observables with these flags using the map operator.

    merge(
      server$.pipe(
        map((value) => ({
          server: true,
          value: value,
        }))
      ),
      cache$.pipe(
        map((value) => ({
          cache: true,
          value: value,
        }))
      )
    )
      .pipe(filterLateCache())
      .subscribe({
        next: ({ value }) => {       // <-- utilize destructuring to ignore boolean flags
          // handle response here
        },
        error: (error: any) => {
          // handle errors
        }
      });
    

    Working example: Stackblitz