I have a Reactor Kafka project that consumes messages from a Kafka topic, transforms each message, and then writes it to another topic.
public Flux<String> consume(String destTopic) {
    return kafkaConsumerTemplate
            .receiveAutoAck()
            .doOnNext(consumerRecord -> log.info("received key={}, value={} from topic={}, offset={}",
                    consumerRecord.key(),
                    consumerRecord.value(),
                    consumerRecord.topic(),
                    consumerRecord.offset())
            )
            .doOnNext(s -> sendToKafka(s, destTopic))
            .map(ConsumerRecord::value)
            .doOnError(throwable -> log.error("Error while consuming : {}", throwable.getMessage()));
}
My understanding is that the offset is committed only after all steps in the Reactor sequence have completed successfully. Is that correct? I want to make sure the next record is not processed until the current record has been successfully sent to the destination Kafka topic.
The implementation of receiveAutoAck() looks like this:
@Override
public Flux<Flux<ConsumerRecord<K, V>>> receiveAutoAck(Integer prefetch) {
    return withHandler(AckMode.AUTO_ACK, (scheduler, handler) -> handler
            .receive()
            .filter(it -> !it.isEmpty())
            .publishOn(scheduler, preparePublishOnQueueSize(prefetch))
            .map(consumerRecords -> Flux.fromIterable(consumerRecords)
                    .doAfterTerminate(() -> {
                        for (ConsumerRecord<K, V> r : consumerRecords) {
                            handler.acknowledge(r);
                        }
                    })));
}
So, every ConsumerRecords batch is ack'ed only when its Flux is fully processed, whether successfully or with an error. It is therefore not a commit per record. Technically it does not have to be per record anyway: a committed offset matters only when the consumer app fails and has to continue from the offset where it left off. The currently active KafkaConsumer keeps a cursor in memory and doesn't care whether you commit or not.
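To make the batch semantics concrete, here is a minimal plain-Java model of that behaviour. It does not use Reactor or Kafka at all: BatchAckModel, processBatch and the offsets are hypothetical names made up for the illustration, and the finally block only stands in for doAfterTerminate. It is a sketch of the idea, not the library's code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class BatchAckModel {

    /** Offsets acknowledged so far, in acknowledgement order. */
    public final List<Long> acked = new ArrayList<>();

    /**
     * Processes one polled batch. The finally block plays the role of
     * doAfterTerminate: it runs on success and on error alike, and it
     * acknowledges every offset in the batch, processed or not.
     */
    public void processBatch(List<Long> batch, Consumer<Long> step) {
        try {
            for (Long offset : batch) {
                step.accept(offset);
            }
        } finally {
            acked.addAll(batch);
        }
    }

    public static void main(String[] args) {
        BatchAckModel model = new BatchAckModel();
        List<Long> processed = new ArrayList<>();
        try {
            // Hypothetical processing step that fails on the second record.
            model.processBatch(List.of(10L, 11L, 12L), offset -> {
                if (offset == 11L) {
                    throw new IllegalStateException("send failed at offset " + offset);
                }
                processed.add(offset);
            });
        } catch (IllegalStateException e) {
            System.out.println("batch terminated with error: " + e.getMessage());
        }
        System.out.println("processed = " + processed); // prints "processed = [10]"
        System.out.println("acked = " + model.acked);   // prints "acked = [10, 11, 12]"
    }
}
```

In this model only offset 10 was actually processed, yet all three offsets of the batch end up acknowledged, which is why the auto-ack mode cannot be relied on for a per-record guarantee.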
If you really want "per record" handling, see ReactiveKafkaConsumerTemplate.receive() and its KafkaReceiver.receive() delegate:
/**
* Starts a Kafka consumer that consumes records from the subscriptions or partition
* assignments configured for this receiver. Records are consumed from Kafka and delivered
* on the returned Flux when requests are made on the Flux. The Kafka consumer is closed
* when the returned Flux terminates.
* <p>
* Every record must be acknowledged using {@link ReceiverOffset#acknowledge()} in order
* to commit the offset corresponding to the record. Acknowledged records are committed
* based on the configured commit interval and commit batch size in {@link ReceiverOptions}.
* Records may also be committed manually using {@link ReceiverOffset#commit()}.
*
* @return Flux of inbound receiver records that are committed only after acknowledgement
*/
default Flux<ReceiverRecord<K, V>> receive() {
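The per-record contract described in that Javadoc (acknowledge a record only once you are done with it) can be modelled in plain Java as well. The sketch below is not Reactor Kafka code: PerRecordAckModel, forward and the send function are hypothetical, and CompletableFuture chaining stands in for what a sequential operator such as concatMap would presumably do on the Flux returned by receive(), with acked.add(...) standing in for ReceiverOffset#acknowledge(). It assumes Java 9 or later.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

public class PerRecordAckModel {

    /** Offsets acknowledged so far, in acknowledgement order. */
    public final List<Long> acked = new ArrayList<>();

    /**
     * Chains the records one after another: the send for the next record is
     * started only after the previous send completed successfully, and an
     * offset is acknowledged only after its own send succeeded.
     */
    public CompletableFuture<Void> forward(List<Long> offsets,
                                           Function<Long, CompletableFuture<Void>> send) {
        CompletableFuture<Void> chain = CompletableFuture.completedFuture(null);
        for (Long offset : offsets) {
            chain = chain
                    .thenCompose(ignored -> send.apply(offset))
                    .thenRun(() -> acked.add(offset));
        }
        return chain;
    }

    public static void main(String[] args) {
        PerRecordAckModel model = new PerRecordAckModel();
        List<Long> attempted = new ArrayList<>();
        // Hypothetical send to the destination topic that fails on the second record.
        CompletableFuture<Void> result = model.forward(List.of(10L, 11L, 12L), offset -> {
            attempted.add(offset);
            if (offset == 11L) {
                return CompletableFuture.<Void>failedFuture(new IllegalStateException("send failed"));
            }
            return CompletableFuture.<Void>completedFuture(null);
        });
        System.out.println("failed = " + result.isCompletedExceptionally()); // prints "failed = true"
        System.out.println("attempted = " + attempted);                      // prints "attempted = [10, 11]"
        System.out.println("acked = " + model.acked);                        // prints "acked = [10]"
    }
}
```

Here the failed record and everything after it stay unacknowledged, and the third record is never attempted, which is the guarantee the question asks for.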