javascript, rxjs, observer-pattern, observers

Correct way to rxjs observe with timeout


I need to get an object from a ReplaySubject and throw an error if no object arrives within 5 seconds.

I wrote the code below, and it works, but I would like to find a more elegant solution.

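import { ReplaySubject, race, timer } from 'rxjs';
import { first, map } from 'rxjs/operators';

const replaySubject = new ReplaySubject(1);

function objGet() {
  return replaySubject;
}

function objGetWithTimeout() {
  const timeout = 5000;

  // Flag that records whether the subject emitted before the timer fired.
  let observed = false;
  const objObserver = objGet();

  objObserver
    .pipe(first())
    .subscribe(() => observed = true);

  // race() mirrors whichever source emits first: the object or the timer.
  return race(
    objObserver,
    timer(timeout).pipe(
      map(() => {
        if (!observed) {
          throw new Error('timeout');
        }
      })
    )
  );
}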

The function is called like this:

objGetWithTimeout()
  .pipe(catchError((err) => {
    console.error('timeout'); // log, then rethrow the original error
    return throwError(err);
  }))
  .subscribe((data) => console.log('obj received', data));

Solution

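  • You can use the timeoutWith() operator, which switches to a fallback observable if the source does not emit within the given time. Here the fallback is an observable that errors immediately: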

    objObserver
      .pipe(
        first(),
        timeoutWith(timeout, throwError(new Error('timeout'))),
      )