redis, spring-webflux, spring-websocket

WebFlux with WebSocket: how to prevent subscribing more than once with the reactive Redis messaging operations


I have a WebSocket implementation on WebFlux that uses the reactive Redis messaging operations. It listens to a Redis topic and returns the values through a WebSocket endpoint.

The problem is that each time a user sends a message to the WebSocket endpoint, a brand new Redis subscription seems to be made. Subscribers accumulate on the Redis topic, and the number of WebSocket responses grows with the number of subscriptions. For example, if a user sends 3 messages, the number of Redis topic subscriptions increases to three, and the WebSocket connection responds three times to each topic message.

I would like to know if there is a way to reuse the same subscription to the messaging topic, so that multiple Redis topic subscriptions are not created.

The code I use is as follows:

  • Websocket Handler

    public class SendingMessageHandler implements WebSocketHandler {
      private final Gson gson = new Gson();
    
      private final MessagingService messagingService;
    
      public SendingMessageHandler(MessagingService messagingService) {
          this.messagingService = messagingService;
      }
    
      @Override
      public Mono<Void> handle(WebSocketSession session) {
          Flux<WebSocketMessage> stringFlux = session.receive()
                  .map(WebSocketMessage::getPayloadAsText)
                  .flatMap(inputData ->
                          messagingService.playGame(inputData)
                                  .map(data ->
                                          session.textMessage(gson.toJson(data))
                                  )
                  );
    
          return session.send(stringFlux);
      }
    }
    
  • Message Handling service

    public class MessagingService {
      private final ReactiveRedisOperations<String, GamePubSub> reactiveRedisOperations;
    
      public MessagingService(ReactiveRedisOperations<String, GamePubSub> reactiveRedisOperations) {
          this.reactiveRedisOperations = reactiveRedisOperations;
      }
    
      public Flux<Object> playGame(UserInput userInput){
          return reactiveRedisOperations.listenTo("TOPIC_NAME");
      }
    }

Thank you in advance.


Solution

  • Instead of using ReactiveRedisOperations, MessageListener is the way to go here. You can register a listener once, and use the following as the listener.

    data -> session.textMessage(gson.toJson(data))
    

    The registration should happen only once, at the beginning of the connection. You can override void afterConnectionEstablished(WebSocketSession session) in SendingMessageHandler to accomplish this. That way, one subscription is created per WebSocket connection, rather than one per message.

    Also, don't forget to override afterConnectionClosed and, within it, unsubscribe from the Redis topic and clean up the listener.

    Instructions on how to use MessageListener.
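
The lifecycle described above (subscribe once when the connection opens, never inside the per-message path, and unsubscribe on close) can be sketched in plain Java. This is only a minimal model under stated assumptions: FakeTopic and ConnectionHandler are hypothetical stand-ins for the Redis topic and the WebSocket handler, not Spring or Redis APIs, and the method names simply mirror the callbacks mentioned in the answer. In the reactive WebFlux WebSocketHandler, whose entry point is handle(WebSocketSession), the equivalent idea would presumably be to create the topic subscription once per call to handle rather than inside the flatMap that runs for every inbound message.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

// Small driver showing that three inbound messages still leave one subscription.
class SubscribeOnceDemo {
    public static void main(String[] args) {
        FakeTopic topic = new FakeTopic();
        ConnectionHandler handler = new ConnectionHandler(topic);

        handler.afterConnectionEstablished();   // subscribe exactly once
        handler.handleMessage("first");
        handler.handleMessage("second");
        handler.handleMessage("third");

        topic.publish("move");                  // one topic message arrives

        System.out.println(topic.listenerCount()); // prints 1
        System.out.println(handler.sentFrames());  // prints [move]

        handler.afterConnectionClosed();        // unsubscribe and clean up
        System.out.println(topic.listenerCount()); // prints 0
    }
}

// Hypothetical in-memory stand-in for a Redis topic (assumption, not a real API).
class FakeTopic {
    private final List<Consumer<String>> listeners = new CopyOnWriteArrayList<>();

    void addListener(Consumer<String> listener) { listeners.add(listener); }

    void removeListener(Consumer<String> listener) { listeners.remove(listener); }

    int listenerCount() { return listeners.size(); }

    // Deliver the message to every registered listener.
    void publish(String message) { listeners.forEach(l -> l.accept(message)); }
}

// Hypothetical per-connection handler modelling the lifecycle from the answer.
class ConnectionHandler {
    private final FakeTopic topic;
    private final List<String> sent = new ArrayList<>(); // frames "sent" to the client
    private Consumer<String> listener;

    ConnectionHandler(FakeTopic topic) { this.topic = topic; }

    // Runs once when the connection opens: the only place that subscribes.
    void afterConnectionEstablished() {
        listener = sent::add;
        topic.addListener(listener);
    }

    // Runs for every inbound message: processes input but never subscribes.
    void handleMessage(String input) {
        // Application-specific handling of the user's input would go here.
    }

    // Runs once when the connection closes: unsubscribe and drop the listener.
    void afterConnectionClosed() {
        topic.removeListener(listener);
        listener = null;
    }

    List<String> sentFrames() { return sent; }
}
```

The point of the design is that the number of listeners on the topic tracks the number of open connections, not the number of messages a client has sent, so each topic message produces exactly one response per connection.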