I'm using Spring Kafka in a Spring Boot application. I'm attempting to use a Kafka ConsumerInterceptor
to intercept when offsets are committed.
This seems to work producers transactions are not enabled but transactions are turned on Interceptor::onCommit
is no longer called.
The following minimal example everything works as expected:
@SpringBootApplication
@EnableKafka
class Application {
@KafkaListener(topics = ["test"])
fun onMessage(message: String) {
log.warn("onMessage: $message")
}
Interceptor:
class Interceptor : ConsumerInterceptor<String, String> {
override fun onCommit(offsets: MutableMap<TopicPartition, OffsetAndMetadata>) {
log.warn("onCommit: $offsets")
}
override fun onConsume(records: ConsumerRecords<String, String>): ConsumerRecords<String, String> {
log.warn("onConsume: $records")
return records
}
}
Application config:
spring:
kafka:
consumer:
enable-auto-commit: false
auto-offset-reset: earliest
properties:
"interceptor.classes": com.example.Interceptor
group-id: test-group
listener:
ack-mode: record
Inside a test using @EmbeddedKafka
:
@Test
fun sendMessage() {
kafkaTemplate.send("test", "id", "sent message").get() // block so we don't end before the consumer gets the message
}
This outputs what I would expect:
onConsume: org.apache.kafka.clients.consumer.ConsumerRecords@6a646f3c
onMessage: sent message
onCommit: {test-0=OffsetAndMetadata{offset=1, leaderEpoch=null, metadata=''}}
However, when I enabled transactions by providing a transaction-id-prefix
the Interceptor
's onCommit
is no longer called.
My updated config only adds:
spring:
kafka:
producer:
transaction-id-prefix: tx-id-
And the test is updated to wrap send
in a transaction:
@Test
fun sendMessage() {
kafkaTemplate.executeInTransaction {
kafkaTemplate.send("test", "a", "sent message").get()
}
}
With this change my log output is now only
onConsume: org.apache.kafka.clients.consumer.ConsumerRecords@738b5968
onMessage: sent message
The Interceptor
's onConsume
method is called and the @KafkaListener
receives the message but onCommit
is never called.
Does anyone happen to know whats happening here? Are my expectations about what I should see here incorrect?
Offsets are not committed via the consumer when using transactions (exactly once semantics). Instead, the offset is committed via the producer.
KafkaProducer
...
/**
* Sends a list of specified offsets to the consumer group coordinator, and also marks
* those offsets as part of the current transaction. These offsets will be considered
* committed only if the transaction is committed successfully. The committed offset should
* be the next message your application will consume, i.e. lastProcessedMessageOffset + 1.
* <p>
* This method should be used when you need to batch consumed and produced messages
* together, typically in a consume-transform-produce pattern. Thus, the specified
* {@code groupMetadata} should be extracted from the used {@link KafkaConsumer consumer} via
* {@link KafkaConsumer#groupMetadata()} to leverage consumer group metadata. This will provide
* stronger fencing than just supplying the {@code consumerGroupId} and passing in {@code new ConsumerGroupMetadata(consumerGroupId)},
* however note that the full set of consumer group metadata returned by {@link KafkaConsumer#groupMetadata()}
* requires the brokers to be on version 2.5 or newer to understand.
*
* <p>
* Note, that the consumer should have {@code enable.auto.commit=false} and should
* also not commit offsets manually (via {@link KafkaConsumer#commitSync(Map) sync} or
* {@link KafkaConsumer#commitAsync(Map, OffsetCommitCallback) async} commits).
* This method will raise {@link TimeoutException} if the producer cannot send offsets before expiration of {@code max.block.ms}.
* Additionally, it will raise {@link InterruptException} if interrupted.
*
* @throws IllegalStateException if no transactional.id has been configured or no transaction has been started.
* @throws ProducerFencedException fatal error indicating another producer with the same transactional.id is active
* @throws org.apache.kafka.common.errors.UnsupportedVersionException fatal error indicating the broker
* does not support transactions (i.e. if its version is lower than 0.11.0.0) or
* the broker doesn't support latest version of transactional API with all consumer group metadata
* (i.e. if its version is lower than 2.5.0).
* @throws org.apache.kafka.common.errors.UnsupportedForMessageFormatException fatal error indicating the message
* format used for the offsets topic on the broker does not support transactions
* @throws org.apache.kafka.common.errors.AuthorizationException fatal error indicating that the configured
* transactional.id is not authorized, or the consumer group id is not authorized.
* @throws org.apache.kafka.clients.consumer.CommitFailedException if the commit failed and cannot be retried
* (e.g. if the consumer has been kicked out of the group). Users should handle this by aborting the transaction.
* @throws org.apache.kafka.common.errors.FencedInstanceIdException if this producer instance gets fenced by broker due to a
* mis-configured consumer instance id within group metadata.
* @throws org.apache.kafka.common.errors.InvalidProducerEpochException if the producer has attempted to produce with an old epoch
* to the partition leader. See the exception for more details
* @throws KafkaException if the producer has encountered a previous fatal or abortable error, or for any
* other unexpected error
* @throws TimeoutException if the time taken for sending offsets has surpassed max.block.ms.
* @throws InterruptException if the thread is interrupted while blocked
*/
public void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets,
ConsumerGroupMetadata groupMetadata) throws ProducerFencedException {