Search code examples
javaspringwebsocketspring-integrationspring-websocket

Spring Integration and Reactive WebSockets


Spring integration offers non-reactive inbound/outbound WebSocket adapters which, simply put, associate session with id via internal container, you do some processing on message, and on outbound, it checks message headers for session id, and sends it via that session.

Now, with Spring offering reactive WebSocket support via org.springframework.web.reactive.socket.WebSocketSession and other classes there, I'm wondering of there is similar support in terms of channel adapters for reactive WebSocket stack.

If not, are there any common patterns/practices, how would one integrate reactive WS with spring-integration message flows?


Solution

  • This feature hasn't been called yet, so we haven't thought on the matter.

    Please, take a look into my SandBox. This is the best I can suggest with the current state of situation.

    We just follow the standard Spring WebFlux suggestions to implement a WebSockets solution. So, we have there a WebSocketHandler implementation with an appropriate URL mapping. The implementation just forward a Flux from the session.receive() into the IntegrationFlow registered dynamically. The flow then converted to the Reactive Publisher which is used for the session.send().

    I believe there can be used many other approaches, e.g. using a FluxMessageChannel bean and its subscribeTo() from this handle(WebSocketSession) impl to bridge the Flux into predefined Integration flow. Or simple @MessagingGateway call from the doOnNext().

    Not sure, though, if session.send() can be used downstream independently (need to play), but you can see in the sample how I propagate a WebSocketSession into the MessageHeaders to give it access in the Integration flow.