Ok I am new with RSocket. I am trying to create a simple RSocket client and simple RSocket server. From the research that I have done it says that RSocket supports resumption:
It is particularly useful as, when sending a RESUME frame containing information about the last received frame, the client is able to resume the connection and only request the data it hasn’t already received, avoiding unnecessary load on the server and wasting time trying to retrieve data that was already retrieved.
It also says that the client is the one responsible for enabling resumption. My question is how to enable this resumption and how to send that RESUME frame. I have functional client and server but if I turn off the server and start it again nothing is happening and later when the client is trying again to communicate with the server it throws: java.nio.channels.ClosedChannelException.
This is my client configuration:
@Configuration
public class ClientConfiguration {
/**
* Defining the RSocket client to use tcp transport on port 7000
*/
@Bean
public RSocket rSocket() {
return RSocketFactory
.connect()
.resumeSessionDuration(Duration.ofDays(10))
.mimeType(MimeTypeUtils.APPLICATION_JSON_VALUE, MimeTypeUtils.APPLICATION_JSON_VALUE)
.frameDecoder(PayloadDecoder.ZERO_COPY)
.transport(TcpClientTransport.create(7000))
.start()
.block();
}
/**
* RSocketRequester bean which is a wrapper around RSocket
* and it is used to communicate with the RSocket server
*/
@Bean
RSocketRequester rSocketRequester(RSocketStrategies rSocketStrategies) {
return RSocketRequester.wrap(rSocket(), MimeTypeUtils.APPLICATION_JSON, MimeTypeUtils.APPLICATION_JSON, rSocketStrategies);
}
}
And this is a RestController from which I am starting the communication with the rsocket server:
@RestController
public class UserDataRestController {
private final RSocketRequester rSocketRequester;
public UserDataRestController(RSocketRequester.Builder rSocketRequester) {
this.rSocketRequester = rSocketRequester.connectTcp("localhost", 7000).block();
}
@GetMapping(value = "/feed/{firstName}", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Publisher<Person> feed(@PathVariable("firstName") String firstName) {
return rSocketRequester
.route("feedPersonData")
.data(new PersonDataRequest(firstName))
.retrieveFlux(Person.class);
}
}
Due to the fact that sessions are stored in memory you couldn't resume after server restarts. See io.rsocket.resume.SessionManager#sessions
.
But you can still protect yourself from network issues if you reconnect to the same server. And you don't have to send RESUME frame, client does it for you.
You should configure server:
@Bean
ServerRSocketFactoryProcessor serverRSocketFactoryProcessor() {
return RSocketFactory.ServerRSocketFactory::resume;
}
And client io.rsocket.RSocketFactory.ClientRSocketFactory#resume
.
You could find almost full example here