I am trying to consume messages from Kafka and forward them over RSocket using Spring. Below is the server side (Spring, Java) and the client side (React).
@Configuration
@EnableConfigurationProperties(RsocketConsumerProperties.class)
public class RsocketConsumerConfiguration {

    @Bean
    public Function<Integer, Mono<Integer>> rsocketConsumer(RSocketRequester.Builder builder,
            RsocketConsumerProperties rsocketConsumerProperties) {
        RSocketRequester rSocketRequester = builder.websocket(URI.create("ws://localhost:7000/"));
        return input -> rSocketRequester.route(rsocketConsumerProperties.getRoute()).data(input).retrieveMono(Integer.class);
    }
}

@EnableBinding(Sink.class)
public class Listener {

    @Autowired
    private Function<Integer, Mono<Integer>> rsocketConsumer;

    @StreamListener(Sink.INPUT)
    public void fireAndForget(Integer val) {
        System.out.println(val);
        rsocketConsumer.apply(val).subscribe();
    }
}

@Controller
public class ServerController {

    @MessageMapping("data")
    public Mono<Integer> hello(Integer integer) {
        return Mono.just(integer);
    }
}
What am I doing wrong on the server side? My client connects, but it does not receive any new messages.
client.connect().subscribe({
    onComplete: socket => {
        socket.fireAndForget({
            data: { message: "hello from javascript!" },
            metadata: null
        });
    },
    onError: error => {
        console.log("got error");
        console.error(error);
    },
    onSubscribe: cancel => {
        /* call cancel() to abort */
        console.log("subscribe!");
        console.log(cancel);
        // cancel.cancel();
    }
});
You do this: requester.route("input").data("Welcome to Rsocket").send();
where send() is declared as:
/**
 * Perform a {@link RSocket#fireAndForget fireAndForget} sending the
 * provided data and metadata.
 * @return a completion that indicates if the payload was sent
 * successfully or not. Note, however that is a one-way send and there
 * is no indication of whether or how the event was handled on the
 * remote end.
 */
Mono<Void> send();
You see the Mono return type? That means it has to be subscribed to in order to initiate the reactive stream processing. See Project Reactor for more info: https://projectreactor.io/
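To illustrate why the subscription matters, here is a minimal sketch, assuming reactor-core is on the classpath. LazyMonoDemo and its send() method are hypothetical stand-ins for the real requester.route(...).data(...).send() call; the counter only records whether the simulated send actually ran.

```java
import java.util.concurrent.atomic.AtomicInteger;
import reactor.core.publisher.Mono;

public class LazyMonoDemo {

    // Counts how many times the simulated send has actually executed.
    static final AtomicInteger sends = new AtomicInteger();

    // Hypothetical stand-in for requester.route(...).data(...).send():
    // it only describes the work, it does not perform it.
    static Mono<Void> send() {
        return Mono.fromRunnable(sends::incrementAndGet);
    }

    public static void main(String[] args) {
        send();                                   // assembled, never subscribed
        System.out.println("after send(): " + sends.get());

        send().subscribe();                       // subscription triggers the work
        System.out.println("after send().subscribe(): " + sends.get());
    }
}
```

The first call leaves the counter untouched because nothing subscribed to the returned Mono; only the second call increments it. In a real application you would normally return the Mono to the framework or chain it into another reactive pipeline rather than calling subscribe() by hand everywhere.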
On the other hand, it is not clear what is the server and what is the client in your case. You call this:
/**
 * Build an {@link RSocketRequester} with an
 * {@link io.rsocket.core.RSocketClient} that connects over WebSocket to
 * the given URL. The requester can be used to make requests
 * concurrently. Requests are made over a shared connection that is also
 * re-established as needed when further requests are made.
 * @param uri the URL to connect to
 * @return the created {@code RSocketRequester}
 * @since 5.3
 */
RSocketRequester websocket(URI uri);
And I would say that makes the code you show a client. The server is on the other side, where port 7000 is opened for the ws:// protocol. So be sure that you understand and configure all the parts properly. For example, I don't see why you need a @RestController in your Listener class...
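If the Spring Boot application that hosts the @MessageMapping controller is meant to be that server, one way to open the port is through Spring Boot's RSocket server properties. This is only a sketch, assuming spring-boot-starter-rsocket is on the classpath and that port 7000 with the WebSocket transport matches what your clients connect to:

```properties
# Assumed values: start a standalone RSocket server reachable at ws://localhost:7000/
spring.rsocket.server.port=7000
spring.rsocket.server.transport=websocket
```

With something like this in application.properties, the requester built with builder.websocket(URI.create("ws://localhost:7000/")) and the browser client would both be clients of the same server.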