java resttemplate spring-webflux project-reactor spring-websocket

How to have multiple subscribers to Flux that run on different execution contexts / threads


I'm developing a Spring Boot WebFlux application for real-time IoT data visualization.

I have a Flux that simulates data coming from a device. Once a websocket connection is established, every single event:

  • must be sent over the websocket for real-time visualization (using a reactive WebSocketHandler)
  • must be checked against a given condition so that a notification can be sent via an HTTP REST call (RestTemplate)

From my logs it seems that the two subscribers (the websocket handler and the notifier) get two different flows with completely different values (see the logs below).

I also tried a variant that chains the share method after the map in the MySource class. In that case there is only one Flux, but it looks like there is only ONE thread, so everything blocks (I can see that the REST call blocks sending over the websocket).

What is happening here? How can I make the two subscribers run in different execution contexts (different threads), so that they are completely independent of each other?

The relevant code snippets and logs are below.

Thank you all in advance!

UPDATE: for the sake of clarity, I should specify that MyEvent instances have RANDOMLY generated values. Thanks to @NikolaB's answer I solved one issue by using ConnectableFlux / share, which guarantees that both subscribers see the same Flux, but I still want separate execution contexts for the two subscribers.

public class MyWebSocketHandler implements WebSocketHandler {

   @Autowired
   public MySource mySource;

   @Autowired
   public Notifier notifier;

   public Mono<Void> handle(WebSocketSession webSocketSession) {
      Flux<MyEvent> events = mySource.events();
      events.subscribe(event -> notifier.sendNotification(event));
      return webSocketSession.send(events.map(this::toJson).map(webSocketSession::textMessage));
   }

   private String toJson(MyEvent event) {
       log.info("websocket toJson " + event.getValue());
       ...
   }
}

public class MySource {
   public Flux<MyEvent> events() {
      return Flux.interval(...).map(i -> new MyEvent(/* randomly generated value */));
   }
}

public class Notifier {

   public void sendNotification (MyEvent event) {
      log.info("notifier sendNotification " + event.getValue());
      if (/* condition met */)
         restTemplate.exchange(...);
   }
}

2019-11-19 11:58:55.375 INFO [     parallel-3] i.a.m.websocket.MyWebSocketHandler  : websocket toJson 4.09
2019-11-19 11:58:55.375 INFO [     parallel-1] i.a.m.notifier.Notifier : notifier sendNotification 4.86
2019-11-19 11:58:57.366 INFO [     parallel-1] i.a.m.notifier.Notifier : notifier sendNotification 4.24
2019-11-19 11:58:57.374 INFO [     parallel-3] i.a.m.websocket.MyWebSocketHandler  : websocket toJson 4.11
2019-11-19 11:58:59.365 INFO [     parallel-1] i.a.m.notifier.Notifier : notifier sendNotification 4.61
2019-11-19 11:58:59.374 INFO [     parallel-3] i.a.m.websocket.MyWebSocketHandler  : websocket toJson 4.03
2019-11-19 11:59:01.365 INFO [     parallel-1] i.a.m.notifier.Notifier : notifier sendNotification 4.88
2019-11-19 11:59:01.375 INFO [     parallel-3] i.a.m.websocket.MyWebSocketHandler  : websocket toJson 4.29
2019-11-19 11:59:03.364 INFO [     parallel-1] i.a.m.notifier.Notifier : notifier sendNotification 4.37

Solution

  • There are a couple of issues here. First, RestTemplate is a synchronous, blocking HTTP client, so you should use WebClient, which is reactive. Second, each subscription to a cold Flux re-runs the source, which is why the two subscribers see different random values. To have both consume the same stream, you need to share it before the handler's map operators and derive the new Fluxes from the shared one.

    Example:

    Flux<MyEvent> connectedFlux = mySource.events().share();
    Flux.from(connectedFlux).subscribe(event -> notifier.sendNotification(event));
    return webSocketSession.send(Flux.from(connectedFlux).map(this::toJson).map(webSocketSession::textMessage));
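
For the "separate execution contexts" part of the question, one option is to give each branch its own publishOn after share(). The following is only a sketch in plain Reactor, without Spring: the class SharedFluxSketch, the Branch helper, the 100 ms interval and the choice of schedulers are all assumptions made for illustration, and the random doubles stand in for MyEvent values.

```java
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;

import java.time.Duration;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;

public class SharedFluxSketch {

    /** Records the values one branch received and the threads that delivered them. */
    static final class Branch {
        final List<Double> values = new CopyOnWriteArrayList<>();
        final Set<String> threads = ConcurrentHashMap.newKeySet();

        void accept(Double value) {
            values.add(value);
            threads.add(Thread.currentThread().getName());
        }
    }

    /** Returns { websocketBranch, notifierBranch } once both branches have completed. */
    static Branch[] run(int count) {
        Branch websocket = new Branch();
        Branch notifier = new Branch();
        CountDownLatch done = new CountDownLatch(2);

        // Hypothetical stand-in for mySource.events(): random values on a timer,
        // multicast to all subscribers by share().
        Flux<Double> shared = Flux.interval(Duration.ofMillis(100))
                .map(tick -> ThreadLocalRandom.current().nextDouble())
                .take(count)
                .share();

        // Notifier branch: everything after publishOn runs on a boundedElastic worker,
        // which is where a blocking call would belong.
        shared.publishOn(Schedulers.boundedElastic())
                .doFinally(signal -> done.countDown())
                .subscribe(notifier::accept);

        // Websocket branch: handled on a worker of the parallel scheduler.
        shared.publishOn(Schedulers.parallel())
                .doFinally(signal -> done.countDown())
                .subscribe(websocket::accept);

        try {
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new Branch[] { websocket, notifier };
    }

    public static void main(String[] args) {
        Branch[] result = run(3);
        System.out.println("websocket: " + result[0].values + " on " + result[0].threads);
        System.out.println("notifier:  " + result[1].values + " on " + result[1].threads);
    }
}
```

Both branches should report the same values but different thread names. Note that share() starts emitting as soon as the first subscriber arrives, so a subscriber that attaches late misses earlier events; the sketch relies on both subscribing before the first tick.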
    

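    Also, the sendNotification method should return Mono<Void>, since reactive methods should always return Mono or Flux types.

    To run the two pipelines independently you could combine the two Monos, for example with zip. Note that for Mono<Void> sources Mono.when is the variant that waits for all of them to complete, because zip completes as soon as any source completes empty.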
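
As a rough illustration of combining two Mono<Void> pipelines so that each starts on its own worker, here is a minimal sketch using Mono.when together with subscribeOn. The class WhenSketch and the pipeline helper are hypothetical stand-ins for the notification pipeline and the websocket send; in the handler, the combined Mono would be what handle returns.

```java
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

import java.time.Duration;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

public class WhenSketch {

    // Hypothetical stand-in for one pipeline (notifications or websocket send).
    // subscribeOn makes the work start on a boundedElastic worker instead of the caller thread.
    static Mono<Void> pipeline(String name, Set<String> seen) {
        return Mono.<Void>fromRunnable(
                        () -> seen.add(name + "@" + Thread.currentThread().getName()))
                .subscribeOn(Schedulers.boundedElastic());
    }

    // Subscribes to both pipelines and waits until both have completed.
    static Set<String> runBoth() {
        Set<String> seen = ConcurrentHashMap.newKeySet();
        Mono<Void> notifications = pipeline("notifier", seen);
        Mono<Void> websocketSend = pipeline("websocket", seen);

        // Mono.when completes only after every source has completed.
        Mono.when(notifications, websocketSend).block(Duration.ofSeconds(5));
        return seen;
    }

    public static void main(String[] args) {
        // block() is used here only because this is a standalone demo;
        // a WebFlux handler would return the combined Mono instead of blocking.
        System.out.println(runBoth());
    }
}
```

Because the scheduler may reuse an idle worker, the two pipelines are not guaranteed to land on distinct threads in this sketch; the point is only that neither runs on the subscribing thread and that the combined Mono waits for both.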

    Edit

    First of all, as mentioned above, use WebClient, which is a reactive HTTP client, for outgoing HTTP calls, and rework the Notifier class:

    public class Notifier {
    
       public Mono<Void> sendNotification (MyEvent event) {
          log.info("notifier sendNotification " + event.getValue());
          return Mono.just(event)
                     .filter(e -> /* your condition */)
                     .flatMap(e -> WebClient.builder().baseUrl("XXX")...)
                     .then();
       }
    
    }
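
One consequence of the reworked Notifier above: once sendNotification returns a Mono<Void>, calling it inside subscribe(event -> ...) only builds the Mono and never subscribes to it, so nothing would be sent. Chaining it with flatMap lets Reactor subscribe to each inner Mono. The sketch below shows the difference in plain Reactor; the class FlatMapNotifierSketch, the counter, the integer events and the condition v > 4 are assumptions standing in for the real WebClient call and MyEvent.

```java
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.util.concurrent.atomic.AtomicInteger;

public class FlatMapNotifierSketch {

    // Counts how many "notifications" were actually executed.
    static final AtomicInteger sent = new AtomicInteger();

    // Hypothetical stand-in for Notifier.sendNotification: the counter increment
    // takes the place of the WebClient call, and v > 4 is a placeholder condition.
    static Mono<Void> sendNotification(int value) {
        return Mono.just(value)
                .filter(v -> v > 4)
                .flatMap(v -> Mono.fromRunnable(sent::incrementAndGet))
                .then();
    }

    // flatMap subscribes to every inner Mono, so the notifications really run.
    static int notifyEach(Flux<Integer> events) {
        sent.set(0);
        events.flatMap(FlatMapNotifierSketch::sendNotification).blockLast();
        return sent.get();
    }

    // The Mono is created for each event but never subscribed, so nothing runs.
    static int callWithoutSubscribing(Flux<Integer> events) {
        sent.set(0);
        events.subscribe(event -> sendNotification(event));
        return sent.get();
    }

    public static void main(String[] args) {
        System.out.println("with flatMap:        " + notifyEach(Flux.just(3, 5, 6)));
        System.out.println("without subscribing: " + callWithoutSubscribing(Flux.just(3, 5, 6)));
    }
}
```

In the handler this would correspond to replacing the subscribe(event -> notifier.sendNotification(event)) call with something like flatMap(notifier::sendNotification) on the shared Flux, and subscribing to (or returning) the resulting pipeline.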
    

    Now check whether the execution context is different.