Search code examples
javascripttypescriptpromiserxjsrequest-timed-out

Can we pipe two RxJS timout methods with same request?


I am trying to identify if my resource is failing due to timeouts and at the same time want to analyze how many failures are with timeouts. So, I am trying to attach one timeout with request which will simply log saying request not recieved within time, without throwing error and another timeout will throw the error. Something like this:

const checkResponseTimeouts = async () => {
  const data = await firstValueFrom(
    from(getData())
      .pipe(
        timeout({
          each: 2500,
          with: ( sub ) => { // this is not the way it works, but just to make example
            console.log("response not recieved within 2.5 seconds");
            return true;
          },
        })
      )
      .pipe(
        timeout({
          each: 10000,
          with: () => {
            console.log("request timed out after 10 seconds");
            return throwError(() => new Error("Some error"));
          },
        })
      )
  );

  return data;
};

The code above is not correct, its just for demonstrating the problem. I am new to RxJS and few things seems confusing to me. Help will be much appreciated.


Solution

  • You could create a custom operator for this, that would be generic and therefore reusable.

    const sideEffectAfterXMs =
      (ms: number, sideEffect: () => void) =>
      <T>(obs$: Observable<T>) =>
        race(
          obs$,
          new Observable((observer) => {
            const timeout = setTimeout(sideEffect, ms);
            return () => clearTimeout(timeout);
          })
        );
    

    We create a race between the original observable and one that we create ourselves. The one we create ourselves never emits, which ensures that the race can only be won by the original observable. But within our observable, we call a callback after a certain amount of time. Most importantly, we cleanup the timeout if the observable is closed, which could only happen if the original observable emits first or throw.

    Then to use it:

    of('Hello world!')
      .pipe(
        delay(11_000),
        sideEffectAfterXMs(2500, () => console.log('Side effect!')),
        timeout(5_000)
      )
      .subscribe(
        (x) => console.log(`Value received: ${x}`),
        (e) => console.log(`Error: ${e}`),
        () => console.log(`Complete`)
      );
    

    If we apply a short delay, we get this output:

    Value received: Hello world!
    Complete
    

    If we apply a delay that is above the 2500 ms timeout for the side effect but below the 5000 timeout to throw, we get this output:

    Side effect!
    Value received: Hello world!
    Complete
    

    And if we apply a delay that is greater than the 5000 ms timeout, we get this output:

    Side effect!
    Error: TimeoutError: Timeout has occurred
    

    Here's a live demo on Stackblitz that you can play with.