I am using @RetryableTopic to implement retry logic in a Kafka consumer. My configuration is below:
@RetryableTopic(
    attempts = "4",
    backoff = @Backoff(delay = 300000, multiplier = 10.0),
    autoCreateTopics = "false",
    topicSuffixingStrategy = SUFFIX_WITH_INDEX_VALUE
)
However, instead of retrying 4 times, it retries indefinitely, and with no delay between attempts. Can someone please help me with the code? I want the message to be retried 4 times: the first retry after 5 minutes, the second after 10 minutes, the third after 20 minutes, and so on.
Code is below:
int i = 1;

@RetryableTopic(
    attempts = "4",
    backoff = @Backoff(delay = 300000, multiplier = 10.0),
    autoCreateTopics = "false",
    topicSuffixingStrategy = SUFFIX_WITH_INDEX_VALUE
)
@KafkaListener(topics = "topic_string_data", containerFactory = "default")
public void consume(@Payload String message, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
    String prachi = null;
    System.out.println("current time: " + new Date());
    System.out.println("retry method invoked -> " + i++ + " times from topic: " + topic);
    System.out.println("current time: " + new Date());
    // Deliberately throws a NullPointerException so that the retry logic is triggered
    prachi.equals("abc");
}

@DltHandler
public void listenDlt(String in, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
        @Header(KafkaHeaders.OFFSET) long offset) {
    System.out.println("current time dlt: " + new Date());
    System.out.println("DLT Received: " + in + " from " + topic + " offset " + offset + " -> " + i++ + " times");
    System.out.println("current time dlt: " + new Date());
    // dump event to dlt queue
}
Kafka config:
@Bean
public ConsumerFactory<String, String> consumerFactory() {
    Map<String, Object> config = new HashMap<>();
    config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    config.put(ConsumerConfig.GROUP_ID_CONFIG, "grp_STRING");
    return new DefaultKafkaConsumerFactory<>(config);
}

// Inject the consumer factory into the Kafka listener container factory
@Bean(name = "default")
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    return factory;
}
Logs from running the app (this is not the complete output):
32:9092 (id: 0 rack: null)], epoch=0}}
2022-02-23 13:58:40.186 INFO 96675 --- [4-retry-1-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-grp_STRING-6, groupId=grp_STRING] Setting offset for partition topic_string_data-retry-1-0 to the committed offset FetchPosition{offset=1, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[197mumnb29632:9092 (id: 0 rack: null)], epoch=0}}
2022-02-23 13:58:40.187 INFO 96675 --- [ner#6-dlt-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-grp_STRING-7, groupId=grp_STRING] Setting offset for partition topic_string_data-dlt-1 to the committed offset FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[197mumnb29632:9092 (id: 0 rack: null)], epoch=0}}
2022-02-23 13:58:40.187 INFO 96675 --- [3-retry-0-0-C-1] o.s.k.l.KafkaMessageListenerContainer : grp_STRING: partitions assigned: [topic_string_data-retry-0-0]
2022-02-23 13:58:40.187 INFO 96675 --- [ntainer#2-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-grp_STRING-2, groupId=grp_STRING] Setting offset for partition topic_string_data-1 to the committed offset FetchPosition{offset=27, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[197mumnb29632:9092 (id: 0 rack: null)], epoch=0}}
2022-02-23 13:58:40.187 INFO 96675 --- [4-retry-1-0-C-1] o.s.k.l.KafkaMessageListenerContainer : grp_STRING: partitions assigned: [topic_string_data-retry-1-0]
2022-02-23 13:58:40.187 INFO 96675 --- [5-retry-2-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-grp_STRING-1, groupId=grp_STRING] Setting offset for partition topic_string_data-retry-2-0 to the committed offset FetchPosition{offset=1, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[197mumnb29632:9092 (id: 0 rack: null)], epoch=0}}
2022-02-23 13:58:40.188 INFO 96675 --- [ntainer#2-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-grp_STRING-2, groupId=grp_STRING] Setting offset for partition topic_string_data-0 to the committed offset FetchPosition{offset=24, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[197mumnb29632:9092 (id: 0 rack: null)], epoch=0}}
2022-02-23 13:58:40.188 INFO 96675 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-grp_STRING-4, groupId=grp_STRING] Setting offset for partition topic_string-1 to the committed offset FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[197mumnb29632:9092 (id: 0 rack: null)], epoch=0}}
2022-02-23 13:58:40.188 INFO 96675 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-grp_STRING-4, groupId=grp_STRING] Setting offset for partition topic_string-0 to the committed offset FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[197mumnb29632:9092 (id: 0 rack: null)], epoch=0}}
2022-02-23 13:58:40.188 INFO 96675 --- [5-retry-2-0-C-1] o.s.k.l.KafkaMessageListenerContainer : grp_STRING: partitions assigned: [topic_string_data-retry-2-0]
2022-02-23 13:58:40.188 INFO 96675 --- [ntainer#2-0-C-1] o.s.k.l.KafkaMessageListenerContainer : grp_STRING: partitions assigned: [topic_string_data-1, topic_string_data-0]
2022-02-23 13:58:40.189 INFO 96675 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer : grp_STRING: partitions assigned: [topic_string-1, topic_string-0]
2022-02-23 13:58:40.188 INFO 96675 --- [ner#6-dlt-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-grp_STRING-7, groupId=grp_STRING] Setting offset for partition topic_string_data-dlt-2 to the committed offset FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[197mumnb29632:9092 (id: 0 rack: null)], epoch=0}}
2022-02-23 13:58:40.190 INFO 96675 --- [ner#6-dlt-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-grp_STRING-7, groupId=grp_STRING] Setting offset for partition topic_string_data-dlt-0 to the committed offset FetchPosition{offset=14, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[197mumnb29632:9092 (id: 0 rack: null)], epoch=0}}
2022-02-23 13:58:40.191 INFO 96675 --- [ner#6-dlt-0-C-1] o.s.k.l.KafkaMessageListenerContainer : grp_STRING: partitions assigned: [topic_string_data-dlt-0, topic_string_data-dlt-1, topic_string_data-dlt-2]
2022-02-23 13:58:40.196 INFO 96675 --- [4-retry-1-0-C-1] o.a.k.clients.consumer.KafkaConsumer : [Consumer clientId=consumer-grp_STRING-6, groupId=grp_STRING] Seeking to offset 1 for partition topic_string_data-retry-1-0
2022-02-23 13:58:40.196 WARN 96675 --- [4-retry-1-0-C-1] essageListenerContainer$ListenerConsumer : Seek to current after exception; nested exception is org.springframework.kafka.listener.ListenerExecutionFailedException: Listener failed; nested exception is org.springframework.kafka.listener.KafkaBackoffException: Partition 0 from topic topic_string_data-retry-1 is not ready for consumption, backing off for approx. 26346 millis.
current time: Wed Feb 23 13:58:40 IST 2022
retry method invoked -> 3 times from topic: topic_string_data-retry-0
current time: Wed Feb 23 13:58:40 IST 2022
2022-02-23 13:58:40.713 INFO 96675 --- [3-retry-0-0-C-1] o.a.k.clients.consumer.KafkaConsumer : [Consumer clientId=consumer-grp_STRING-3, groupId=grp_STRING] Seeking to offset 2 for partition topic_string_data-retry-0-0
current time: Wed Feb 23 13:58:40 IST 2022
retry method invoked -> 4 times from topic: topic_string_data-retry-0
current time: Wed Feb 23 13:58:40 IST 2022
2022-02-23 13:58:41.228 INFO 96675 --- [3-retry-0-0-C-1] o.a.k.clients.consumer.KafkaConsumer : [Consumer clientId=consumer-grp_STRING-3, groupId=grp_STRING] Seeking to offset 3 for partition topic_string_data-retry-0-0
current time: Wed Feb 23 13:58:41 IST 2022
retry method invoked -> 5 times from topic: topic_string_data-retry-0
current time: Wed Feb 23 13:58:41 IST 2022
2022-02-23 13:58:41.740 INFO 96675 --- [3-retry-0-0-C-1] o.a.k.clients.consumer.KafkaConsumer : [Consumer clientId=consumer-grp_STRING-3, groupId=grp_STRING] Seeking to offset 4 for partition topic_string_data-retry-0-0
current time: Wed Feb 23 13:58:41 IST 2022
retry method invoked -> 6 times from topic: topic_string_data-retry-0
current time: Wed Feb 23 13:58:41 IST 2022
2022-02-23 13:58:42.254 INFO 96675 --- [3-retry-0-0-C-1] o.a.k.clients.consumer.KafkaConsumer : [Consumer clientId=consumer-grp_STRING-3, groupId=grp_STRING] Seeking to offset 5 for partition topic_string_data-retry-0-0
current time: Wed Feb 23 13:58:42 IST 2022
retry method invoked -> 7 times from topic: topic_string_data-retry-0
current time: Wed Feb 23 13:58:42 IST 2022
2022-02-23 13:58:42.777 INFO 96675 --- [3-retry-0-0-C-1] o.a.k.clients.consumer.KafkaConsumer : [Consumer clientId=consumer-grp_STRING-3, groupId=grp_STRING] Seeking to offset 6 for partition topic_string_data-retry-0-0
current time: Wed Feb 23 13:58:42 IST 2022
retry method invoked -> 8 times from topic: topic_string_data-retry-0
current time: Wed Feb 23 13:58:42 IST 2022
2022-02-23 13:58:43.298 INFO 96675 --- [3-retry-0-0-C-1] o.a.k.clients.consumer.KafkaConsumer : [Consumer clientId=consumer-grp_STRING-3, groupId=grp_STRING] Seeking to offset 7 for partition topic_string_data-retry-0-0
current time: Wed Feb 23 13:58:43 IST 2022
retry method invoked -> 9 times from topic: topic_string_data-retry-0
current time: Wed Feb 23 13:58:43 IST 2022
2022-02-23 13:58:43.809 INFO 96675 --- [3-retry-0-0-C-1] o.a.k.clients.consumer.KafkaConsumer : [Consumer clientId=consumer-grp_STRING-3, groupId=grp_STRING] Seeking to offset 8 for partition topic_string_data-retry-0-0
current time: Wed Feb 23 13:58:43 IST 2022
retry method invoked -> 10 times from topic: topic_string_data-retry-0
current time: Wed Feb 23 13:58:43 IST 2022
current time: Wed Feb 23 13:59:10 IST 2022
retry method invoked -> 11 times from topic: topic_string_data-retry-1
current time: Wed Feb 23 13:59:10 IST 2022
2022-02-23 13:59:10.733 INFO 96675 --- [4-retry-1-0-C-1] o.a.k.clients.consumer.KafkaConsumer : [Consumer clientId=consumer-grp_STRING-6, groupId=grp_STRING] Seeking to offset 2 for partition topic_string_data-retry-1-0
2022-02-23 13:59:10.736 INFO 96675 --- [5-retry-2-0-C-1] o.a.k.clients.consumer.KafkaConsumer : [Consumer clientId=consumer-grp_STRING-1, groupId=grp_STRING] Seeking to offset 1 for partition topic_string_data-retry-2-0
2022-02-23 13:59:10.737 WARN 96675 --- [5-retry-2-0-C-1] essageListenerContainer$ListenerConsumer : Seek to current after exception; nested exception is org.springframework.kafka.listener.ListenerExecutionFailedException: Listener failed; nested exception is org.springframework.kafka.listener.KafkaBackoffException: Partition 0 from topic topic_string_data-retry-2 is not ready for consumption, backing off for approx. 29483 millis.
current time: Wed Feb 23 13:59:10 IST 2022
retry method invoked -> 12 times from topic: topic_string_data-retry-1
current time: Wed Feb 23 13:59:10 IST 2022
2022-02-23 13:59:11.249 INFO 96675 --- [4-retry-1-0-C-1] o.a.k.clients.consumer.KafkaConsumer : [Consumer clientId=consumer-grp_STRING-6, groupId=grp_STRING] Seeking to offset 3 for partition topic_string_data-retry-1-0
current time: Wed Feb 23 13:59:11 IST 2022
retry method invoked -> 13 times from topic: topic_string_data-retry-1
current time: Wed Feb 23 13:59:11 IST 2022
2022-02-23 13:59:11.769 INFO 96675 --- [4-retry-1-0-C-1] o.a.k.clients.consumer.KafkaConsumer : [Consumer clientId=consumer-grp_STRING-6, groupId=grp_STRING] Seeking to offset 4 for partition topic_string_data-retry-1-0
current time: Wed Feb 23 13:59:11 IST 2022
retry method invoked -> 14 times from topic: topic_string_data-retry-1
current time: Wed Feb 23 13:59:11 IST 2022
2022-02-23 13:59:12.286 INFO 96675 --- [4-retry-1-0-C-1] o.a.k.clients.consumer.KafkaConsumer : [Consumer clientId=consumer-grp_STRING-6, groupId=grp_STRING] Seeking to offset 5 for partition topic_string_data-retry-1-0
current time: Wed Feb 23 13:59:12 IST 2022
retry method invoked -> 15 times from topic: topic_string_data-retry-1
current time: Wed Feb 23 13:59:12 IST 2022
2022-02-23 13:59:12.805 INFO 96675 --- [4-retry-1-0-C-1] o.a.k.clients.consumer.KafkaConsumer : [Consumer clientId=consumer-grp_STRING-6, groupId=grp_STRING] Seeking to offset 6 for partition topic_string_data-retry-1-0
current time: Wed Feb 23 13:59:12 IST 2022
retry method invoked -> 16 times from topic: topic_string_data-retry-1
current time: Wed Feb 23 13:59:12 IST 2022
2022-02-23 13:59:13.339 INFO 96675 --- [4-retry-1-0-C-1] o.a.k.clients.consumer.KafkaConsumer : [Consumer clientId=consumer-grp_STRING-6, groupId=grp_STRING] Seeking to offset 7 for partition topic_string_data-retry-1-0
current time: Wed Feb 23 13:59:13 IST 2022
retry method invoked -> 17 times from topic: topic_string_data-retry-1
current time: Wed Feb 23 13:59:13 IST 2022
2022-02-23 13:59:13.856 INFO 96675 --- [4-retry-1-0-C-1] o.a.k.clients.consumer.KafkaConsumer : [Consumer clientId=consumer-grp_STRING-6, groupId=grp_STRING] Seeking to offset 8 for partition topic_string_data-retry-1-0
current time: Wed Feb 23 13:59:13 IST 2022
retry method invoked -> 18 times from topic: topic_string_data-retry-1
current time: Wed Feb 23 13:59:13 IST 2022
2022-02-23 13:59:14.372 INFO 96675 --- [4-retry-1-0-C-1] o.a.k.clients.consumer.KafkaConsumer : [Consumer clientId=consumer-grp_STRING-6, groupId=grp_STRING] Seeking to offset 9 for partition topic_string_data-retry-1-0
current time: Wed Feb 23 13:59:14 IST 2022
retry method invoked -> 19 times from topic: topic_string_data-retry-1
current time: Wed Feb 23 13:59:14 IST 2022
current time: Wed Feb 23 13:59:40 IST 2022
retry method invoked -> 20 times from topic: topic_string_data-retry-2
current time: Wed Feb 23 13:59:40 IST 2022
2022-02-23 13:59:40.846 INFO 96675 --- [5-retry-2-0-C-1] o.a.k.clients.consumer.KafkaConsumer : [Consumer clientId=consumer-grp_STRING-1, groupId=grp_STRING] Seeking to offset 2 for partition topic_string_data-retry-2-0
current time dlt: Wed Feb 23 13:59:40 IST 2022
DLT Received: prachi from topic_string_data-dlt offset 14 -> 21 times
current time dlt: Wed Feb 23 13:59:46 IST 2022
current time: Wed Feb 23 13:59:46 IST 2022
retry method invoked -> 22 times from topic: topic_string_data-retry-2
current time: Wed Feb 23 13:59:46 IST 2022
2022-02-23 13:59:47.466 INFO 96675 --- [5-retry-2-0-C-1] o.a.k.clients.consumer.KafkaConsumer : [Consumer clientId=consumer-grp_STRING-1, groupId=grp_STRING] Seeking to offset 3 for partition topic_string_data-retry-2-0
current time dlt: Wed Feb 23 13:59:47 IST 2022
DLT Received: prachi from topic_string_data-dlt offset 15 -> 23 times
current time dlt: Wed Feb 23 13:59:47 IST 2022
current time: Wed Feb 23 13:59:47 IST 2022
retry method invoked -> 24 times from topic: topic_string_data-retry-2
current time: Wed Feb 23 13:59:47 IST 2022
2022-02-23 13:59:47.981 INFO 96675 --- [5-retry-2-0-C-1] o.a.k.clients.consumer.KafkaConsumer : [Consumer clientId=consumer-grp_STRING-1, groupId=grp_STRING] Seeking to offset 4 for partition topic_string_data-retry-2-0
current time dlt: Wed Feb 23 13:59:47 IST 2022
DLT Received: prachisharma from topic_string_data-dlt offset 16 -> 25 times
current time dlt: Wed Feb 23 13:59:47 IST 2022
current time: Wed Feb 23 13:59:47 IST 2022
retry method invoked -> 26 times from topic: topic_string_data-retry-2
current time: Wed Feb 23 13:59:47 IST 2022
2022-02-23 13:59:48.493 INFO 96675 --- [5-retry-2-0-C-1] o.a.k.clients.consumer.KafkaConsumer : [Consumer clientId=consumer-grp_STRING-1, groupId=grp_STRING] Seeking to offset 5 for partition topic_string_data-retry-2-0
current time dlt: Wed Feb 23 13:59:48 IST 2022
DLT Received: hie from topic_string_data-dlt offset 17 -> 27 times
current time dlt: Wed Feb 23 13:59:48 IST 2022
current time: Wed Feb 23 13:59:48 IST 2022
retry method invoked -> 28 times from topic: topic_string_data-retry-2
current time: Wed Feb 23 13:59:48 IST 2022
2022-02-23 13:59:49.011 INFO 96675 --- [5-retry-2-0-C-1] o.a.k.clients.consumer.KafkaConsumer : [Consumer clientId=consumer-grp_STRING-1, groupId=grp_STRING] Seeking to offset 6 for partition topic_string_data-retry-2-0
current time dlt: Wed Feb 23 13:59:49 IST 2022
DLT Received: hieeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeee from topic_string_data-dlt offset 18 -> 29 times
current time dlt: Wed Feb 23 13:59:49 IST 2022
current time: Wed Feb 23 13:59:49 IST 2022
retry method invoked -> 30 times from topic: topic_string_data-retry-2
current time: Wed Feb 23 13:59:49 IST 2022
2022-02-23 13:59:49.527 INFO 96675 --- [5-retry-2-0-C-1] o.a.k.clients.consumer.KafkaConsumer : [Consumer clientId=consumer-grp_STRING-1, groupId=grp_STRING] Seeking to offset 7 for partition topic_string_data-retry-2-0
current time dlt: Wed Feb 23 13:59:49 IST 2022
DLT Received: hie from topic_string_data-dlt offset 19 -> 31 times
current time dlt: Wed Feb 23 13:59:49 IST 2022
current time: Wed Feb 23 13:59:49 IST 2022
retry method invoked -> 32 times from topic: topic_string_data-retry-2
current time: Wed Feb 23 13:59:49 IST 2022
current time dlt: Wed Feb 23 13:59:50 IST 2022
DLT Received: hi from topic_string_data-dlt offset 20 -> 33 times
current time dlt: Wed Feb 23 13:59:50 IST 2022
2022-02-23 13:59:50.039 INFO 96675 --- [5-retry-2-0-C-1] o.a.k.clients.consumer.KafkaConsumer : [Consumer clientId=consumer-grp_STRING-1, groupId=grp_STRING] Seeking to offset 8 for partition topic_string_data-retry-2-0
current time: Wed Feb 23 13:59:50 IST 2022
retry method invoked -> 34 times from topic: topic_string_data-retry-2
current time: Wed Feb 23 13:59:50 IST 2022
2022-02-23 13:59:50.545 INFO 96675 --- [5-retry-2-0-C-1] o.a.k.clients.consumer.KafkaConsumer : [Consumer clientId=consumer-grp_STRING-1, groupId=grp_STRING] Seeking to offset 9 for partition topic_string_data-retry-2-0
current time dlt: Wed Feb 23 13:59:50 IST 2022
DLT Received: hi from topic_string_data-dlt offset 21 -> 35 times
current time dlt: Wed Feb 23 13:59:50 IST 2022
current time: Wed Feb 23 13:59:50 IST 2022
retry method invoked -> 36 times from topic: topic_string_data-retry-2
current time: Wed Feb 23 13:59:50 IST 2022
current time dlt: Wed Feb 23 13:59:51 IST 2022
DLT Received: hi from topic_string_data-dlt offset 22 -> 37 times
current time dlt: Wed Feb 23 13:59:51 IST 2022
It seems there are two separate problems.
One is that you seem to already have records in the topics, and if you have the consumer's offset reset configured to earliest, the app will read all those records when it starts up. You can either set ConsumerConfig.AUTO_OFFSET_RESET_CONFIG to latest or, if you're running Kafka locally on Docker, stop the Kafka container and prune the volumes with something like docker system prune --volumes (note that this will erase data from all your stopped containers, so use it wisely).
Can you try one of these and test again?
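As a rough sketch of the first option, the consumer properties could be built like this. The class and method names here (ConsumerProps, build) are hypothetical, and the string keys are the literal property names behind the ConsumerConfig constants, used so the snippet runs without the Kafka libraries; in the question's consumerFactory() you would add the equivalent config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest") line instead.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper, for illustration only.
class ConsumerProps {

    // Builds consumer properties with the given offset reset policy.
    // "auto.offset.reset" is the value of ConsumerConfig.AUTO_OFFSET_RESET_CONFIG.
    public static Map<String, Object> build(String offsetReset) {
        Map<String, Object> config = new HashMap<>();
        config.put("bootstrap.servers", "localhost:9092");
        config.put("group.id", "grp_STRING");
        // Only takes effect when the group has no committed offset for a partition.
        config.put("auto.offset.reset", offsetReset);
        return config;
    }

    public static void main(String[] args) {
        System.out.println(build("latest").get("auto.offset.reset"));
    }
}
```

Keep in mind that this setting is only consulted when the consumer group has no committed offset for a partition, so records that were already forwarded to the retry topics may still be processed once.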
The other problem is that the framework is wrongly setting the default maxDelay of 30s even though the annotation states the default is to ignore. I'll open an issue for that and add the link here.
For now, you can set a maxDelay such as @Backoff(delay = 600000, multiplier = 3.0, maxDelay = 5400000); the application should then have the correct delays of 10, 30 and 90 minutes as you wanted.
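To see why the cap matters, here is a minimal sketch of how the delays come out. It assumes each delay is the initial delay multiplied by the multiplier once per earlier retry, capped at maxDelay, with one delay per retry (attempts - 1 in total), and it assumes the 30s default cap mentioned above. The class BackoffSchedule and its method are hypothetical and only model the arithmetic; they are not part of Spring Kafka.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the backoff arithmetic, for illustration only.
class BackoffSchedule {

    // Assumed default cap when maxDelay is not set, per the 30s figure above.
    static final long ASSUMED_DEFAULT_MAX_DELAY_MS = 30_000L;

    // Returns one delay per retry (attempts - 1 values), each capped at maxDelayMs.
    public static List<Long> delays(int attempts, long delayMs, double multiplier, long maxDelayMs) {
        List<Long> result = new ArrayList<>();
        double current = delayMs;
        for (int retry = 1; retry < attempts; retry++) {
            result.add(Math.min((long) current, maxDelayMs));
            current *= multiplier;
        }
        return result;
    }

    public static void main(String[] args) {
        // Configuration from the question: every delay is cut down to the cap.
        System.out.println(delays(4, 300_000L, 10.0, ASSUMED_DEFAULT_MAX_DELAY_MS));
        // Suggested configuration with an explicit maxDelay: 10, 30 and 90 minutes.
        System.out.println(delays(4, 600_000L, 3.0, 5_400_000L));
    }
}
```

Under the same assumptions, the 5, 10 and 20 minute schedule described in the question would correspond to delay = 300000, multiplier = 2.0 and maxDelay = 1200000.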
Let me know if that works out for you, or if you have any other problems related to this issue.
EDIT: Issue opened; you can follow the development at https://github.com/spring-projects/spring-kafka/issues/2137
It should be fixed in the next release.
EDIT 2: Actually, the phrasing in the @Backoff annotation is rather ambiguous, but it seems the behavior is correct and you should explicitly set a larger maxDelay.
The documentation should clarify this behavior in the next release.
EDIT 3: To answer your question in the comments, the way retryable topics work is that the partition is paused for the duration of the delay, but the consumer keeps polling the broker, so longer delays don't trigger rebalancing.
From your logs the rebalancing is from the main topic's partitions, so it's unlikely it has anything to do with this feature.
EDIT 4: The Retryable Topics feature was released in Spring for Apache Kafka 2.7.0, which uses kafka-clients 2.7.0. However, there have been several improvements to the feature, so I recommend using the latest Spring Kafka version (currently 2.8.3) if possible to benefit from those.