Spring Integration offers non-reactive inbound/outbound WebSocket adapters which, simply put, associate each session with an id in an internal container. You do some processing on the message, and on the outbound side the adapter checks the message headers for the session id and sends the message via that session.
Now that Spring offers reactive WebSocket support via org.springframework.web.reactive.socket.WebSocketSession and the other classes in that package, I'm wondering if there is similar support, in terms of channel adapters, for the reactive WebSocket stack.
If not, are there any common patterns or practices for integrating reactive WebSockets with Spring Integration message flows?
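To make the session-id routing described above concrete, here is a minimal plain-Java model of the idea. It is only a sketch: SessionRegistry, FakeSession, Msg and the "sessionId" header name are all hypothetical stand-ins invented for illustration, not Spring Integration classes or constants.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Plain-Java model of session-id routing; nothing here is a Spring type.
final class SessionRegistry {

    // Hypothetical header name, chosen only for this sketch.
    static final String SESSION_ID_HEADER = "sessionId";

    /** Hypothetical stand-in for a WebSocket session; records what was sent. */
    static final class FakeSession {
        final String id;
        final List<String> sent = new ArrayList<>();
        FakeSession(String id) { this.id = id; }
        void send(String text) { sent.add(text); }
    }

    /** Hypothetical stand-in for a message: a payload plus headers. */
    static final class Msg {
        final String payload;
        final Map<String, Object> headers;
        Msg(String payload, Map<String, Object> headers) {
            this.payload = payload;
            this.headers = headers;
        }
    }

    // The "internal container": open sessions, addressable by id.
    private final Map<String, FakeSession> sessions = new ConcurrentHashMap<>();

    void register(FakeSession session) {
        sessions.put(session.id, session);
    }

    /** Inbound side: tag the payload with the id of the session it came from. */
    Msg inbound(FakeSession session, String payload) {
        Map<String, Object> headers = new HashMap<>();
        headers.put(SESSION_ID_HEADER, session.id);
        return new Msg(payload, headers);
    }

    /** Outbound side: look the session up by the id header and send through it. */
    boolean outbound(Msg message) {
        Object id = message.headers.get(SESSION_ID_HEADER);
        FakeSession session = (id == null) ? null : sessions.get(id);
        if (session == null) {
            return false; // no id header, or the session is not registered
        }
        session.send(message.payload);
        return true;
    }
}
```

The processing in the middle only has to preserve the headers; the outbound side never needs a direct reference to the session.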
This feature hasn't been requested yet, so we haven't given the matter much thought.
Please take a look at my SandBox. This is the best I can suggest given the current state of things.
We just follow the standard Spring WebFlux recommendations for implementing a WebSocket solution. So we have a WebSocketHandler implementation there with an appropriate URL mapping. The implementation just forwards a Flux from session.receive() into an IntegrationFlow that is registered dynamically. The flow is then converted to a reactive Publisher, which is used for session.send().
I believe many other approaches could be used, e.g. a FluxMessageChannel bean and its subscribeTo(), called from this handle(WebSocketSession) implementation, to bridge the Flux into a predefined integration flow. Or a simple @MessagingGateway call from doOnNext().
I'm not sure, though, whether session.send() can be used downstream independently (I need to play with it), but you can see in the sample how I propagate the WebSocketSession into the MessageHeaders to make it accessible in the integration flow.
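The header-propagation idea can be modelled in plain Java as follows. This is only a sketch under stated assumptions: Session, Msg and the "webSocketSession" header name are hypothetical stand-ins, not the Spring WebSocketSession, Message or any real header constant. It also does not settle whether the real session.send() can be called independently downstream.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Plain-Java model of carrying the session itself in the message headers.
final class SessionHeaderDemo {

    // Hypothetical header name, chosen only for this sketch.
    static final String SESSION_HEADER = "webSocketSession";

    /** Hypothetical stand-in for a session that can send text. */
    interface Session {
        void send(String text);
    }

    /** Hypothetical stand-in for a message: a payload plus headers. */
    static final class Msg {
        final String payload;
        final Map<String, Object> headers;
        Msg(String payload, Map<String, Object> headers) {
            this.payload = payload;
            this.headers = headers;
        }
    }

    /** Handler side: wrap each received payload and attach the session as a header. */
    static Msg wrap(Session session, String payload) {
        Map<String, Object> headers = new HashMap<>();
        headers.put(SESSION_HEADER, session);
        return new Msg(payload, Collections.unmodifiableMap(headers));
    }

    /** Flow side: an example transformation that keeps the headers intact. */
    static Msg toUpperCase(Msg in) {
        return new Msg(in.payload.toUpperCase(), in.headers);
    }

    /** Downstream endpoint: pull the session back out of the headers and reply. */
    static void reply(Msg message) {
        Session session = (Session) message.headers.get(SESSION_HEADER);
        if (session == null) {
            throw new IllegalStateException("no session header on message");
        }
        session.send(message.payload);
    }
}
```

Unlike routing by session id, no registry lookup is needed here, because every message carries a reference to the session that produced it.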