Search code examples
springapache-kafkaspring-kafkaconsumer

How work RetryTopic with other groupId and backOff properties


I'm using spring-kafka and @RetryTopic and I can't understand how this works

  1. for different groupId-s :

    For example, the topic TestTopic contains 5 messages and is listened to by 2 consumers - groupId_1 and groupId_2

    groupId_1 did not process message #2 and sent it to TestTopic_1

    groupId_2 did not process message #4 and sent it to TestTopic_1

    When groupId_1 and groupId_2 read messages from TestTopic_1, it turns out that groupId_1 subtracts both messages #2 and #4 (is it obtained again?) And groupId_1 subtracts message #2 (is it repeated?) and #4 How it works? How to avoid re-processing?

  2. And a question about setting up work @RetryTopic about backoff, for example I have a setting

    @RetryableTopic(
             attempts = "3",
             topicSuffixingStrategy = TopicSuffixingStrategy.SUFFIX_WITH_INDEX_VALUE,
             backoff = @Backoff(delay = 1000, maxDelay = 5_000, random = true),
             dltTopicSuffix = "dead-two"
    )
    

    Do I understand correctly that the thread reading from the partition is blocked for a delay time, after which it sends a message to the topic for re-reading or is a separate thread created for this waiting and sending?

  3. And during this delay the application crashes, then the message will not be sent to the topic TestTopic_N?

  4. And why is maxDelay needed if delay and attempts are specified??


Solution

    1. It is not designed to handle that situation; each consumer group is independent; you would work around it by using custom retry topic naming so that the retry topics include the group id; that is not the case by default. https://docs.spring.io/spring-kafka/docs/current/reference/html/#topic-naming

    Also see Also see this section in the documentation: https://docs.spring.io/spring-kafka/docs/current/reference/html/#multi-retry You can simply set different retryTopicSuffix and dltTopicSuffix on each listener.

    1. The thread is not blocked; the record is immediately written to the next retry topic; the consumer on that topic (when it receives the record) looks at the timestamp and, if it is too early, rejects the message, the partition is paused on that consumer until the time expires, and then it is resumed and the record is redelivered.

    2. See #2.

    3. maxDelay is only used when using an exponential back off (with a multiplier). e.g. delay = 2, multiplier = 2.0, maxDelay = 10 would result in 2, 4, 8, 10, 10... up to max attempts.

    EDIT

    Further to comments below; here is an example of using existing topics for the retry topics:

    @SpringBootApplication
    @EnableScheduling
    public class So77216081Application extends RetryTopicConfigurationSupport {
    
        public static void main(String[] args) {
            SpringApplication.run(So77216081Application.class, args);
        }
    
        @Override
        protected RetryTopicComponentFactory createComponentFactory() {
            return new RetryTopicComponentFactory() {
    
                @Override
                public RetryTopicNamesProviderFactory retryTopicNamesProviderFactory() {
                    return new MyNamesProvider();
                }
    
            };
        }
    
        @Bean
        ApplicationRunner runner(KafkaTemplate<String, String> template) {
            return args -> {
                template.send("so77216081", "foo");
            };
        }
    
    }
    
    @Component
    class Listener {
    
        @KafkaListener(id = "so77216081", topics = "so77216081")
        @RetryableTopic(attempts = "3", autoCreateTopics = "false")
        void listen(String in, @Header(KafkaHeaders.RECEIVED_TOPIC)
        String topic) {
            System.out.println(in + " from " + topic);
            throw new RuntimeException("test");
        }
    
        @DltHandler
        void dlt(String in, @Header(KafkaHeaders.RECEIVED_TOPIC)
        String topic) {
            System.out.println(in + " from " + topic);
        }
    
    }
    
    class MyNamesProvider implements RetryTopicNamesProviderFactory {
    
        @Override
        public RetryTopicNamesProvider createRetryTopicNamesProvider(Properties properties) {
            return new SuffixingRetryTopicNamesProvider(properties) {
    
                @Override
                public String getTopicName(String topic) {
                    if (properties.isMainEndpoint() || properties.isDltTopic()) {
                        return super.getTopicName(topic);
                    }
                    else if (super.getTopicName(topic).endsWith("-0")) {
                        return "TestFirstrandomtopic";
                    }
                    else if (super.getTopicName(topic).endsWith("-1")) {
                        return "TestSecondrandomtopic";
                    }
                    else {
                        throw new IllegalStateException("Shouldn't get here - attempts is only 3");
                    }
                }
    
            };
        }
    
    }
    
    foo from so77216081
    foo from TestFirstrandomtopic
    foo from TestSecondrandomtopic
    foo from so77216081-dlt
    

    Note that since autoCreateTopics is false, all 4 topics must have previously been added to the brokers.