Tags: websocket, project-reactor, micronaut, reactor-netty

Send updates from an unbounded Flux with a Micronaut WebSocket endpoint


I have a Flux that emits an unbounded number of values on an interval. I need a WebSocket endpoint that emits the values of the Flux to each client once it connects.

My current implementation is as follows:

@ServerWebSocket("/updates")
public class UpdateController {
    private Flux<Update> updates;
    
    // ... left out for brevity

    @OnOpen
    public Flux<Update> onOpen(WebSocketSession session) {
        return updates.flatMap(session::send);
    }

    @OnMessage
    public void onMessage(String content) {
        // do nothing
    }

    @OnClose
    public void onClose(WebSocketSession session) {
        // do nothing
    }
}

This works, but as soon as the client closes the connection, an exception is thrown. That makes sense to me: the updates Flux keeps emitting values, so session::send is still called on the closed session.

How can I structure my code so that this exception is not thrown? I have the feeling that I am missing something.


Solution

  • You can avoid emitting values to a closed session by adding a filter, before the flatMap call, that checks whether the session is still open:

    @OnOpen
    public Flux<Update> onOpen(WebSocketSession session) {
        return updates
                .filter(it -> session.isOpen())
                .flatMap(session::send);
    }