Spring Integration RSocket and Spring RSocket interaction issues


I created a new sample and split the code into a client side and a server side.

The complete code can be found here.

There are three versions of the server:

  • server: not a Spring Boot app; uses the Spring Integration RSocket InboundGateway.
  • server-boot: reuses the Spring Boot RSocket autoconfiguration and creates the ServerRSocketConnector from a ServerRSocketMessageHandler.
  • server-boot-messagemapping: does not use Spring Integration, only the Spring Boot RSocket autoconfiguration with @Controller and @MessageMapping.

There are two versions of the client:

  • client: sends messages using the Spring Integration RSocket OutboundGateway.
  • client-requester: sends messages using RSocketRequester and does not use Spring Integration at all.

The client and server use the REQUEST_CHANNEL interaction model, and the client connects to the server over TCP at localhost:7000.

server

<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-rsocket</artifactId>
</dependency>

The application class:

@Configuration
@ComponentScan
@IntegrationComponentScan
@EnableIntegration
public class DemoApplication {

    public static void main(String[] args) throws IOException {
        try (ConfigurableApplicationContext ctx = new AnnotationConfigApplicationContext(DemoApplication.class)) {
            System.out.println("Press any key to exit.");
            System.in.read();
        } finally {
            System.out.println("Exited.");
        }

    }

    @Bean
    public ServerRSocketConnector serverRSocketConnector() {
        return new ServerRSocketConnector("localhost", 7000);
    }

    @Bean
    public IntegrationFlow rsocketUpperCaseFlow(ServerRSocketConnector serverRSocketConnector) {
        return IntegrationFlows
                .from(RSockets.inboundGateway("/uppercase")
                        .interactionModels(RSocketInteractionModel.requestChannel)
                        .rsocketConnector(serverRSocketConnector)
                )
                .<Flux<String>, Flux<String>>transform((flux) -> flux.map(String::toUpperCase))
                .get();
    }

}

server-boot

Dependencies in pom.xml.

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-rsocket</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-integration</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-rsocket</artifactId>
        </dependency>

application.properties

spring.rsocket.server.port=7000
spring.rsocket.server.transport=tcp

Application class.

@SpringBootApplication
@EnableIntegration
public class DemoApplication {

    public static void main(String[] args) {
        SpringApplication.run(DemoApplication.class, args);
    }

    // see PR: https://github.com/spring-projects/spring-boot/pull/18834
    @Bean
    ServerRSocketMessageHandler serverRSocketMessageHandler(RSocketStrategies rSocketStrategies) {
        var handler = new ServerRSocketMessageHandler(true);
        handler.setRSocketStrategies(rSocketStrategies);
        return handler;
    }

    @Bean
    public ServerRSocketConnector serverRSocketConnector(ServerRSocketMessageHandler serverRSocketMessageHandler) {
        return new ServerRSocketConnector(serverRSocketMessageHandler);
    }

    @Bean
    public IntegrationFlow rsocketUpperCaseFlow(ServerRSocketConnector serverRSocketConnector) {
        return IntegrationFlows
                .from(RSockets.inboundGateway("/uppercase")
                        .interactionModels(RSocketInteractionModel.requestChannel)
                        .rsocketConnector(serverRSocketConnector)
                )
                .<Flux<String>, Flux<String>>transform((flux) -> flux.map(String::toUpperCase))
                .get();
    }

}

server-boot-messagemapping

Dependencies in pom.xml.

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-rsocket</artifactId>
        </dependency>

The application.properties.

spring.rsocket.server.port=7000
spring.rsocket.server.transport=tcp

The application class.

@SpringBootApplication
public class DemoApplication {

    public static void main(String[] args) {
        SpringApplication.run(DemoApplication.class, args);
    }

}

@Controller
class UpperCaseHandler {

    @MessageMapping("/uppercase")
    public Flux<String> uppercase(Flux<String> input) {
        return input.map(String::toUpperCase);
    }
}

client

The client dependencies in pom.xml:


<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-webflux</artifactId>
</dependency>

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-integration</artifactId>
</dependency>

<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-rsocket</artifactId>
</dependency>

The application class:


@SpringBootApplication
@EnableIntegration
public class DemoApplication {

    public static void main(String[] args) {
        SpringApplication.run(DemoApplication.class, args);
    }

    @Bean
    public ClientRSocketConnector clientRSocketConnector() {
        ClientRSocketConnector clientRSocketConnector = new ClientRSocketConnector("localhost", 7000);
        clientRSocketConnector.setAutoStartup(false);
        return clientRSocketConnector;
    }

    @Bean
    public IntegrationFlow rsocketUpperCaseRequestFlow(ClientRSocketConnector clientRSocketConnector) {
        return IntegrationFlows
                .from(Function.class)
                .handle(RSockets.outboundGateway("/uppercase")
                        .interactionModel((message) -> RSocketInteractionModel.requestChannel)
                        .expectedResponseType("T(java.lang.String)")
                        .clientRSocketConnector(clientRSocketConnector))
                .get();
    }
}

@RestController
class HelloController {

    @Autowired()
    @Lazy
    @Qualifier("rsocketUpperCaseRequestFlow.gateway")
    private Function<Flux<String>, Flux<String>> rsocketUpperCaseFlowFunction;

    @GetMapping(value = "hello", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<String> uppercase() {
        return rsocketUpperCaseFlowFunction.apply(Flux.just("a", "b", "c", "d"));
    }
}

With the client and one of the server applications running, I access http://localhost:8080/hello with curl.

With server and server-boot, which use the InboundGateway to handle messages, the output looks like this:

curl http://localhost:8080/hello

data:ABCD

With server-boot-messagemapping, the output is what I expected:

data:A
data:B
data:C
data:D

client-requester

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-webflux</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-rsocket</artifactId>
        </dependency>

The application class:

@SpringBootApplication
public class DemoApplication {

    public static void main(String[] args) {
        SpringApplication.run(DemoApplication.class, args);
    }

}

@RestController
class HelloController {
    Mono<RSocketRequester> requesterMono;

    public HelloController(RSocketRequester.Builder builder) {
        this.requesterMono = builder.connectTcp("localhost", 7000);
    }

    @GetMapping(value = "hello", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<String> uppercase() {
        return requesterMono.flatMapMany(
                rSocketRequester -> rSocketRequester.route("/uppercase")
                        .data(Flux.just("a", "b", "c", "d"))
                        .retrieveFlux(String.class)
        );
    }
}

I ran this client against each of the three servers and accessed http://localhost:8080/hello with curl.

With server and server-boot, which use the InboundGateway to handle messages, the request fails with a class cast exception.

With server-boot-messagemapping, the output is what I expected:

data:A
data:B
data:C
data:D

What is wrong with the configuration of the InboundGateway and OutboundGateway?


Solution

  • Thank you for such a detailed sample!

    As far as I can see, both clients (plain RSocketRequester and Spring Integration) work well with the plain RSocket server.

    To make them work with the Spring Integration server, you have to make these changes:

    1. The server side:

    Add .requestElementType(ResolvableType.forClass(String.class)) to the RSockets.inboundGateway() definition, so that it knows what type to convert the incoming payloads to.

    2. The client side:

      .data(Flux.just("a\n", "b\n", "c\n", "d\n"))
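
    For reference, the server-side change could look roughly like the sketch below. It reuses the rsocketUpperCaseFlow bean from the question and only adds the requestElementType call. The class name UpperCaseFlowConfig is hypothetical, and the ServerRSocketConnector bean is assumed to be defined as in the question.

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.ResolvableType;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.rsocket.RSocketInteractionModel;
import org.springframework.integration.rsocket.ServerRSocketConnector;
import org.springframework.integration.rsocket.dsl.RSockets;
import reactor.core.publisher.Flux;

// Hypothetical configuration class; in the question the bean lives in DemoApplication.
@Configuration
class UpperCaseFlowConfig {

    @Bean
    public IntegrationFlow rsocketUpperCaseFlow(ServerRSocketConnector serverRSocketConnector) {
        return IntegrationFlows
                .from(RSockets.inboundGateway("/uppercase")
                        .interactionModels(RSocketInteractionModel.requestChannel)
                        // Tells the gateway to decode incoming payloads as String values.
                        .requestElementType(ResolvableType.forClass(String.class))
                        .rsocketConnector(serverRSocketConnector)
                )
                .<Flux<String>, Flux<String>>transform((flux) -> flux.map(String::toUpperCase))
                .get();
    }
}
```

    The rest of the server configuration stays as in the question; the client still has to send newline-terminated values as shown in step 2.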

    Currently the server side of Spring Integration doesn't treat an incoming Flux as a stream of independent payloads, so we try to join all of them into a single value. The newline delimiter indicates that independent values are expected. Spring Messaging does exactly the opposite: it checks for a multi-value expected type and decodes every element of the incoming Flux in its map(), instead of attempting to decode the whole Publisher.
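
    The difference between the two decoding strategies can be illustrated with a plain-Java model. This is only a sketch under simplifying assumptions: plain strings stand in for RSocket payloads, and the class and method names (DecodingSketch, decodeWholeStream, decodePerElement) are hypothetical, not part of Spring.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;

// Simplified model only: plain strings stand in for RSocket payloads.
final class DecodingSketch {

    // Models decoding the whole inbound stream at once: payloads are joined,
    // and a separate value is produced only where a newline delimiter occurs.
    static List<String> decodeWholeStream(List<String> payloads) {
        String joined = String.join("", payloads);
        List<String> values = new ArrayList<>();
        for (String line : joined.split("\n")) {
            if (!line.isEmpty()) {
                values.add(line.toUpperCase(Locale.ROOT));
            }
        }
        return values;
    }

    // Models per-element decoding: every payload becomes its own value.
    static List<String> decodePerElement(List<String> payloads) {
        List<String> values = new ArrayList<>();
        for (String payload : payloads) {
            values.add(payload.toUpperCase(Locale.ROOT));
        }
        return values;
    }

    public static void main(String[] args) {
        System.out.println(decodeWholeStream(List.of("a", "b", "c", "d")));         // [ABCD]
        System.out.println(decodeWholeStream(List.of("a\n", "b\n", "c\n", "d\n"))); // [A, B, C, D]
        System.out.println(decodePerElement(List.of("a", "b", "c", "d")));          // [A, B, C, D]
    }
}
```

    In this model the first call corresponds to the data:ABCD output seen with the InboundGateway servers, and the other two correspond to the four separate values seen with @MessageMapping or with newline-terminated input.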

    Fixing this would be something of a breaking change, but we should probably consider changing the RSocketInboundGateway logic to be consistent with regular @MessageMapping for RSocket support. Feel free to raise a GH issue!