Search code examples
javaspring-bootrsocket

Spring RSocketRequester issue when created with "wrap" function


It works fine when I'm creating RSocketRequester like that:

@Bean
RSocketRequester rSocketRequester(RSocketStrategies rSocketStrategies) {
    return RSocketRequester.builder()
            .rsocketStrategies(rSocketStrategies)
            .connectTcp("localhost", 7000)
            .block();
}

But it throws an exception when I try to send message creating if like that:

@Bean
RSocket rSocket() {
    return RSocketFactory
            .connect()
            .transport(TcpClientTransport.create(7000))
            .start()
            .block();
}

@Bean
RSocketRequester rSocketRequester(RSocketStrategies rSocketStrategies) {
    return RSocketRequester.wrap(rSocket(), MediaType.APPLICATION_CBOR, MimeTypeUtils.parseMimeType(WellKnownMimeType.MESSAGE_RSOCKET_COMPOSITE_METADATA.getString()), rSocketStrategies);
}

io.rsocket.exceptions.ApplicationErrorException: No handler for destination '' at io.rsocket.exceptions.Exceptions.from(Exceptions.java:45) ~[rsocket-core-1.0.0-RC5.jar:na] Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException: Error has been observed at the following site(s): |_ checkpoint ⇢ Handler com.issoft.rnd.ms2.controller.TestController#test4() [DispatcherHandler] |_ checkpoint ⇢ HTTP GET "/test4" [ExceptionHandlingWebHandler] Stack trace: at io.rsocket.exceptions.Exceptions.from(Exceptions.java:45) ~[rsocket-core-1.0.0-RC5.jar:na] at io.rsocket.RSocketRequester.handleFrame(RSocketRequester.java:556) ~[rsocket-core-1.0.0-RC5.jar:na] at io.rsocket.RSocketRequester.handleIncomingFrames(RSocketRequester.java:516) ~[rsocket-core-1.0.0-RC5.jar:na] at reactor.core.publisher.LambdaSubscriber.onNext(LambdaSubscriber.java:160) ~[reactor-core-3.3.0.RELEASE.jar:3.3.0.RELEASE] at reactor.core.publisher.MonoFlatMapMany$FlatMapManyInner.onNext(MonoFlatMapMany.java:242) ~[reactor-core-3.3.0.RELEASE.jar:3.3.0.RELEASE] at reactor.core.publisher.FluxGroupBy$UnicastGroupedFlux.drainRegular(FluxGroupBy.java:554) ~[reactor-core-3.3.0.RELEASE.jar:3.3.0.RELEASE] at reactor.core.publisher.FluxGroupBy$UnicastGroupedFlux.drain(FluxGroupBy.java:630) ~[reactor-core-3.3.0.RELEASE.jar:3.3.0.RELEASE] at reactor.core.publisher.FluxGroupBy$UnicastGroupedFlux.subscribe(FluxGroupBy.java:696) ~[reactor-core-3.3.0.RELEASE.jar:3.3.0.RELEASE] at reactor.core.publisher.Flux.subscribe(Flux.java:8134) ~[reactor-core-3.3.0.RELEASE.jar:3.3.0.RELEASE] at reactor.core.publisher.MonoFlatMapMany$FlatMapManyMain.onNext(MonoFlatMapMany.java:188) ~[reactor-core-3.3.0.RELEASE.jar:3.3.0.RELEASE] at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1592) ~[reactor-core-3.3.0.RELEASE.jar:3.3.0.RELEASE] at reactor.core.publisher.MonoProcessor.onNext(MonoProcessor.java:317) ~[reactor-core-3.3.0.RELEASE.jar:3.3.0.RELEASE] at io.rsocket.internal.ClientServerInputMultiplexer.lambda$new$1(ClientServerInputMultiplexer.java:116) ~[rsocket-core-1.0.0-RC5.jar:na] at reactor.core.publisher.LambdaSubscriber.onNext(LambdaSubscriber.java:160) ~[reactor-core-3.3.0.RELEASE.jar:3.3.0.RELEASE] at reactor.core.publisher.FluxGroupBy$GroupByMain.drainLoop(FluxGroupBy.java:380) ~[reactor-core-3.3.0.RELEASE.jar:3.3.0.RELEASE] at reactor.core.publisher.FluxGroupBy$GroupByMain.drain(FluxGroupBy.java:316) ~[reactor-core-3.3.0.RELEASE.jar:3.3.0.RELEASE] at reactor.core.publisher.FluxGroupBy$GroupByMain.onNext(FluxGroupBy.java:201) ~[reactor-core-3.3.0.RELEASE.jar:3.3.0.RELEASE] at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:114) ~[reactor-core-3.3.0.RELEASE.jar:3.3.0.RELEASE] at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:114) ~[reactor-core-3.3.0.RELEASE.jar:3.3.0.RELEASE] at reactor.netty.channel.FluxReceive.drainReceiver(FluxReceive.java:213) ~[reactor-netty-0.9.1.RELEASE.jar:0.9.1.RELEASE] at reactor.netty.channel.FluxReceive.onInboundNext(FluxReceive.java:346) ~[reactor-netty-0.9.1.RELEASE.jar:0.9.1.RELEASE] at reactor.netty.channel.ChannelOperations.onInboundNext(ChannelOperations.java:348) ~[reactor-netty-0.9.1.RELEASE.jar:0.9.1.RELEASE] at reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:93) ~[reactor-netty-0.9.1.RELEASE.jar:0.9.1.RELEASE] at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374) ~[netty-transport-4.1.43.Final.jar:4.1.43.Final] at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360) ~[netty-transport-4.1.43.Final.jar:4.1.43.Final] at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352) ~[netty-transport-4.1.43.Final.jar:4.1.43.Final] at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:326) ~[netty-codec-4.1.43.Final.jar:4.1.43.Final] at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:300) ~[netty-codec-4.1.43.Final.jar:4.1.43.Final] at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374) ~[netty-transport-4.1.43.Final.jar:4.1.43.Final] at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360) ~[netty-transport-4.1.43.Final.jar:4.1.43.Final] at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352) ~[netty-transport-4.1.43.Final.jar:4.1.43.Final] at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1422) ~[netty-transport-4.1.43.Final.jar:4.1.43.Final] at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374) ~[netty-transport-4.1.43.Final.jar:4.1.43.Final] at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360) ~[netty-transport-4.1.43.Final.jar:4.1.43.Final] at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:931) ~[netty-transport-4.1.43.Final.jar:4.1.43.Final] at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:163) ~[netty-transport-4.1.43.Final.jar:4.1.43.Final] at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:700) ~[netty-transport-4.1.43.Final.jar:4.1.43.Final] at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:635) ~[netty-transport-4.1.43.Final.jar:4.1.43.Final] at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:552) ~[netty-transport-4.1.43.Final.jar:4.1.43.Final] at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:514) ~[netty-transport-4.1.43.Final.jar:4.1.43.Final] at io.netty.util.concurrent.SingleThreadEventExecutor$6.run(SingleThreadEventExecutor.java:1050) ~[netty-common-4.1.43.Final.jar:4.1.43.Final] at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) ~[netty-common-4.1.43.Final.jar:4.1.43.Final] at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[netty-common-4.1.43.Final.jar:4.1.43.Final] at java.base/java.lang.Thread.run(Thread.java:834) ~[na:na]

