io.reactivex.exceptions.UndeliverableException The exception could not be delivered to the consumer because it has already canceled/disposed


I am getting an UndeliverableException while using a Completable:

public Completable createBucketWithStorageClassAndLocation() {
    return Completable.complete()
            .doFinally(() -> {
                Bucket bucket =
                        storage.create(
                                BucketInfo.newBuilder(googleUploadObjectConfiguration.bucketName())
                                        .setStorageClass(storageClass)
                                        .setLocation(googleUploadObjectConfiguration.locationName())
                                        .build());
            })
            .doOnError(error -> LOG.error(error.getMessage()));
}

The exception thrown by Google Cloud Storage is expected, but I am trying to handle it in the doOnError method:

Caused by: com.google.cloud.storage.StorageException: You already own this bucket. Please select another name.

RxJava exception:

io.reactivex.exceptions.UndeliverableException: The exception could not be delivered to the consumer because it has already canceled/disposed the flow or the exception has nowhere to go to begin with. Further reading: https://github.com/ReactiveX/RxJava/wiki/What's-different-in-2.0#error-handling | com.google.cloud.storage.StorageException: You already own this bucket. Please select another name.
    at io.reactivex.plugins.RxJavaPlugins.onError(RxJavaPlugins.java:367)
    at io.reactivex.internal.operators.completable.CompletableDoFinally$DoFinallyObserver.runFinally(CompletableDoFinally.java:99)
    at io.reactivex.internal.operators.completable.CompletableDoFinally$DoFinallyObserver.onComplete(CompletableDoFinally.java:79)
    at io.micronaut.reactive.rxjava2.RxInstrumentedCompletableObserver.onComplete(RxInstrumentedCompletableObserver.java:64)
    at io.reactivex.internal.disposables.EmptyDisposable.complete(EmptyDisposable.java:68)
    at io.reactivex.internal.operators.completable.CompletableEmpty.subscribeActual(CompletableEmpty.java:27)
    at io.reactivex.Completable.subscribe(Completable.java:2309)
    at io.micronaut.reactive.rxjava2.RxInstrumentedCompletable.subscribeActual(RxInstrumentedCompletable.java:51)
    at io.reactivex.Completable.subscribe(Completable.java:2309)
    at io.reactivex.internal.operators.completable.CompletableDoFinally.subscribeActual(CompletableDoFinally.java:43)
    at io.reactivex.Completable.subscribe(Completable.java:2309)
    at io.micronaut.reactive.rxjava2.RxInstrumentedCompletable.subscribeActual(RxInstrumentedCompletable.java:51)
    at io.reactivex.Completable.subscribe(Completable.java:2309)
    at io.reactivex.internal.operators.completable.CompletablePeek.subscribeActual(CompletablePeek.java:51)
    at io.reactivex.Completable.subscribe(Completable.java:2309)
    at io.micronaut.reactive.rxjava2.RxInstrumentedCompletable.subscribeActual(RxInstrumentedCompletable.java:51)
    at io.reactivex.Completable.subscribe(Completable.java:2309)
    at io.reactivex.Completable.subscribe(Completable.java:2410)
    at fete.bird.StartUp.onApplicationEvent(StartUp.java:24)
    at fete.bird.StartUp.onApplicationEvent(StartUp.java:12)
    at io.micronaut.context.DefaultBeanContext.notifyEventListeners(DefaultBeanContext.java:1323)
    at io.micronaut.context.DefaultBeanContext.publishEvent(DefaultBeanContext.java:1308)
    at io.micronaut.http.server.netty.NettyHttpServer.fireStartupEvents(NettyHttpServer.java:507)
    at io.micronaut.http.server.netty.NettyHttpServer.start(NettyHttpServer.java:350)
    at io.micronaut.http.server.netty.NettyHttpServer.start(NettyHttpServer.java:113)
    at io.micronaut.runtime.Micronaut.lambda$start$2(Micronaut.java:77)
    at java.base/java.util.Optional.ifPresent(Optional.java:176)
    at io.micronaut.runtime.Micronaut.start(Micronaut.java:75)
    at io.micronaut.runtime.Micronaut.run(Micronaut.java:311)
    at io.micronaut.runtime.Micronaut.run(Micronaut.java:297)
    at fete.bird.FeteBirdServiceApplication.main(FeteBirdServiceApplication.java:16)

According to the RxJava documentation (https://github.com/ReactiveX/RxJava/wiki/What's-different-in-2.0#error-handling), I need to handle the error in the application.

I need to write the code below:

// If Java 8 lambdas are supported
RxJavaPlugins.setErrorHandler(e -> { });

My question is: where should I write this code in a Micronaut application written in Java? Or is there another way to handle the exception?


Solution

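  • Use Completable.fromAction and perhaps try-catch the exception instead of that doFinally contraption. The doFinally action runs after the terminal event, so an exception thrown inside it has no downstream to go to and ends up in RxJavaPlugins.onError, as the stack trace shows: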

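    public Completable createBucketWithStorageClassAndLocation() {
        // fromAction runs the body on subscription, not after completion
        return Completable.fromAction(() -> {
            try {
                Bucket bucket = storage.create(
                    BucketInfo.newBuilder(googleUploadObjectConfiguration.bucketName())
                              .setStorageClass(storageClass)
                              .setLocation(googleUploadObjectConfiguration.locationName())
                              .build());
            } catch (Throwable error) {
                // Swallowed here, so the Completable still completes normally
                LOG.error(error.getMessage());
            }
        });
    }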
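
If you would rather keep doOnError in the chain than use a try-catch, the following is a minimal sketch of how that could look with RxJava 2 (assuming the io.reactivex dependency is on the classpath). The class name FromActionSketch, the createBucket() stand-in for storage.create(...), and the log list are hypothetical and only there to make the example self-contained. Because the exception is thrown inside fromAction, it travels down the normal onError path, so doOnError and the subscriber's error callback both see it.

```java
import io.reactivex.Completable;

import java.util.ArrayList;
import java.util.List;

public class FromActionSketch {

    // Hypothetical stand-in for storage.create(...); it always fails,
    // the way the real call does when the bucket already exists.
    static void createBucket() {
        throw new IllegalStateException("You already own this bucket.");
    }

    // The failing call lives inside fromAction, so the exception is
    // delivered through onError instead of RxJavaPlugins.onError.
    static Completable createBucketCompletable(List<String> log) {
        return Completable.fromAction(FromActionSketch::createBucket)
                .doOnError(error -> log.add(error.getMessage()));
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        createBucketCompletable(log)
                .subscribe(
                        () -> System.out.println("bucket created"),
                        error -> System.out.println("handled: " + error.getMessage()));
    }
}
```

Note that subscribing with an error callback matters here: a bare subscribe() with no error consumer would still route the failure to the global error handler.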