I have the challenge that an existing web service returns for one call 2 messages but with different structures / types.
How do I handle this with WebFlux ReactorNettyWebSocketClient in Spring? i.e. how can I attach 2 different mapper or handle it though multiple subscribers?
Assumption is here that the order of these 2 response messages are always the same, so that the first one will be parsed by the "info" mapper and the 2nd by the "auth" mapper.
The 2nd message is the information for a subscriber. The server will then continue to send multiple messages on a channel "0". How can I subscribe on these with my client when I receive the 2nd message?
Client:
WebSocketClient client = new ReactorNettyWebSocketClient();
client.execute(uri, session -> session.send(Mono.just(session.textMessage(authCommand)))
.thenMany(session.receive().take(2).doOnNext(message -> {
System.out.println(message.getPayloadAsText());
})).then()).block();
First return from Server:
{"event":"info","version":2,"serverId":"20df78274661","platform":{"status":1}}
{"event":"auth","status":"OK","chanId":0,"userId":2256812}
Channel Messages which will be sent after the two messages from the Server:
[0,"wu",["funding","USD",11041.78713953,0,8.090876180000123,null,null]]
[0,"wu",["funding","BTC",0.25495514,0,4.000000003445692e-8,null,null]]
[0,"wu",["funding","EUR",2139.03965717,0,0.00965717000008226,null,null]]
Thank you for any suggestions and ideas in advance.
After studying some material about WebFlux and Reactor I came to the following approach. I am using a Processor where you can attach multiple subscribers. This Processor is then attached in the WebSocketClient.
A good summary about different processors can be found here
How to use Processor in Java's Reactor
With this approach I would be able to add dynamically a subscriber. So when I detect the channel message I can add a new subscriber to my processor for that channel number by using a filter. The dummy code below illustrates the behaviour.
ReplayProcessor<String> output = ReplayProcessor.create();
output.filter(msg -> msg.startsWith("{")).subscribe(msg -> {
System.out.println(msg);
JSONObject json = new JSONObject(msg);
if (json.has("event") && json.get("event").equals("auth")) {
if (json.has("chanId")) {
int chanId = json.getInt("chanId");
// add new subscriber for that channel number, receives only messages from that channel
output.filter(t -> t.startsWith("[" + chanId)).subscribe( t -> System.err.println(t));
}
}
});
client.execute(uri, session ->
// send message
session.send(Mono.just(session.textMessage(authCommand)))
.thenMany(session.receive()
.map(message -> message.getPayloadAsText())
.subscribeWith(output))
.then()).block();