spring reactor rsocket

RSocket channel error: "reactor.core.publisher.Operators.error - Operator called default onErrorDropped" with merged Flux


I want to create an RSocket channel where the data sent by the server can be either a response to a client request or a server-initiated push. I merge two Flux instances to do that.

It's reference data: the client can ask for a refresh, and the server can also push updates on its own.

So I have this on the server side:

    @MessageMapping("update-stream")
    Flux<DomainObject> addUpdatesListener(Flux<RefreshRequest> requests) {
        // Server-initiated updates, independent of any client request
        Flux<DomainObject> pushFlux = Flux.from(this.flux)
                .doOnError((e) -> log.error("Error on push flux : {}", e, e));
        // Responses to client refresh requests, merged with the pushed updates
        return requests
                .map(this::getUpdates)
                .flatMap(Flux::fromIterable)
                .doOnError((e) -> log.error("Error on channel flux : {}", e, e))
                .mergeWith(pushFlux)
                .doOnError((e) -> log.error("Error on merged flux : {}", e, e));
    }

It works, except that when I stop the client, the server logs the following error:

06-07-2020 15:58:53.168 [reactor-http-nio-3] ERROR reactor.core.publisher.Operators.error - Operator called default onErrorDropped
java.util.concurrent.CancellationException: Disposed
    at reactor.core.publisher.FluxProcessor.dispose(FluxProcessor.java:80)
    at io.rsocket.core.RSocketResponder$3.hookOnCancel(RSocketResponder.java:513)
    at reactor.core.publisher.BaseSubscriber.cancel(BaseSubscriber.java:230)
    at java.base/java.lang.Iterable.forEach(Iterable.java:75)
    at io.rsocket.core.RSocketResponder.cleanUpSendingSubscriptions(RSocketResponder.java:275)
    at io.rsocket.core.RSocketResponder.cleanup(RSocketResponder.java:265)
    at io.rsocket.core.RSocketResponder.tryTerminate(RSocketResponder.java:167)
    at io.rsocket.core.RSocketResponder.tryTerminateOnConnectionClose(RSocketResponder.java:160)
    at reactor.core.publisher.LambdaMonoSubscriber.onComplete(LambdaMonoSubscriber.java:132)
    at reactor.core.publisher.MonoProcessor$NextInner.onComplete(MonoProcessor.java:518)
    at reactor.core.publisher.MonoProcessor.onNext(MonoProcessor.java:308)
    at reactor.core.publisher.MonoProcessor.onComplete(MonoProcessor.java:265)
    at io.rsocket.internal.BaseDuplexConnection.dispose(BaseDuplexConnection.java:23)
    at io.rsocket.transport.netty.TcpDuplexConnection.lambda$new$0(TcpDuplexConnection.java:60)
    at io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:577)
    at io.netty.util.concurrent.DefaultPromise.notifyListeners0(DefaultPromise.java:570)
    at io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:549)
    at io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:490)
    at io.netty.util.concurrent.DefaultPromise.setValue0(DefaultPromise.java:615)
    at io.netty.util.concurrent.DefaultPromise.setSuccess0(DefaultPromise.java:604)
    at io.netty.util.concurrent.DefaultPromise.trySuccess(DefaultPromise.java:104)
    at io.netty.channel.DefaultChannelPromise.trySuccess(DefaultChannelPromise.java:84)
    at io.netty.channel.AbstractChannel$CloseFuture.setClosed(AbstractChannel.java:1158)
    at io.netty.channel.AbstractChannel$AbstractUnsafe.doClose0(AbstractChannel.java:760)
    at io.netty.channel.AbstractChannel$AbstractUnsafe.close(AbstractChannel.java:736)
    at io.netty.channel.AbstractChannel$AbstractUnsafe.close(AbstractChannel.java:607)
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.closeOnRead(AbstractNioByteChannel.java:105)
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:171)
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:714)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:650)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:576)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
    at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
    at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
    at java.base/java.lang.Thread.run(Thread.java:834)  

If I don't do the merge, there is no error.

I tried many different variations, but I can't find a way to have both the push and no error logged when the client quits.

What am I missing?

Thanks a lot.


Solution

  • The problem disappears when upgrading from spring-boot 2.3.0.RELEASE to 2.3.1.RELEASE.
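
The answer does not say which build tool is used or which dependency change is responsible. As a rough sketch, assuming a Maven project that inherits from spring-boot-starter-parent (an assumption, not something stated in the question), the upgrade could look like this:

```xml
<!-- pom.xml: assumes the project inherits from spring-boot-starter-parent -->
<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <!-- was 2.3.0.RELEASE -->
    <version>2.3.1.RELEASE</version>
    <relativePath/>
</parent>
```

With Gradle, or with a project that imports the Spring Boot BOM instead of using the parent POM, the equivalent change is bumping the Spring Boot version wherever it is declared.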