Search code examples
spring-webfluxreactor-netty

How do you to detect a drop in connection from the client with Spring Webflux and Reactor Netty?


I'm dealing with an issue where I'm not able to surface any kind of error exception my server is disconnected from my client. I want my code to be able to detect that the client no longer has a connection with the server but nothing I'm trying is working. doOnError, doOnCancel, doFinally all are not able to surface anything.

    @Override
    public <T> Flux<T> select(Query query, Class<T> entityClass) throws DataAccessException 

        return client.post()
                .uri("/query-stream")
                .contentType(MediaType.APPLICATION_JSON)
                .body(BodyInserters.fromValue("{\"sql\": \"" + sqlQuery + "\"}"))
                .retrieve()
                .bodyToFlux(String.class)
                .skip(1)
                .<T>handle((jsonString, sink) -> handleJsonResponse(jsonString,entityClass, sink))
                .doOnError(this::handleError)
                .doOnCancel(() -> System.out.println("Cancel connection"))
                .doFinally(signalType -> {
                    if (signalType == SignalType.CANCEL) {
                        System.out.println("Cancel Signal");
                    }
                });

    }
        ksqlDbEntityTemplate.select(null, String.class)
                .subscribe(System.out::println,
                error -> {
                    // Handle errors
                    error.printStackTrace();
                    latch.countDown(); // Release the latch in case of an error.
                }, latch::countDown
        );

Solution

  • Going with the TCP Keep alive mechanism seems to be the only options here which works for us. After the kernel's designated TCP Keep Alive time is exceeded, and exception is thrown and we can act off of that exception.