
Don't mark message as acknowledged in reactive Function with Spring Cloud Stream


I am using Spring Cloud Stream (Horsham SR1) with Java 13, with Google Pub/Sub as the underlying messaging system.

I have a reactive Function that looks like this:

@Bean
public Function<Flux<Message>, Mono<Void>> messageConsumer() {
    return messageFlux ->
            messageFlux
                    .flatMap(message -> {
                        // do something
                        return something;
                    })
                    .doOnError(throwable -> log.error("could not process message", throwable))
                    .then();
}

How can I get Spring to not acknowledge an erroneous message? Is it sufficient to throw an exception inside the flatMap method?


Solution

  • Each approach has pros and cons. With a reactive function, we have no view into the stream; it is completely under your control. In fact, one of the main differences is that the function above is invoked only once, whereas an imperative function would be invoked for each message.

    Basically, with reactive, the user effectively declares the unit of operation to be the entire stream (whatever that may mean in the context of your application). With imperative, the unit of operation is a single Message, so we can do things like per-message acks, nacks, etc.
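
    To make the difference in the unit of operation concrete, here is a rough plain-Java analogy. It is only a sketch: java.util.stream.Stream stands in for Flux, a String stands in for a Message, and the class and method names (InvocationCountSketch, dispatchPerMessage, dispatchAsStream) are made up for illustration. None of this is Spring Cloud Stream or Pub/Sub API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.stream.Stream;

// Hypothetical analogy only: Stream plays the role of Flux, String the role of Message.
class InvocationCountSketch {

    // Imperative style: the dispatcher calls the handler once per message,
    // so it sees every outcome and can record an ack or a nack for each one.
    static List<String> dispatchPerMessage(List<String> messages, Consumer<String> handler) {
        List<String> outcomes = new ArrayList<>();
        for (String message : messages) {
            try {
                handler.accept(message);
                outcomes.add("ack:" + message);
            } catch (RuntimeException e) {
                outcomes.add("nack:" + message);
            }
        }
        return outcomes;
    }

    // Reactive style: the dispatcher calls the handler exactly once and hands over
    // the whole stream. What happens to each element is up to the handler.
    static void dispatchAsStream(List<String> messages, Consumer<Stream<String>> handler) {
        handler.accept(messages.stream());
    }

    public static void main(String[] args) {
        List<String> messages = List.of("a", "bad", "c");

        // Handler that fails on one specific message.
        Consumer<String> perMessage = m -> {
            if (m.equals("bad")) {
                throw new IllegalStateException("cannot process " + m);
            }
        };
        System.out.println(dispatchPerMessage(messages, perMessage)); // [ack:a, nack:bad, ack:c]

        // Count how often the stream-style handler itself is invoked.
        AtomicInteger calls = new AtomicInteger();
        dispatchAsStream(messages, stream -> {
            calls.incrementAndGet();
            stream.forEach(m -> { /* per-element logic lives here, out of the dispatcher's sight */ });
        });
        System.out.println(calls.get()); // 1
    }
}
```

    In the per-message variant the dispatcher can decide ack or nack for each element because it observes each call. In the stream variant it only ever sees a single invocation, which is why per-message acknowledgement decisions cannot be made for you there.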