I have the following RSocket server:
    @Log4j2
    public class NativeRsocketServerFnF {
        public static void main(String[] args) {
            RSocketFactory.receive()
                    .frameDecoder(ZERO_COPY)
                    .errorConsumer(log::error)
                    .acceptor((setup, clientHandleRsocket) -> {
                        return Mono.just(
                                new AbstractRSocket() {
                                    @Override
                                    public Mono<Void> fireAndForget(Payload payload) {
                                        CharSequence message = payload.data().readCharSequence(payload.data().readableBytes(), forName("UTF-8"));
                                        payload.release();
                                        log.info("> from client: {}", message);
                                        return Mono.empty();
                                    }
                                }
                        );
                    })
                    .transport(TcpServerTransport.create(8000))
                    .start()
                    .block()
                    .onClose()
                    .block();
        }
    }
and the following RSocket client:
    @Log4j2
    public class NativeRsocketClientFnF {
        public static void main(String[] args) {
            RSocketFactory.connect()
                    .frameDecoder(ZERO_COPY)
                    .errorConsumer(log::error)
                    .transport(TcpClientTransport.create(8000))
                    .start()
                    .flatMap(rSocket -> rSocket.fireAndForget(DefaultPayload.create("ping")))
                    .block();
        }
    }
As you can see, I am trying to send "ping" as payload data from the client to the server.
When I start the server and run the client for the first time, I see > from client: ping in the server log.
If I run the client again, I don't see any message on the server. The breakpoint on the server is not even hit.
My understanding is that Fire and Forget simply sends the data and does not wait to see whether the server processed it successfully. In my case, however, the server does not receive the data at all on subsequent runs of the client (each run is effectively a new client).
Is there something I am missing?
I am using version 1.0.0-RC5 of rsocket-core and rsocket-transport-netty.
OS: Ubuntu 16.04
Your understanding of the Fire and Forget pattern is correct.
The problem is that io.rsocket.RSocketRequester#fireAndForget completes successfully before the data has been sent to the network. When you block on it and then exit your client application, the connection closes, possibly before the frame has been written.
See io.rsocket.RSocketRequester#handleFireAndForget:
    return emptyUnicastMono()
        .doOnSubscribe(
            new OnceConsumer<Subscription>() {
              @Override
              public void acceptOnce(@Nonnull Subscription subscription) {
                ByteBuf requestFrame =
                    RequestFireAndForgetFrameFlyweight.encode(
                        allocator,
                        streamId,
                        false,
                        payload.hasMetadata() ? payload.sliceMetadata().retain() : null,
                        payload.sliceData().retain());
                payload.release();
                sendProcessor.onNext(requestFrame);
              }
            });
sendProcessor is an intermediate Flux processor. The actual connection consumes frames from it, so sending the request frame to the network may be delayed.
For testing purposes, you could add .then(Mono.delay(Duration.ofMillis(100))) before the .block() call on the client side. I think most real applications won't exit immediately after calling fireAndForget.
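To make the race easier to see, here is a minimal stdlib-only sketch of the same situation. It is an analogy, not RSocket code: QueuedConnection, its sendQueue, and the wire list are hypothetical names, and the write latency is simulated with Thread.sleep. The fireAndForget method returns as soon as the frame is queued, like the requester above, and waitBeforeCloseMillis plays the role of the Mono.delay workaround.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class FireAndForgetRace {

    /** Hypothetical stand-in for a connection; not an RSocket class. */
    static final class QueuedConnection {
        private final BlockingQueue<String> sendQueue = new LinkedBlockingQueue<>();
        private final List<String> wire = new CopyOnWriteArrayList<>();
        private final Thread writer;
        private volatile boolean open = true;

        QueuedConnection(long writeLatencyMillis) {
            // Background writer: drains the queue and "writes" frames to the wire.
            writer = new Thread(() -> {
                try {
                    while (open) {
                        String frame = sendQueue.poll(10, TimeUnit.MILLISECONDS);
                        if (frame == null) {
                            continue;
                        }
                        // Simulated delay between queuing a frame and writing it.
                        Thread.sleep(writeLatencyMillis);
                        if (open) {
                            wire.add(frame);
                        }
                    }
                } catch (InterruptedException e) {
                    // Interrupted by close(): frames not yet written are dropped.
                }
            });
            writer.setDaemon(true);
            writer.start();
        }

        /** Returns once the frame is queued, not once it is written. */
        void fireAndForget(String frame) {
            sendQueue.add(frame);
        }

        /** Closes the connection and returns the frames that reached the wire. */
        List<String> close() {
            open = false;
            writer.interrupt();
            try {
                writer.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
            return wire;
        }
    }

    /** Sends one "ping", waits the given time, then closes the connection. */
    public static List<String> run(long writeLatencyMillis, long waitBeforeCloseMillis) {
        QueuedConnection connection = new QueuedConnection(writeLatencyMillis);
        connection.fireAndForget("ping");
        try {
            Thread.sleep(waitBeforeCloseMillis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return connection.close();
    }

    public static void main(String[] args) {
        System.out.println("close immediately: " + run(300, 0));
        System.out.println("wait before close: " + run(20, 1000));
    }
}
```

Closing right after fireAndForget returns typically leaves the wire empty, while waiting longer than the write latency lets the frame through. A fixed delay is still only a test aid: it guesses how long the write takes rather than confirming it happened.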