apache-kafka · spring-kafka · project-reactor · reactor-kafka

Commit offset in Reactor Kafka


I have a Reactor Kafka project that consumes messages from a Kafka topic, transforms each message, and then writes it to another topic.

public Flux<String> consume(String destTopic) {
        return kafkaConsumerTemplate
                .receiveAutoAck()
                .doOnNext(consumerRecord -> log.info("received key={}, value={} from topic={}, offset={}",
                        consumerRecord.key(),
                        consumerRecord.value(),
                        consumerRecord.topic(),
                        consumerRecord.offset())
                )
                .doOnNext(s -> sendToKafka(s, destTopic))
                .map(ConsumerRecord::value)
                .doOnError(throwable -> log.error("Error while consuming : {}", throwable.getMessage()));
    }

My understanding is that the offset is committed only after all steps in the reactive sequence have completed successfully. Is that correct? I want to make sure the next record is not processed unless the current record has been successfully sent to the destination Kafka topic.


Solution

  • The receiveAutoAck() implementation looks like this:

    @Override
    public Flux<Flux<ConsumerRecord<K, V>>> receiveAutoAck(Integer prefetch) {
        return withHandler(AckMode.AUTO_ACK, (scheduler, handler) -> handler
            .receive()
            .filter(it -> !it.isEmpty())
            .publishOn(scheduler, preparePublishOnQueueSize(prefetch))
            .map(consumerRecords -> Flux.fromIterable(consumerRecords)
                .doAfterTerminate(() -> {
                    for (ConsumerRecord<K, V> r : consumerRecords) {
                        handler.acknowledge(r);
                    }
                })));
    }
    

    So, every ConsumerRecords batch is ack'ed only when its inner Flux has been fully processed, whether successfully or with an error. Therefore it is not a commit-per-record. And technically it does not have to be per record anyway, since a commit only matters when the consumer application fails and has to resume from the last committed offset. The currently active KafkaConsumer keeps its cursor in memory and doesn't care whether you commit or not.
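
    Ordering within the pipeline is a separate concern from committing. With doOnNext, the send is fire-and-forget if sendToKafka is asynchronous, so the next record may be processed before the current one has reached the destination topic. Here is a minimal sketch of an order-preserving variant, assuming a configured ReactiveKafkaProducerTemplate named kafkaSenderTemplate (not part of the original code):

    public Flux<String> consume(String destTopic) {
        return kafkaConsumerTemplate
                .receiveAutoAck()
                // concatMap subscribes to each send one at a time, so the next
                // record is only requested after the current send has completed
                .concatMap(consumerRecord -> kafkaSenderTemplate
                        .send(destTopic, consumerRecord.key(), consumerRecord.value())
                        .thenReturn(consumerRecord.value()))
                .doOnError(throwable -> log.error("Error while consuming : {}", throwable.getMessage()));
    }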

    If you really want per-record commits, see ReactiveKafkaConsumerTemplate.receive() and its KafkaReceiver.receive() delegate:

    /**
     * Starts a Kafka consumer that consumes records from the subscriptions or partition
     * assignments configured for this receiver. Records are consumed from Kafka and delivered
     * on the returned Flux when requests are made on the Flux. The Kafka consumer is closed
     * when the returned Flux terminates.
     * <p>
     * Every record must be acknowledged using {@link ReceiverOffset#acknowledge()} in order
     * to commit the offset corresponding to the record. Acknowledged records are committed
     * based on the configured commit interval and commit batch size in {@link ReceiverOptions}.
     * Records may also be committed manually using {@link ReceiverOffset#commit()}.
     *
     * @return Flux of inbound receiver records that are committed only after acknowledgement
     */
    default Flux<ReceiverRecord<K, V>> receive() {
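
    With receive(), each record carries a ReceiverOffset that you acknowledge (or commit) yourself, so you can tie the commit to a successful send. A minimal sketch, again assuming the hypothetical kafkaSenderTemplate from above:

    public Flux<String> consumeWithManualAck(String destTopic) {
        return kafkaConsumerTemplate
                .receive()   // Flux<ReceiverRecord<String, String>>
                .concatMap(receiverRecord -> kafkaSenderTemplate
                        .send(destTopic, receiverRecord.key(), receiverRecord.value())
                        // acknowledge only after the send has succeeded; the offset is then
                        // committed according to the configured commit interval and batch size
                        .doOnSuccess(result -> receiverRecord.receiverOffset().acknowledge())
                        .thenReturn(receiverRecord.value()));
    }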