TLDR: What is required to configure a Spring Boot application that exposes an RSocket interface that supports the WebSocket transport?
I'm learning about RSocket and Spring Boot at the same time, so please bear with me.
After some struggle, I have built a very simple, contrived Spring Boot application that consumes an API exposed by a second Spring Boot application, using RSocket as the protocol. However, I can only get this to work when using the TcpClientTransport.
From my perspective, the WebsocketTransport is much more likely to be used, and more useful, in client->server architectures. However, I haven't found any working examples or documentation on how to properly configure a Spring Boot application that accepts RSocket messages using WebSocket as the transport.
The odd part is that, in my tests, my consumer (client) does appear to establish a WebSocket connection to the server/producer, but the 'handshake' appears to hang and the connection is never fully established. I've tested with both the JavaScript libraries (rsocket-websocket-client, rsocket-rpc-core, etc.) and the Java libraries (io.rsocket.transport.netty.client.WebsocketClientTransport), and the server exhibits the same behavior in both cases.
To reiterate, using the TCP transport I am able to connect to the server and invoke requests just fine; however, when using the WebSocket transport, the connection is never established.
What is required of a Spring Boot application that aims to support RSocket via the WebsocketClientTransport, beyond adding spring-boot-starter-rsocket as a dependency?
...
<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.2.0.M5</version>
    <relativePath/> <!-- lookup parent from repository -->
</parent>
...
<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-actuator</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-rsocket</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-webflux</artifactId>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
    </dependency>
</dependencies>
...
spring.rsocket.server.port=8081
management.endpoints.enabled-by-default=true
management.endpoints.web.exposure.include=*
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class SpringBootRSocketServerApplication {

    public static void main(String[] args) {
        SpringApplication.run(SpringBootRSocketServerApplication.class, args);
    }
}
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.handler.annotation.MessageMapping;
import org.springframework.stereotype.Controller;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.time.Duration;
import java.util.List;
import java.util.Random;
import java.util.stream.Stream;
@Slf4j
@Controller
public class UserRSocketController {

    @Autowired
    private UserRepository userRepository;

    // Request-response: returns the full list of users in a single payload.
    @MessageMapping("usersList")
    public Mono<List<User>> usersList() {
        log.info("Handling usersList request.");
        return Mono.just(this.userRepository.getUsers());
    }

    // Request-stream: emits a randomly chosen user once per second, indefinitely.
    @MessageMapping("usersStream")
    Flux<User> usersStream(UserStreamRequest request) {
        log.info("Handling request for usersStream.");
        List<User> users = userRepository.getUsers();
        Stream<User> userStream = Stream.generate(() -> {
            Random rand = new Random();
            return users.get(rand.nextInt(users.size()));
        });
        return Flux.fromStream(userStream).delayElements(Duration.ofSeconds(1));
    }

    // Request-response: looks up a single user by id.
    @MessageMapping("userById")
    public Mono<User> userById(GetUserByIdRequest request) {
        log.info("Handling request for userById id: {}.", request.getId());
        return Mono.just(this.userRepository.getUserById(request.getId()));
    }
}
:: Spring Boot :: (v2.2.0.M5)
2019-09-08 21:40:02,986 INFO [main] org.springframework.boot.StartupInfoLogger: Starting SpringBootRSocketServerApplication on REDACTED with PID 22540 (REDACTED)
2019-09-08 21:40:02,988 INFO [main] org.springframework.boot.SpringApplication: No active profile set, falling back to default profiles: default
2019-09-08 21:40:04,103 INFO [main] org.springframework.boot.actuate.endpoint.web.EndpointLinksResolver: Exposing 14 endpoint(s) beneath base path '/actuator'
2019-09-08 21:40:04,475 INFO [main] org.springframework.boot.rsocket.netty.NettyRSocketServer: Netty RSocket started on port(s): 8081
2019-09-08 21:40:04,494 INFO [main] org.springframework.boot.web.embedded.netty.NettyWebServer: Netty started on port(s): 8080
2019-09-08 21:40:04,498 INFO [main] org.springframework.boot.StartupInfoLogger: Started SpringBootRSocketServerApplication in 1.807 seconds (JVM running for 2.883)
import io.rsocket.RSocket;
import io.rsocket.RSocketFactory;
import io.rsocket.frame.decoder.PayloadDecoder;
import io.rsocket.transport.ClientTransport;
//import io.rsocket.transport.netty.client.TcpClientTransport;
import io.rsocket.transport.netty.client.WebsocketClientTransport;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.rsocket.MetadataExtractor;
import org.springframework.messaging.rsocket.RSocketRequester;
import org.springframework.messaging.rsocket.RSocketStrategies;
import org.springframework.util.MimeTypeUtils;
@Configuration
public class ClientConfiguration {

    @Bean
    public RSocket rSocket() {
        // ClientTransport transport = TcpClientTransport.create(8081);
        // ^--- TCPTransport works fine
        ClientTransport transport = WebsocketClientTransport.create(8081);
        // ^--- Connection hangs and application startup stalls
        return RSocketFactory
                .connect()
                .mimeType(MetadataExtractor.ROUTING.toString(), MimeTypeUtils.APPLICATION_JSON_VALUE)
                .frameDecoder(PayloadDecoder.ZERO_COPY)
                .transport(transport)
                .start()
                .block();
    }

    @Bean
    RSocketRequester rSocketRequester(RSocketStrategies rSocketStrategies) {
        return RSocketRequester.wrap(rSocket(), MimeTypeUtils.APPLICATION_JSON, MimeTypeUtils.APPLICATION_JSON, rSocketStrategies);
    }
}
:: Spring Boot :: (v2.2.0.M5)
2019-09-08 21:40:52,331 INFO [main] org.springframework.boot.StartupInfoLogger: Starting SpringBootRsocketConsumerApplication on REDACTED with PID 18904 (REDACTED)
2019-09-08 21:40:52,334 INFO [main] org.springframework.boot.SpringApplication: No active profile set, falling back to default profiles: default
You only need two things to have an RSocket application exposing endpoints using the websocket transport:
First, you need both webflux and rsocket dependencies as you'll probably need to serve web pages and static resources as well:
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-rsocket</artifactId>
</dependency>
Then you need to configure the RSocket server accordingly in your application.properties file:
#server.port=8080 this is already the default
spring.rsocket.server.transport=websocket
spring.rsocket.server.mapping-path=/rsocket
You'll find more about that in the Spring Boot reference documentation about RSocket.
The websocket client can now connect to ws://localhost:8080/rsocket.
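As a rough illustration of how that URL relates to the settings above: with the websocket transport and a mapping path configured (and, as far as I understand the Boot auto-configuration, no spring.rsocket.server.port set), the RSocket endpoint is served by the WebFlux server, so the client URL is built from server.port and spring.rsocket.server.mapping-path rather than from a separate RSocket port. The class and method names below are hypothetical helpers for this sketch, not part of Spring or rsocket-java.

```java
import java.net.URI;

// Hypothetical helper (not a Spring or rsocket-java class): derives the
// WebSocket URL a client should use from the server-side settings.
public final class RSocketWebSocketUri {

    private RSocketWebSocketUri() {
    }

    /**
     * @param host        host name of the server, e.g. "localhost"
     * @param serverPort  value of server.port (8080 when left at the default)
     * @param mappingPath value of spring.rsocket.server.mapping-path
     */
    public static URI of(String host, int serverPort, String mappingPath) {
        // Tolerate a mapping path written without its leading slash.
        String path = mappingPath.startsWith("/") ? mappingPath : "/" + mappingPath;
        return URI.create("ws://" + host + ":" + serverPort + path);
    }

    public static void main(String[] args) {
        // Prints ws://localhost:8080/rsocket
        System.out.println(of("localhost", 8080, "/rsocket"));
    }
}
```

The resulting URI is what gets passed to the client, for example to WebsocketClientTransport.create(URI) in rsocket-java. By contrast, the configuration in the question only sets spring.rsocket.server.port=8081, which presumably leaves the server on the default TCP transport, so a WebSocket client pointed at port 8081 never completes its handshake.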
Note that as of the current 2.2.0 SNAPSHOTs, the RSocket protocol has evolved and the rsocket-js library is currently catching up, especially in the metadata support. You'll find a working sample here as well.
On the Java client side of things, Spring Boot provides you with an RSocketRequester.Builder that's already configured and customized to your needs with codecs and interceptors:
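import java.net.URI;
import org.springframework.messaging.rsocket.RSocketRequester;
import org.springframework.stereotype.Component;

@Component
public class MyService {

    private final RSocketRequester rsocketRequester;

    public MyService(RSocketRequester.Builder builder) {
        // Connect to the WebSocket endpoint configured above; block() waits for the connection.
        this.rsocketRequester = builder
                .connectWebSocket(URI.create("ws://localhost:8080/rsocket"))
                .block();
    }
}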