apache-kafka  spring-kafka  spring-cloud-stream  reactor  spring-cloud-stream-binder-kafka

How does autoCommit work, and how many messages are polled at once, in spring-cloud-stream reactive Kafka?


I checked the documentation, but it only explains how auto commit works with poll() and how to configure the poll count.

So how do things work when I use Flux?

Below is my consumer code.

    @Bean
    fun consumerInboundMsg(handler: QueueHandler): java.util.function.Function<Flux<MessageRequest>, Mono<Void>> {
        return Function { flux ->
            flux.asFlow().flatMapMerge {
                flow {
                    handler.handleInboundRequest(it)
                    emit(it)
                }
            }.asFlux().then()
        }
    }

Solution

  • To clarify, the auto commit mechanism is determined by the Apache Kafka consumer's implementation. Whether you use Spring Cloud Stream reactive or non-reactive does not change how auto commit works. On every poll, the consumer checks whether the time elapsed since the last commit is greater than auto.commit.interval.ms and, if so, commits the offsets.

    For example, if the commit interval is 5 seconds but poll() is only called every 7 seconds, the commit happens only after 7 seconds.

    To check how often your consumer commits offsets, enable trace logging: logging.level.org.apache.kafka: trace
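The 5-second/7-second example above can be sketched as a small simulation. This is only a simplified model of the poll-driven check described in the answer, not the actual KafkaConsumer code: `AutoCommitModel`, `simulateCommits`, and the millisecond timestamps are hypothetical names and values introduced for illustration, and only `autoCommitIntervalMs` corresponds to a real setting (auto.commit.interval.ms).

```kotlin
// Simplified model of poll-driven auto commit. NOT the real Kafka client code.
// autoCommitIntervalMs mirrors auto.commit.interval.ms; all other names are hypothetical.
class AutoCommitModel(private val autoCommitIntervalMs: Long) {
    // Assumption: the first commit becomes due one interval after the start (time 0).
    private var nextCommitDeadlineMs = autoCommitIntervalMs

    // Timestamps (ms since start) at which a commit was triggered.
    val commitTimesMs = mutableListOf<Long>()

    // Called on every poll: a commit happens only if the interval has elapsed.
    fun poll(nowMs: Long) {
        if (nowMs >= nextCommitDeadlineMs) {
            commitTimesMs.add(nowMs)
            nextCommitDeadlineMs = nowMs + autoCommitIntervalMs
        }
    }
}

// Runs `polls` polls spaced `pollEveryMs` apart and returns the commit timestamps.
fun simulateCommits(autoCommitIntervalMs: Long, pollEveryMs: Long, polls: Int): List<Long> {
    val model = AutoCommitModel(autoCommitIntervalMs)
    for (i in 1..polls) {
        model.poll(i * pollEveryMs)
    }
    return model.commitTimesMs
}
```

Under this model, `simulateCommits(5_000, 7_000, 3)` yields commits at 7000, 14000 and 21000 ms, so the slower poll cadence sets the commit cadence. With a poll every second, `simulateCommits(5_000, 1_000, 12)` yields commits at 5000 and 10000 ms, so the commit interval is the limiting factor instead.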