RxJS polling + retry using a backoff strategy


I've been trying to implement my own RxJS operator that I can plug into Angular's HttpClient. I'd like to repeat the request (polling) every X milliseconds, and in case of errors I'd like to retry the request using an incremental backoff strategy. Here's a breakdown:

  1. Repeat the request over and over again every X milliseconds (polling)
  2. In case of an error, the request should be retried after Y milliseconds. If it fails again, the delay should double (e.g. first failure 1000ms, second 2000ms, third 4000ms) up to a limit. Once that limit is reached (for example 1 minute), the delay stops increasing and all subsequent retries wait 1 minute.
  3. After a successful retry, the retry delay should reset, so that the next failure starts again from Y milliseconds.
  4. When a request takes a long time to respond for whatever reason, we need to ensure that no other request is fired and that the pending one is not cancelled (I can see a possible use of concatMap here!)

Here's what I have so far:

function repeatWithBackoff<T>(delay: number, maxDelay = 60000) {
  return (source: Observable<T>) =>
    timer(0, delay).pipe(
      concatMap(() => {
        return source.pipe(
          retryWhen((attempts) => {
            return attempts.pipe(
              concatMap((attempt, i) => {
                const backoffDelay = Math.min(delay * Math.pow(2, i), maxDelay);
                return timer(backoffDelay);
              })
            );
          })
        );
      })
    );
}

And here's how I use it:

httpClient.post(...)
  .pipe(repeatWithBackoff(1000, 60000))
  .subscribe((x) => console.log('Result', x));

My code kind of works, but the retries are not working properly. When the request fails I would expect nothing to happen (e.g. no console.log()), but I can see the observable printed to the console. Also, if someone has a clever idea to simplify the function, I'd appreciate it :)


Solution

  • Modern RxJS makes this task simple: the built-in retry and repeat operators accept quite powerful config objects.

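    import { MonoTypeOperatorFunction, pipe, retry, repeat, timer } from 'rxjs';

    function pollWithBackoff<T>(delay: number, maxDelay: number): MonoTypeOperatorFunction<T> {
      return pipe(
        // On error, resubscribe after an exponentially growing delay capped at maxDelay.
        retry({
          // i is the 1-based retry count, so the first retry waits `delay` ms.
          delay: (_error, i) => {
            const backoffDelay = Math.min(delay * Math.pow(2, i - 1), maxDelay);
            return timer(backoffDelay);
          },
        }),
        // After a successful completion, wait `delay` ms and run the request again.
        repeat({
          delay,
        }),
      );
    }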
    

    See it in action on stackblitz.

    The best way to create complex operators is to use the pipe function. It lets you compose RxJS operators the same way you would in the .pipe method, so there is no need to work with the source observable directly, where you would have to handle all errors and completions yourself and forward them to the output observable.

    First, use the retry operator, which accepts a config object in which you can provide a custom delay function. It is important to start with the retry operator: the repeat operator that follows resubscribes to it after every successful request, so the backoff starts over each time. Note: the second argument, i, is the retry count and starts at 1, which is why the exponent uses i-1.
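    To make the 1-based index concrete, here is a minimal sketch of the same formula as a plain function. The name backoffDelay is hypothetical and not part of RxJS; it only mirrors the arithmetic inside the delay callback above.

```typescript
// Hypothetical helper mirroring the delay callback above; not an RxJS API.
function backoffDelay(retryCount: number, delay: number, maxDelay: number): number {
  // retryCount is 1-based, so the first retry waits exactly `delay` ms.
  return Math.min(delay * Math.pow(2, retryCount - 1), maxDelay);
}

// Delays for the first eight retries with delay = 1000 and maxDelay = 60000.
const delays = [1, 2, 3, 4, 5, 6, 7, 8].map((n) => backoffDelay(n, 1000, 60000));
console.log(delays);
// [1000, 2000, 4000, 8000, 16000, 32000, 60000, 60000]
```

    With these values the cap is reached on the seventh retry, after which every retry waits the full minute.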

    Now you have the logic "take this request and retry with exponential backoff". After that, add the repeat operator, which again accepts a config object with a delay. That makes sure the previous logic repeats. Because the retries are already taken care of, the repeat operator stays simple and repeats only once a request has succeeded.
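    The combined behaviour can be sketched without RxJS as a small model of the wait times. This is only an illustration under the assumption that each request either errors or emits once and completes (as an HttpClient call does); waitsBetweenRequests is a hypothetical name, not a library function.

```typescript
// Hypothetical, RxJS-free model of the waits produced by retry followed by repeat.
// `outcomes` lists request results in order: true = success, false = error.
function waitsBetweenRequests(outcomes: boolean[], delay: number, maxDelay: number): number[] {
  const waits: number[] = [];
  let retryCount = 0; // consecutive failures since the last success
  for (const ok of outcomes) {
    if (ok) {
      retryCount = 0; // repeat resubscribes, so the retry counter starts over
      waits.push(delay); // fixed polling interval after a success
    } else {
      retryCount += 1;
      waits.push(Math.min(delay * Math.pow(2, retryCount - 1), maxDelay));
    }
  }
  return waits;
}

console.log(waitsBetweenRequests([true, false, false, false, true, false], 1000, 60000));
// [1000, 1000, 2000, 4000, 1000, 1000]
```

    The wait after the last failure is back at 1000 ms because the success before it reset the counter, which is requirement 3 from the question.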

    See the retry and repeat documentation for more info on the config objects.