I check the document, it only says how auto commit works with poll()
here, and how to configure poll count, here.
So how things works when I use Flux
?
Below are my consumer code.
@Bean
fun consumerInboundMsg(handler: QueueHandler): java.util.function.Function<Flux<MessageRequest>, Mono<Void>> {
return Function { flux ->
flux.asFlow().flatMapMerge {
flow {
handler.handleInboundRequest(it)
emit(it)
}
}.asFlux().then()
}
}
To clarify, the auto commit mechanism is determined by the Apache Kafka's implementation. You're using Spring Cloud Stream reactive or Spring Cloud Stream non-reactive doesn't impact how the auto commit works. The offset will be committed in every poll and check the time elapsed is greater than auto.commit.interval.ms
.
In case the commit interval is 5 seconds and poll is happening in 7 seconds, the commit will happen after 7 seconds only.
To check how often your consumer commits the offsets, enabling the trace log: logging.level.org.apache.kafka: trace