node.js websocket rxjs timeout observable

Detect when an observable has emitted the same value twice in a row


I'm using a keep-alive mechanism for my websocket connection, based on the method from this answer:

Observable.timer(0, 5000)
  .map(i => 'ping')
  .concatMap(val => {
    return Observable.race(
      Observable.of('timeout').delay(3000),
      sendMockPing()
    );
  })

If a timeout happens, I need to completely reset the websocket connection (as it likely means the connection is broken). However, a single timeout can sometimes just happen randomly (due to a poor server implementation, I guess?).

My subscription logic is currently implemented like this:

.subscribe(result => {
  if (result === 'timeout') {
    // Reconnect to server
  }
});

Is there any way (preferably using RxJS) to transform the observable so that I can recognize the case where it has emitted 'timeout' twice in a row?


Solution

  • You could use the scan operator to do what you want:

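    import { scan } from 'rxjs/operators';

    source.pipe(
      // scan passes the previously returned value in as `previous`
      scan((previous, next) => {
        if ((previous === 'timeout') && (next === 'timeout')) {
          // Delivered to the subscriber's error callback
          throw new Error('Two consecutive timeouts occurred.');
        }
        return next;
      }, undefined) // explicit seed, so the first emission also goes through the accumulator
    );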
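
    If you would rather not end the stream with an error, one possible variation is to have scan count consecutive timeouts instead of throwing. The sketch below is only an illustration: consecutiveCounter and runningCounts are hypothetical helper names (not part of RxJS), and runningCounts is a plain-array stand-in for scan so that the example runs without RxJS installed.

```javascript
// Hypothetical helper: returns an accumulator for scan() that counts how many
// times `target` has been seen in a row, resetting to 0 on any other value.
function consecutiveCounter(target) {
  return (count, value) => (value === target ? count + 1 : 0);
}

// Plain-array stand-in for scan(), so the sketch runs without RxJS installed.
function runningCounts(values, target) {
  const step = consecutiveCounter(target);
  let count = 0;
  return values.map(value => (count = step(count, value)));
}

const counts = runningCounts(
  ['pong', 'timeout', 'pong', 'timeout', 'timeout'],
  'timeout'
);
console.log(counts); // running counts: 0, 1, 0, 1, 2

// With RxJS 6+ pipeable operators, the same accumulator could be wired up as:
//   source.pipe(
//     scan(consecutiveCounter('timeout'), 0),
//     filter(count => count === 2)
//   ).subscribe(() => { /* Reconnect to server */ });
```

    Filtering on count === 2 would fire once per streak (a third timeout in a row gives 3 and is ignored), and the threshold is easy to raise if two timeouts turn out to be too sensitive.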