
Getting last two values emitted from a chained Observable


I have two Observables that I am trying to chain together. I need to subscribe to both and act on the emitted values for both. I feel like I'm doing something wrong in my setup and wanted to ask if this is the best way to accomplish this.

Essentially I have one Observable that is emitting values from a stream. In the second Observable, I want the last two values emitted from the first Observable. This is because if the first Observable throws an error, I need to know what the response was right before the error was thrown.

In addition, I want these to execute in order, so if the first observable emits:

[0, 1, 2, <error>]

My console should read:

0
1
2
Notification[2] (where the first element is 2 and the second is the error notification)

Here is what I have:

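// Assumes RxJS 7.2+, where operators are exported from "rxjs" directly
import { bufferCount, interval, last, materialize, take } from "rxjs";

const first = interval(1000).pipe(take(5));

first.subscribe({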
  next(response) {
    console.log("First: ", response);

    // Process the response
    // .......
  }
});


// Create a second Observable from the first that only emits the last two messages
// from the first Observable
const second = first.pipe(materialize(), bufferCount(2), last());

second.subscribe({
  next(response) {
    console.log("Second: ", response);
  }
});

Is this the easiest way to subscribe to two chained Observables? I'm assuming I could use a flatMap or concat here somewhere, but I have two Observables of different types, so I'm not sure how that would work. I think I'm most bothered by how I have two subscriptions here to related observables.


Solution

  • An RxJS#Observable has 3 types of emissions.

    1. next : An observable can have any number of next emissions. This means an observable with 0 emissions is completely fine. The emission itself comes with a value. If you're using TypeScript, the type of this value is encoded in the type of the Observable. A stream that emits a number:
      const stream: Observable<number> = of(42);
    2. complete : An observable can only ever send 1 complete emission. After this emission the observable is done and will never emit anything else again. The emission itself doesn't have a value.

    3. error : This is the exact same as complete, only it does have a value. Typically the value is an error message or object (hence the name of this emission). If you're using TypeScript, the type of this value isn't encoded in the type of the Observable (Similar to how a function throws an error). Remember that (like complete) after this emission the observable is done and will never emit anything else ever again.
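
To make the "terminal emission" rule above concrete, here is a minimal sketch (assuming RxJS 7.2 or later; `stream$` and `received` are illustrative names, not part of the question) of a hand-built Observable that tries to keep emitting after it has completed:

```typescript
import { Observable } from "rxjs";

// Records everything the subscriber receives, in order.
const received: string[] = [];

// A hand-built stream that tries to emit another value after completing.
const stream$ = new Observable<number>(subscriber => {
  subscriber.next(1);
  subscriber.next(2);
  subscriber.complete();
  subscriber.next(3); // ignored: the stream is already done
});

stream$.subscribe({
  next: value => received.push(`next: ${value}`),
  complete: () => received.push("complete"),
  error: e => received.push(`error: ${e}`)
});

console.log(received); // [ 'next: 1', 'next: 2', 'complete' ]
```

The third `next` never reaches the subscriber, because nothing is delivered after a terminal emission.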


    Remembering the last value before an error

    There are many approaches to this. Consider this:

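    // Assumes RxJS 7.2+, where operators are exported from "rxjs" directly
    import { interval, merge, tap, timer } from "rxjs";

    // Emits a next once per second
    const a$ = interval(1000);
    // Never emits a next, emits an error after 5.5 seconds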
    const b$ = timer(5500).pipe(
      tap(_ => {throw "This is an error"})
    );
    
    merge(a$, b$).subscribe({
      next: value => console.log("next: ", value),
      complete: () => console.log("terminal emission (complete)"),
      error: e => console.log("terminal emission (error): ", e)
    });
    

    Output:

    next:  0
    next:  1
    next:  2
    next:  3
    next:  4
    terminal emission (error):  This is an error
    

    An error is a terminal emission just like complete, but most operators respond to an incoming error emission by erroring themselves. So takeLast(2), which waits for a complete emission before emitting anything, will instead error itself if it receives an error emission.
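
As a rough illustration of that behaviour, the sketch below (assuming RxJS 7.2 or later; `source$` is a synchronous stand-in for the real stream) applies `takeLast(2)` to a source that emits three values and then errors:

```typescript
import { concat, of, takeLast, throwError } from "rxjs";

const log: string[] = [];

// Stand-in source: emits 1, 2, 3 and then errors instead of completing.
const source$ = concat(of(1, 2, 3), throwError(() => "This is an error"));

source$.pipe(takeLast(2)).subscribe({
  next: value => log.push(`next: ${value}`),
  complete: () => log.push("complete"),
  error: e => log.push(`error: ${e}`)
});

console.log(log); // [ 'error: This is an error' ]
```

The buffered values 2 and 3 are never emitted, because `takeLast` only releases its buffer when the source completes.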

    You can turn an error into another stream using catchError. If you choose a stream that completes rather than errors you can get back into more usual behaviour.
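
One way this might look, as a sketch only (assuming RxJS 7.2 or later; replacing the error with `EMPTY` is just one possible choice, and it discards the error itself):

```typescript
import { EMPTY, catchError, concat, of, takeLast, throwError } from "rxjs";

const recovered: string[] = [];

// Stand-in source: emits 1, 2, 3 and then errors instead of completing.
const source$ = concat(of(1, 2, 3), throwError(() => "This is an error"));

source$.pipe(
  catchError(() => EMPTY), // swap the error for a stream that just completes
  takeLast(2)              // now sees a complete emission and releases its buffer
).subscribe({
  next: value => recovered.push(`next: ${value}`),
  complete: () => recovered.push("complete"),
  error: e => recovered.push(`error: ${e}`)
});

console.log(recovered); // [ 'next: 2', 'next: 3', 'complete' ]
```

Because the error is swallowed here, this only helps when the subscriber does not need to know what went wrong.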

    Solution

    I've created a custom operator that does what you've described (I think). It leaves the source observable untouched unless it throws an error. In that case it rethrows the error wrapped in an object that also carries lastValue, the last value emitted before the error. Here it is in use:

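    // Assumes RxJS 7.2+, where operators are exported from "rxjs" directly
    import {
      MonoTypeOperatorFunction, catchError, defer, interval, merge,
      tap, throwError, timer
    } from "rxjs";

    // Custom operator that will embellish any error thrown with the last
    // cached value emitted. lastValue = null if the error is thrown before
    // any values are emitted.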
    function embelishErrors<T>(): MonoTypeOperatorFunction<T> {
      return s => defer(() => {
        let cache: T | null = null;
        return s.pipe(
          tap(v => cache = v),
          catchError(e => throwError(() => ({
            lastValue: cache, 
            error: e
          })))
        );
      })
    }
    
    // Emits a next once per second
    const a$ = interval(1000);
    // Never emits a next, emits an error after 5.5 seconds
    const b$ = timer(5500).pipe(
      tap(_ => {throw "This is an error"})
    );
    
    merge(a$, b$).pipe(
      embelishErrors()
    ).subscribe({
      next: value => console.log("next: ", value),
      complete: () => console.log("terminal emission (complete)"),
      error: e => console.log("terminal emission (error): ", e)
    });
    

    Output:

    next:  0
    next:  1
    next:  2
    next:  3
    next:  4
    terminal emission (error):  { lastValue: 4, error: 'This is an error' }
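
For comparison, an approach closer to the one in the question could be sketched as follows. This is an assumption-laden alternative, not the operator above: it swaps `bufferCount(2)` for `pairwise()` so that the window slides one notification at a time, it assumes RxJS 7.2 or later, and `first$` is a synchronous stand-in for the real stream.

```typescript
import { concat, last, materialize, of, pairwise, throwError } from "rxjs";

const lastTwo: string[] = [];

// Stand-in for the first Observable: emits 0, 1, 2 and then errors.
const first$ = concat(of(0, 1, 2), throwError(() => "This is an error"));

first$.pipe(
  materialize(), // next/error/complete all become Notification values
  pairwise(),    // sliding window: [previous, current]
  last()         // keep only the final pair
).subscribe(([previous, current]) => {
  lastTwo.push(`${previous.kind}: ${previous.value}`);
  lastTwo.push(`${current.kind}: ${current.error}`);
});

console.log(lastTwo); // [ 'N: 2', 'E: This is an error' ]
```

Since `materialize()` converts the error into an ordinary value and then completes, `last()` can emit normally. Note that if the source errors before emitting anything, `pairwise()` produces no pair and `last()` itself errors with an `EmptyError`.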