I have a Flux which emits an unbound number of values on an interval. I need a Websockets endpoint, which emits the values of the Flux when a client connects.
Currently I realized it as follows:
@ServerWebSocket("/updates")
public class UpdateController {
private Flux<Update> updates;
// ... left out for brevity
@OnOpen
public Flux<Update> onOpen(WebSocketSession session) {
return updates.flatMap(session::send);
}
@OnMessage
public void onMessage(String content) {
// do nothing
}
@OnClose
public void onClose(WebSocketSession session) {
// do nothing
}
}
This works, but as soon as the client closes the connection, an exception is thrown.
Makes sense to me, since the updates
Flux still emits values and session::send
will get called.
But how could I structure my code in a way, that this exception is not thrown? I have the feeling that I am missing something.
You can prevent emitting value to the closed session by adding a filter that checks if the session is open before the flatMap call:
@OnOpen
public Flux<String> onOpen(WebSocketSession session) {
return updates
.filter(it -> session.isOpen())
.flatMap(session::send);
}