
Kafka ConsumerInterceptor onCommit not being called when using transactions


I'm using Spring Kafka in a Spring Boot application. I'm attempting to use a Kafka ConsumerInterceptor to intercept when offsets are committed.

This seems to work when producer transactions are not enabled, but once transactions are turned on, Interceptor::onCommit is no longer called.

In the following minimal example everything works as expected:

@SpringBootApplication
@EnableKafka
class Application {
    @KafkaListener(topics = ["test"])
    fun onMessage(message: String) {
        log.warn("onMessage: $message")
    }
}

Interceptor:

class Interceptor : ConsumerInterceptor<String, String> {
    override fun onCommit(offsets: MutableMap<TopicPartition, OffsetAndMetadata>) {
        log.warn("onCommit: $offsets")
    }

    override fun onConsume(records: ConsumerRecords<String, String>): ConsumerRecords<String, String> {
        log.warn("onConsume: $records")
        return records
    }

    // ConsumerInterceptor also requires close() and configure(); no-ops are enough here
    override fun close() {}

    override fun configure(configs: MutableMap<String, *>) {}
}

Application config:

spring:
  kafka:
    consumer:
      enable-auto-commit: false
      auto-offset-reset: earliest
      properties:
        "interceptor.classes": com.example.Interceptor
      group-id: test-group
    listener:
      ack-mode: record

Inside a test using @EmbeddedKafka:

    @Test
    fun sendMessage() {
        kafkaTemplate.send("test", "id", "sent message").get() // block so we don't end before the consumer gets the message
    }

This outputs what I would expect:

onConsume: org.apache.kafka.clients.consumer.ConsumerRecords@6a646f3c
onMessage: sent message
onCommit: {test-0=OffsetAndMetadata{offset=1, leaderEpoch=null, metadata=''}}

However, when I enable transactions by providing a transaction-id-prefix, the Interceptor's onCommit is no longer called.

My updated config only adds:

spring:
  kafka:
    producer:
      transaction-id-prefix: tx-id-

And the test is updated to wrap send in a transaction:

    @Test
    fun sendMessage() {
        kafkaTemplate.executeInTransaction {
            kafkaTemplate.send("test", "a", "sent message").get()
        }
    }

With this change, my log output now contains only:

onConsume: org.apache.kafka.clients.consumer.ConsumerRecords@738b5968
onMessage: sent message

The Interceptor's onConsume method is called and the @KafkaListener receives the message but onCommit is never called.

Does anyone happen to know what's happening here? Are my expectations about what I should see incorrect?


Solution

  • Offsets are not committed via the consumer when using transactions (exactly-once semantics). Instead, the offsets are committed via the producer.

    From the KafkaProducer Javadoc for sendOffsetsToTransaction:

    /**
     * Sends a list of specified offsets to the consumer group coordinator, and also marks
     * those offsets as part of the current transaction. These offsets will be considered
     * committed only if the transaction is committed successfully. The committed offset should
     * be the next message your application will consume, i.e. lastProcessedMessageOffset + 1.
     * <p>
     * This method should be used when you need to batch consumed and produced messages
     * together, typically in a consume-transform-produce pattern. Thus, the specified
     * {@code groupMetadata} should be extracted from the used {@link KafkaConsumer consumer} via
     * {@link KafkaConsumer#groupMetadata()} to leverage consumer group metadata. This will provide
     * stronger fencing than just supplying the {@code consumerGroupId} and passing in {@code new ConsumerGroupMetadata(consumerGroupId)},
     * however note that the full set of consumer group metadata returned by {@link KafkaConsumer#groupMetadata()}
     * requires the brokers to be on version 2.5 or newer to understand.
     *
     * <p>
     * Note, that the consumer should have {@code enable.auto.commit=false} and should
     * also not commit offsets manually (via {@link KafkaConsumer#commitSync(Map) sync} or
     * {@link KafkaConsumer#commitAsync(Map, OffsetCommitCallback) async} commits).
     * This method will raise {@link TimeoutException} if the producer cannot send offsets before expiration of {@code max.block.ms}.
     * Additionally, it will raise {@link InterruptException} if interrupted.
     *
     * @throws IllegalStateException if no transactional.id has been configured or no transaction has been started.
     * @throws ProducerFencedException fatal error indicating another producer with the same transactional.id is active
     * @throws org.apache.kafka.common.errors.UnsupportedVersionException fatal error indicating the broker
     *         does not support transactions (i.e. if its version is lower than 0.11.0.0) or
     *         the broker doesn't support latest version of transactional API with all consumer group metadata
     *         (i.e. if its version is lower than 2.5.0).
     * @throws org.apache.kafka.common.errors.UnsupportedForMessageFormatException fatal error indicating the message
     *         format used for the offsets topic on the broker does not support transactions
     * @throws org.apache.kafka.common.errors.AuthorizationException fatal error indicating that the configured
     *         transactional.id is not authorized, or the consumer group id is not authorized.
     * @throws org.apache.kafka.clients.consumer.CommitFailedException if the commit failed and cannot be retried
     *         (e.g. if the consumer has been kicked out of the group). Users should handle this by aborting the transaction.
     * @throws org.apache.kafka.common.errors.FencedInstanceIdException if this producer instance gets fenced by broker due to a
     *                                                                  mis-configured consumer instance id within group metadata.
     * @throws org.apache.kafka.common.errors.InvalidProducerEpochException if the producer has attempted to produce with an old epoch
     *         to the partition leader. See the exception for more details
     * @throws KafkaException if the producer has encountered a previous fatal or abortable error, or for any
     *         other unexpected error
     * @throws TimeoutException if the time taken for sending offsets has surpassed max.block.ms.
     * @throws InterruptException if the thread is interrupted while blocked
     */
    public void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets,
                                         ConsumerGroupMetadata groupMetadata) throws ProducerFencedException {
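
    To make this concrete, here is a minimal sketch (not the original poster's code, and not what Spring Kafka does verbatim) of the consume-transform-produce loop the Javadoc describes, written against the plain Kafka clients in Kotlin. The function name pollAndForward, the output topic name, and the surrounding wiring are assumptions for illustration. The key point is that the offsets go through the producer's sendOffsetsToTransaction, so the consumer never performs a commit and ConsumerInterceptor.onCommit has nothing to intercept; Spring Kafka's listener container does the equivalent on your behalf when a transaction is active.

    import java.time.Duration
    import org.apache.kafka.clients.consumer.KafkaConsumer
    import org.apache.kafka.clients.consumer.OffsetAndMetadata
    import org.apache.kafka.clients.producer.KafkaProducer
    import org.apache.kafka.clients.producer.ProducerRecord
    import org.apache.kafka.common.TopicPartition

    // Sketch of one consume-transform-produce cycle under a transaction.
    // Assumes producer.initTransactions() was called once at startup, the producer
    // has a transactional.id, and the consumer has enable.auto.commit=false.
    fun pollAndForward(
        consumer: KafkaConsumer<String, String>,
        producer: KafkaProducer<String, String>
    ) {
        val records = consumer.poll(Duration.ofSeconds(1))
        if (records.isEmpty) return

        producer.beginTransaction()
        try {
            val offsets = mutableMapOf<TopicPartition, OffsetAndMetadata>()
            for (record in records) {
                producer.send(ProducerRecord("output", record.key(), record.value()))
                // the committed offset must be the next one to consume: last processed + 1
                offsets[TopicPartition(record.topic(), record.partition())] =
                    OffsetAndMetadata(record.offset() + 1)
            }
            // Offsets are committed through the producer as part of the transaction,
            // so ConsumerInterceptor.onCommit is never invoked on the consumer side.
            producer.sendOffsetsToTransaction(offsets, consumer.groupMetadata())
            producer.commitTransaction()
        } catch (e: Exception) {
            producer.abortTransaction()
            throw e
        }
    }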