Sending function:

public Mono<String> request(String route, String data, Class<String> clazz) {
    return rSocketRequester.route(route).data(data).retrieveMono(clazz);
}

Server:

@Controller
@AllArgsConstructor
public class RTestController {

    private EchoService echoService;

    @MessageMapping("test")
    Mono<String> test(String value) {
        return echoService.echo(value);
    }
}

Spring Boot Version: 2.2.1.RELEASE

Note: Mime types are taken from first working version.

What's wrong with wrap function?


Solution

  • TL;DR: it's not a problem with Spring, but with the way you've created the client RSocket connection.

    The RSocketRequester.wrap method is typically used on the server side, once the client has initiated the connection and the server wants to send requests to the other party. The Javadoc says that you can use it on the client side, and it's a perfectly valid use case.

    Now the RSocketRequester.wrap accepts, in its arguments, data about the expected data mime type and metadata mime type. This is typically negotiated between the two parties while setting up the connection, with RSocket SETUP/METADATA_PUSH frames. Once the connection is established, it's done. The wrap method needs those arguments because we can't extract that information easily from an existing RSocket.

    In your case, the connection has been created with the RSocketFactory without providing that information (the ClientRSocketFactory class provides many methods for that). So I think the metadata can't be properly encoded in the request and the server can't read the routing information as a result.

    Unless you've got very specific requirements, I'd suggest to use the RSocketRequester.Builder API to create a client RSocket as it's easier to get things right.