angular rxjs ngrx

retryWhen only fires once, not multiple times


I'm using NgRx in an Angular project to keep track of upload state for a bunch of files.

I want to limit the number of active uploads to 3. I have an effect that responds to an action whenever a new file is added to the queue. In the effect, I want to determine whether this file can start uploading.

Here's my effect:


canStartUpload$ = createEffect(() => 
    this.actions$.pipe(
        ofType(ImageActions.updateImageWithFullUpload),
        withLatestFrom(this._store.select(selectActiveUploadsTotal)), // gets active uploads from state
        mergeMap((values) => {
            return iif(
                () => values[1] < 3, // checks whether the current upload can start
                this.tis.getImageUploadS3Link(values[0].image._id)
                    .pipe(map(({url}) => ImageActions.uploadImage({image: values[0].image, url }))), // Starts upload
                throwError(new Error('Cannot upload at the moment'))// throws errors to be caught by 'retryWhen' method. 
            )

        }),
        retryWhen(errors => {
           return errors.pipe(
               tap(err => console.log(err)),// This only logs once, not multiple times
               tap(() => console.log("Retrying Upload")),
               delay(10000),
               take(10)
           )
        })

    )
)

The problem is, the source observable is not retried multiple times. It logs "Retrying Upload" once and then never again even though the conditional check still says it should retry again.

My guess is that when an error is thrown on the outer stream, the stream dies instead of being retried.

If that's the case, how can I create an inner stream that retries multiple times while also getting the most recent state from the state machine and passing that value to the iif function?

In an effort to explain exactly what I'm trying to accomplish, here is some pseudo code:

Start processing effect when action fires

  Get active uploads from state machine
  Check if condition is met (fewer than 3 active uploads)
    If true --> start upload
    If false --> retry in 10 seconds (starting with getting state from state machine)

  Try this block a maximum of 10 times.


Solution

  • One way to solve this would be to use the useEffectsErrorHandler config property:

    createEffect(
      () => /* ... */,
      { useEffectsErrorHandler: true }
    );
    

    With this option set, the defaultEffectsErrorHandler function resubscribes to the effect whenever an error escapes the effect's stream uncaught. By default, it does so at most 10 times:

    export function defaultEffectsErrorHandler<T extends Action>(
      observable$: Observable<T>,
      errorHandler: ErrorHandler,
      retryAttemptLeft: number = MAX_NUMBER_OF_RETRY_ATTEMPTS
    ): Observable<T> {
      return observable$.pipe(
        catchError((error) => {
          if (errorHandler) errorHandler.handleError(error);
          if (retryAttemptLeft <= 1) {
            return observable$; // last attempt
          }
          // Return observable that produces this particular effect
          return defaultEffectsErrorHandler(
            observable$,
            errorHandler,
            retryAttemptLeft - 1
          );
        })
      );
    }
    

    You can provide your own effects-error-handler function:

    {
      provide: EFFECTS_ERROR_HANDLER,
      useValue: yourCustomHandler,
    },
    

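    Another way to solve this, which does not involve setting any other options, is to wait inside the effect instead of throwing. retryWhen resubscribes to actions$, which is a hot stream: the action that caused the error is not replayed, so nothing happens until the next action arrives. Keeping the wait inside a mergeMap keeps the action in scope while the state is rechecked every 10 seconds: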

    // timer comes from 'rxjs'; filter, map, mapTo, mergeMap, take and
    // withLatestFrom come from 'rxjs/operators'
    canStartUpload$ = createEffect(() =>
      this.actions$.pipe(
        ofType(ImageActions.updateImageWithFullUpload),

        // hold each action until fewer than 3 uploads are active
        mergeMap(action =>
          timer(0, 10_000).pipe( // check immediately, then every 10 seconds
            withLatestFrom(this._store.select(selectActiveUploadsTotal)), // gets active uploads from state
            filter(([, activeUploadsTotal]) => activeUploadsTotal < 3),
            mapTo(action),

            // stop the timer when the condition is true
            // otherwise, let the timer be active and keep checking
            take(1),
          )
        ),

        // the action is in scope here, so read the image from it directly
        mergeMap(action =>
          this.tis.getImageUploadS3Link(action.image._id).pipe(
            map(({ url }) => ImageActions.uploadImage({ image: action.image, url }))
          )
        ),
      )
    );
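
    The flow from the question (read the state, start if fewer than 3 uploads are active, otherwise wait 10 seconds and read the state again, give up after 10 checks) can also be sketched without RxJS or NgRx so that it runs on its own. This is only an illustration of the control flow, not a replacement for the effect: waitForFreeSlot and its parameters are hypothetical names that exist in neither library. In the timer-based effect, placing take(10) directly after timer(0, 10_000) would be one way to get the same cap, assuming that giving up silently after 10 checks is acceptable.

```typescript
// Hypothetical helper (not part of NgRx or RxJS).
// Resolves to true as soon as a slot is free, or to false once
// maxAttempts checks have all found the queue full.
async function waitForFreeSlot(
  getActiveUploads: () => number, // reads the current count, e.g. from the store
  limit: number = 3,
  intervalMs: number = 10_000,
  maxAttempts: number = 10,
): Promise<boolean> {
  for (let attempt = 1; attempt <= maxAttempts; attempt++) {
    // Re-read the state on every attempt instead of reusing a stale value
    if (getActiveUploads() < limit) {
      return true;
    }
    // Wait before the next check, but not after the final one
    if (attempt < maxAttempts) {
      await new Promise<void>((resolve) => setTimeout(resolve, intervalMs));
    }
  }
  return false;
}
```

    The important detail, shared with the timer-based effect, is that the state is read again on every attempt, so an upload that finishes in the meantime frees a slot for the waiting file.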