Tags: rxjs, reactive-programming, rxjs6

rxjs async updates to Observable


What's the best way to handle asynchronous updates in the middle of an Observable stream?

Let's say there are 3 observables:

- Obs1 (gets data from API) -> pipes to Obs2
- Obs2 (transforms data) -> pipes to Obs3
- Obs3 (sends transformed data)

(The actual application is more complex, and there are reasons it's not done in a single Observable; this is just a simple example.)

That all works well and good if it's a linear synchronous path.

But we also have async messages that will change the output of Obs2.

The 3 scenarios I'm asking about are:
- we fetch data, and go through Obs1, Obs2 & Obs3
- we get a message to make a change, and go through Obs2 & Obs3
- we get a different message to make a change, which also needs to apply the change from the previous message, and go through Obs2 & Obs3

The main problem here is that there are different types of asynchronous messages that will change the outcome of Obs2, but they all still need to know what the previous outcome of Obs2 was (so that any changes from earlier messages are still applied).

I have tried using switchMap in Obs2 with a scan in Obs1 like this:

obs1

const obs1$ = apiData$.pipe(
  // apiData$ is a placeholder name for the stream of raw API data
  // this returns a function used in the reducer.
  map((data) => (prevData) => 'modifiedData'),
  scan((data, reducer) => reducer(data), {})
)

obs2

const obs2$ = obs1$.pipe(
  switchMap(data =>
    someChange$.pipe(map(reducer => reducer(data)))
  )
)

where someChange$ is a BehaviorSubject applying a change using another reducer function.

This works fine for async message #1 that makes some change. But when message #2 comes in and a different change is needed, the first change is lost.

The changes that should be in "prevData" in obs1$ are always undefined, because that step runs before the message is applied.

How can I take the output from obs2$ and apply asynchronous updates to it in a way that remembers all of the past updates (and lets me clear all changes if needed)?


Solution

  • So if I understood the question correctly, it involves two problems:

    First: How to cache the last 2 emitted values from a stream.

    scan is definitely the right way. If this cache logic is needed in more than one place/file, I would go for a custom pipeable operator, like the following one:

    function cachePipe() {
      return sourceObservable =>
        sourceObservable.pipe(
          scan((acc, cur) => {
            return acc.length === 2 ? [...acc.slice(1), cur] : [...acc, cur];
          }, [])
        );
    }
    

    cachePipe emits an array of the latest 2 values passed through the stream (the very first emission contains only one value).

    ...
    .pipe(
      cachePipe()
    )
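To see what the accumulator inside cachePipe produces, it can be run on a plain array without RxJS. This is only a sketch: the name `keepLastTwo` and the sample values are made up for illustration, and `Array.prototype.reduce` stands in for scan, with each intermediate window recorded the way scan would emit it.

```javascript
// Same accumulator as in cachePipe, extracted so it can run on a plain array.
// `keepLastTwo` is a hypothetical name used only for this illustration.
const keepLastTwo = (acc, cur) =>
  acc.length === 2 ? [...acc.slice(1), cur] : [...acc, cur];

// Record the window after every value, mirroring what scan emits per value.
const windows = [];
[1, 2, 3, 4].reduce((acc, cur) => {
  const next = keepLastTwo(acc, cur);
  windows.push(next);
  return next;
}, []);

console.log(windows); // [[1], [1, 2], [2, 3], [3, 4]]
```

Note that the first window holds a single value, so downstream code should not assume two entries are always present.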
    

    Second: How to access data from multiple streams at the same time, upon a stream event.

    Here RxJS's combineLatest creation operator might do the trick for you:

    combineLatest(API$, async1$, async2$, async3$)
      .pipe(
        // here I have access to an array of the last emitted value of all streams
        // and the data can be passed to `Obs2` in your case
      )
    

    Any number of observables can be passed to combineLatest, and the pipe can then chain operators on their combined values, which resolves the second problem.

    Note:

    combineLatest needs every stream passed to it to emit at least once before it starts emitting combined values. One workaround is to use the startWith operator on your input streams; another is to pass the data through BehaviorSubjects.

    Here is a demo at CodeSandbox that uses the cachePipe() and startWith strategy to combine the source (Obs1) with the async observables that will change the data.