Search code examples
rx-java2reactivex

rxjava complete after retryWhen on completeable


I'm using the retryWhen operator on a Completeable, is there a way to tell it to complete from the retry Flowable? something like this -

PublishSubject<?> retrySubject = PublishSubject.create();
public void someFunction() {
    someCompletable.retryWhen(new Function<Flowable<Throwable>, Publisher<?>>() {
        @Override
        public Publisher<?> apply(Flowable<Throwable> throwableFlowable) throws Exception {
            return throwableFlowable.flatMap(throwable -> retrySubject.toFlowable(BackpressureStrategy.MISSING));
        }
    }).subscribe();
}

public void ignoreError(){
    retrySubject.onComplete();
}

Solution

  • You can't stop a flatMap by giving it an empty source. Also every error will keep subscribing more and more observers to that subject that causes memory leak.

    Use takeUntil to stop a sequence via the help of another source:

    PublishProcessor<Throwable> stopProcessor = PublishProcessor.create();
    
    source.retryWhen(errors -> 
        errors.takeUntil(
            stopProcessor
        )
        .flatMap(error -> Flowable.timer(1, TimeUnit.SECONDS))
    )
    
    stopProcessor.onComplete();
    

    Edit If you want to reuse the same subject, you can suppress items on the stop path:

    PublishProcessor<Integer> stopProcessor = PublishProcessor.create();
    
    source.retryWhen(errors -> 
        errors.takeUntil(
            stopProcessor.ignoreElements().toFlowable()
        )
        .flatMap(error -> stopProcessor)
    )
    
    // retry
    stopProcessor.onNext(1);
    
    // stop
    stopProcessor.onComplete();