Spring Kafka. Non-blocking retries - topics are created even when autoCreateTopics property is false


I am trying to implement non-blocking retries in my project using spring-kafka 2.8.0.

This is my listener. Note that the autoCreateTopics property is set to "false":

    @KafkaListener(
            id = "customListenerId",
            idIsGroup = false,
            topics = {"main-topic-1", "main-topic-2"})
    @RetryableTopic(
            autoCreateTopics = "false",
            fixedDelayTopicStrategy = FixedDelayStrategy.SINGLE_TOPIC,
            backoff = @Backoff(delay = 10000),
            attempts = "6",
            kafkaTemplate = "kafkaTemplate",
            include = {
                    AerospikeException.Timeout.class,
                    QueryTimeoutException.class,
                    HystrixRuntimeException.class})
    public void consumePerTopic(@Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
                                @Payload byte[] payload,
                                Acknowledgment acknowledgment) {
        processor.process(objectMapper.readValue(payload, CreateRequest.class));
        acknowledgment.acknowledge();
    }

I have the following integration test for it, which runs Kafka as an embedded container. The test simulates DB connection issues: the code under test must throw AerospikeException.Timeout, which is declared as retryable in the consumer.

    @Test
    void shouldNotCreateEntityForTopicsWhenDbTimedOutAndProduceRetryMessages() {
        performWithDbLatency(Duration.ofMillis(1000), () -> {
            var createEntityIdempotencyKeys = entityKafkaSteps.sendCreateRequestToAllTopicsWithIdempotency(
                    getCreateEntityRequest("entity-owner-a"));

            retryEntityKafkaSteps.consumeRetryCreateRequestsByIdempotencyKeys(createEntityIdempotencyKeys);
        });
    }

My configuration for the embedded Kafka container looks like this:

    embedded:
      kafka:
        topicsToCreate: main-topic-1, main-topic-2

I understand that, by default, a Kafka broker allows automatic topic creation. The test passes, but as I understand it, the autoCreateTopics property should prevent the retry and DLT topics from being created even when the broker allows it, so the test should fail because the broker has no retry or DLT topics.

Is this the expected behaviour, or am I missing something? The behaviour is the same when the retry functionality is declared via a RetryTopicConfiguration bean using the doNotAutoCreateRetryTopics() method.

Thanks a lot.

Edit: my main concern is:

  1. I forbid spring-kafka from creating retry and DLT topics for my consumer of main-topic-1 and main-topic-2 by using autoCreateTopics = "false".
  2. I create ONLY main-topic-1 and main-topic-2 in my embedded Kafka container using the embedded.kafka.topicsToCreate property. (The Kafka broker property auto.create.topics.enable defaults to true.)
  3. At runtime I can see that main-topic-1-retry, main-topic-1-dlt, main-topic-2-retry and main-topic-2-dlt are still created.
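
For reference, the topic names in item 3 are the main topic names with a "-retry" or "-dlt" suffix appended. A small sketch that derives them, for example to compare against the broker's topic list in a test, might look like this. The class and method names are hypothetical, and the suffixes are simply taken from the names observed above:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: lists the retry and DLT topic names derived from the main topics.
class RetryTopicNames {

    // Suffixes as they appear in the topic names reported in the question.
    static final String RETRY_SUFFIX = "-retry";
    static final String DLT_SUFFIX = "-dlt";

    static List<String> derivedTopics(List<String> mainTopics) {
        List<String> result = new ArrayList<>();
        for (String topic : mainTopics) {
            result.add(topic + RETRY_SUFFIX); // single retry topic per main topic
            result.add(topic + DLT_SUFFIX);   // dead-letter topic per main topic
        }
        return result;
    }
}
```

A test that expects no auto-created topics could assert that none of these names appear in the broker's topic listing.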

I am using the playtika testcontainers library.


Solution

  • No. autoCreateTopics = "false" simply means that the framework will not create the topics; it has no bearing on whether the broker is configured to create them.

    @EmbeddedKafka(topics = ...) will always create the topics.

    To disable topic creation in the embedded kafka broker, use

    @EmbeddedKafka(brokerProperties = "auto.create.topics.enable:false")

    and don't specify any topics there.

    Or you can set the consumer property allow.auto.create.topics to false; it overrides the broker property (if it is true).

    https://kafka.apache.org/documentation/#consumerconfigs_allow.auto.create.topics
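
As a minimal sketch of the consumer-side option: the property can be added to the map of consumer properties that is handed to the consumer factory (for example spring-kafka's DefaultKafkaConsumerFactory). The class name, method name, bootstrap address and group id below are hypothetical. Plain string keys are used so that the snippet compiles without kafka-clients on the classpath; the matching constant in kafka-clients is ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper: builds consumer properties with topic auto-creation disabled.
class NoAutoCreateConsumerProps {

    static Map<String, Object> build(String bootstrapServers, String groupId) {
        Map<String, Object> props = new HashMap<>();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("group.id", groupId);
        // Tell the broker not to auto-create topics this consumer subscribes to.
        props.put("allow.auto.create.topics", false);
        return props;
    }
}
```

In a Spring Boot application the same setting could probably be supplied as spring.kafka.consumer.properties.allow.auto.create.topics=false instead of in code.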