I'm developing a Spring Boot WebFlux application for real-time IoT data visualization.
I have a Flux that simulates data coming from a device, and once the websocket connection is established I want every single event to be both sent over the websocket (by the WebSocketHandler) and passed to a notifier that may issue a REST call (with RestTemplate).
From my logs, it seems the two subscribers (the websocket handler and the notifier) get two different flows with completely different values (logs below).
I also tried a variant that chains the share method after the map in the MySource class. In that case I do seem to have just one Flux, but there is only ONE thread, so everything blocks (I can see the REST call blocking the sending over the websocket).
What is happening here? How can I make the two subscribers run in different execution contexts (different threads), so that they are completely independent of each other?
Relevant code snippets and logs are below.
Thank you all in advance!
UPDATE: for the sake of clarity, I should specify that MyEvents have RANDOM-generated values. Thanks to @NikolaB's answer I solved one issue by using ConnectableFlux / share, which guarantees that both subscribers see the same Flux, but I still want separate execution contexts for the two subscribers.
public class MyWebSocketHandler implements WebSocketHandler {

    @Autowired
    public MySource mySource;

    @Autowired
    public Notifier notifier;

    public Mono<Void> handle(WebSocketSession webSocketSession) {
        Flux<MyEvent> events = mySource.events();
        events.subscribe(event -> notifier.sendNotification(event));
        return webSocketSession.send(events.map(this::toJson).map(webSocketSession::textMessage));
    }

    private String toJson(MyEvent event) {
        log.info("websocket toJson " + event.getValue());
        ...
    }
}
public class MySource {

    public Flux<MyEvent> events() {
        return Flux.interval(...).map(i -> new MyEvent(*Random Generate Value*));
    }
}
public class Notifier {

    public void sendNotification (MyEvent event) {
        log.info("notifier sendNotification " + event.getValue());
        if (condition met)
            restTemplate.exchange(...)
    }
}
2019-11-19 11:58:55.375 INFO [ parallel-3] i.a.m.websocket.MyWebSocketHandler : websocket toJson 4.09
2019-11-19 11:58:55.375 INFO [ parallel-1] i.a.m.notifier.Notifier : notifier sendNotification 4.86
2019-11-19 11:58:57.366 INFO [ parallel-1] i.a.m.notifier.Notifier : notifier sendNotification 4.24
2019-11-19 11:58:57.374 INFO [ parallel-3] i.a.m.websocket.MyWebSocketHandler : websocket toJson 4.11
2019-11-19 11:58:59.365 INFO [ parallel-1] i.a.m.notifier.Notifier : notifier sendNotification 4.61
2019-11-19 11:58:59.374 INFO [ parallel-3] i.a.m.websocket.MyWebSocketHandler : websocket toJson 4.03
2019-11-19 11:59:01.365 INFO [ parallel-1] i.a.m.notifier.Notifier : notifier sendNotification 4.88
2019-11-19 11:59:01.375 INFO [ parallel-3] i.a.m.websocket.MyWebSocketHandler : websocket toJson 4.29
2019-11-19 11:59:03.364 INFO [ parallel-1] i.a.m.notifier.Notifier : notifier sendNotification 4.37
There are a couple of issues here. First, RestTemplate is a synchronous, blocking HTTP client, so you should use WebClient, which is reactive. Second, to get a ConnectableFlux-style source (a Flux that multicasts the same events to multiple subscribers), you need to share it before the map operator and derive the new Flux-es from the connected one. Note that share() is shorthand for publish().refCount(), so it returns a plain Flux rather than a ConnectableFlux.
Example:
Flux<MyEvent> connectedFlux = mySource.events().share();
Flux.from(connectedFlux).subscribe(event -> notifier.sendNotification(event));
return webSocketSession.send(Flux.from(connectedFlux).map(this::toJson).map(webSocketSession::textMessage));
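To see why the two subscribers originally logged different values, and why sharing fixes that, here is a minimal sketch that needs only reactor-core on the classpath. The class and method names are hypothetical, and the random doubles stand in for MyEvent values; it is an illustration under those assumptions, not the asker's actual source.

```java
import java.time.Duration;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class SharedSourceSketch {

    // Hypothetical stand-in for MySource.events(): a cold Flux, so every
    // subscription starts its own interval and draws its own random values.
    static Flux<Double> events() {
        return Flux.interval(Duration.ofMillis(20))
                .map(i -> ThreadLocalRandom.current().nextDouble());
    }

    // Subscribes two collectors to the given Flux and returns what each one saw.
    static List<List<Double>> collectTwice(Flux<Double> source, int n) {
        Mono<List<Double>> first = source.take(n).collectList();
        Mono<List<Double>> second = source.take(n).collectList();
        Mono<List<List<Double>>> both = Mono.zip(first, second, (a, b) -> List.of(a, b));
        return both.block(Duration.ofSeconds(5));
    }

    // Cold source: the two lists are generated independently.
    static List<List<Double>> cold() {
        return collectTwice(events(), 3);
    }

    // Shared source: both subscribers attach to one upstream subscription.
    static List<List<Double>> shared() {
        return collectTwice(events().share(), 3);
    }
}
```

With the cold source the two lists should differ, because each subscriber triggers its own random generation. With share() both lists should match, provided the second subscriber attaches before the first tick, which is the case here because the interval only starts emitting after 20 ms.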
Also, the sendNotification method should return Mono<Void>, since reactive methods should return Mono or Flux types rather than block. To run the two executions independently, you could zip those two Monos.
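One way to combine the two branches is sketched below, assuming only reactor-core. It uses Mono.when rather than Mono.zip, because both branches are Mono<Void> and carry no value to pair up; that substitution is my assumption, not something the answer prescribes. The send method is a hypothetical stand-in for webSocketSession.send(...), and sendNotification stands in for a reactive Notifier.

```java
import java.time.Duration;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class CombinedBranchesSketch {

    final List<Long> sent = new CopyOnWriteArrayList<>();
    final List<Long> notified = new CopyOnWriteArrayList<>();

    // Hypothetical reactive notifier: nothing happens until it is subscribed.
    Mono<Void> sendNotification(Long event) {
        return Mono.fromRunnable(() -> notified.add(event));
    }

    // Hypothetical stand-in for webSocketSession.send(...).
    Mono<Void> send(Flux<Long> events) {
        return events.doOnNext(sent::add).then();
    }

    // Both branches read from the same shared Flux; the returned Mono
    // completes only when both branches have completed.
    Mono<Void> handle(Flux<Long> source) {
        Flux<Long> shared = source.share();
        Mono<Void> notifications = shared.flatMap(this::sendNotification).then();
        return Mono.when(send(shared), notifications);
    }

    // Runs the pipeline on three ticks and returns [sent, notified].
    static List<List<Long>> demo() {
        CombinedBranchesSketch sketch = new CombinedBranchesSketch();
        sketch.handle(Flux.interval(Duration.ofMillis(20)).take(3))
                .block(Duration.ofSeconds(5));
        return List.of(sketch.sent, sketch.notified);
    }
}
```

Because the notification branch is part of the Mono returned from handle, it is subscribed and cancelled together with the websocket branch instead of through a separate fire-and-forget subscribe call. The flatMap is what actually subscribes to each Mono<Void> returned by sendNotification.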
Edit
First of all, as mentioned above, use WebClient (a reactive HTTP client) for outgoing HTTP calls and rework the Notifier class:
public class Notifier {

    public Mono<Void> sendNotification (MyEvent event) {
        log.info("notifier sendNotification " + event.getValue());
        return Mono.just(event)
                .filter(e -> /* your condition */)
                .flatMap(e -> WebClient.builder().baseUrl("XXX")...)
                .then();
    }
}
Now check whether the execution context is different.
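If the two subscribers still end up on the same thread, one option is to move each branch onto its own scheduler with publishOn. The sketch below assumes only reactor-core; the scheduler names and the class are hypothetical, and dedicated single-thread schedulers are used purely to make the thread names easy to observe. If a blocking call such as RestTemplate has to stay, Schedulers.boundedElastic() is the scheduler Reactor intends for blocking work.

```java
import java.time.Duration;
import java.util.List;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

public class SeparateContextsSketch {

    // Returns the thread names observed by the two branches, in the order
    // [websocket branch, notifier branch].
    static List<String> observedThreads() {
        Scheduler wsScheduler = Schedulers.newSingle("ws-sender");
        Scheduler notifierScheduler = Schedulers.newSingle("notifier");
        try {
            // Stand-in for mySource.events().share(): three ticks, then completion.
            Flux<Long> shared = Flux.interval(Duration.ofMillis(20)).take(3).share();

            // Everything downstream of publishOn runs on that scheduler's thread.
            Mono<String> wsThread = shared
                    .publishOn(wsScheduler)
                    .map(i -> Thread.currentThread().getName())
                    .last();

            Mono<String> notifierThread = shared
                    .publishOn(notifierScheduler)
                    .map(i -> Thread.currentThread().getName())
                    .last();

            Mono<List<String>> both =
                    Mono.zip(wsThread, notifierThread, (a, b) -> List.of(a, b));
            return both.block(Duration.ofSeconds(5));
        } finally {
            wsScheduler.dispose();
            notifierScheduler.dispose();
        }
    }
}
```

Each publishOn hands events to its own scheduler, so a slow consumer in one branch no longer occupies the thread that serves the other branch. Note that share() only requests from upstream as fast as its slowest subscriber allows, so a branch that falls far behind can still stall the source once the publishOn buffer fills up.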